From d02f958865677ae945b43b3ae49de79e1bafabcd Mon Sep 17 00:00:00 2001
From: Nikolaos Karaolidis
Date: Mon, 11 Mar 2024 20:41:59 +0000
Subject: [PATCH] Optimize backfill early saving allocations

Signed-off-by: Nikolaos Karaolidis
---
 src/threads/data/backfill/bars.rs | 22 +++++-----------------
 src/threads/data/backfill/news.rs | 26 +++++++-------------------
 2 files changed, 12 insertions(+), 36 deletions(-)

diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs
index 0486ea7..0d82359 100644
--- a/src/threads/data/backfill/bars.rs
+++ b/src/threads/data/backfill/bars.rs
@@ -120,10 +120,7 @@ impl super::Handler for Handler {
         info!("Backfilling bars for {:?}.", symbols);
 
         let mut bars = Vec::with_capacity(database::bars::BATCH_FLUSH_SIZE);
-        let mut last_times = symbols
-            .iter()
-            .map(|symbol| (symbol.clone(), None))
-            .collect::<HashMap<_, _>>();
+        let mut last_times = HashMap::new();
         let mut next_page_token = None;
 
         loop {
@@ -150,7 +147,7 @@ impl super::Handler for Handler {
 
             for (symbol, bar_vec) in message.bars {
                 if let Some(last) = bar_vec.last() {
-                    last_times.insert(symbol.clone(), Some(last.time));
+                    last_times.insert(symbol.clone(), last.time);
                 }
 
                 for bar in bar_vec {
@@ -168,13 +165,8 @@ impl super::Handler for Handler {
                     .unwrap();
 
                 let backfilled = last_times
-                    .into_iter()
-                    .filter_map(|(symbol, time)| {
-                        if let Some(time) = time {
-                            return Some(Backfill { symbol, time });
-                        }
-                        None
-                    })
+                    .drain()
+                    .map(|(symbol, time)| Backfill { symbol, time })
                     .collect::<Vec<_>>();
 
                 database::backfills_bars::upsert_batch(
@@ -190,11 +182,7 @@ impl super::Handler for Handler {
             }
 
             next_page_token = message.next_page_token;
-            bars = Vec::with_capacity(database::bars::BATCH_FLUSH_SIZE);
-            last_times = symbols
-                .iter()
-                .map(|symbol| (symbol.clone(), None))
-                .collect::<HashMap<_, _>>();
+            bars.clear();
         }
     }
 
diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs
index 0bb4d16..4f7e23d 100644
--- a/src/threads/data/backfill/news.rs
+++ b/src/threads/data/backfill/news.rs
@@ -69,23 +69,20 @@ impl super::Handler for Handler {
         sleep(run_delay).await;
     }
 
-    #[allow(clippy::too_many_lines)]
     async fn backfill(&self, jobs: HashMap) {
         if jobs.is_empty() {
             return;
         }
 
         let symbols = jobs.keys().cloned().collect::<Vec<_>>();
+        let symbols_set = symbols.iter().collect::<HashSet<_>>();
         let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap();
         let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
 
         info!("Backfilling news for {:?}.", symbols);
 
         let mut news = Vec::with_capacity(database::news::BATCH_FLUSH_SIZE);
-        let mut last_times = symbols
-            .iter()
-            .map(|symbol| (symbol.clone(), None))
-            .collect::<HashMap<_, _>>();
+        let mut last_times = HashMap::new();
         let mut next_page_token = None;
 
         loop {
@@ -115,8 +112,8 @@ impl super::Handler for Handler {
                 let news_item = News::from(news_item);
 
                 for symbol in &news_item.symbols {
-                    if last_times.contains_key(symbol) {
-                        last_times.insert(symbol.clone(), Some(news_item.time_created));
+                    if symbols_set.contains(symbol) {
+                        last_times.insert(symbol.clone(), news_item.time_created);
                     }
                 }
 
@@ -165,13 +162,8 @@ impl super::Handler for Handler {
                     .unwrap();
 
                 let backfilled = last_times
-                    .into_iter()
-                    .filter_map(|(symbol, time)| {
-                        if let Some(time) = time {
-                            return Some(Backfill { symbol, time });
-                        }
-                        None
-                    })
+                    .drain()
+                    .map(|(symbol, time)| Backfill { symbol, time })
                    .collect::<Vec<_>>();
 
                 database::backfills_news::upsert_batch(
@@ -187,11 +179,7 @@ impl super::Handler for Handler {
             }
 
             next_page_token = message.next_page_token;
-            news = Vec::with_capacity(database::news::BATCH_FLUSH_SIZE);
-            last_times = symbols
-                .iter()
-                .map(|symbol| (symbol.clone(), None))
-                .collect::<HashMap<_, _>>();
+            news.clear();
         }
     }
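
Note on the pattern: both files now reuse their batch buffers across pages instead of rebuilding them. Vec::clear() and HashMap::drain() empty a collection while keeping its allocated memory, so each early save no longer allocates a fresh Vec and HashMap per page. The standalone sketch below illustrates this buffer-reuse pattern under simplified assumptions; the names (flush_batch, the page data) are hypothetical and not taken from this repository.

    use std::collections::HashMap;

    const BATCH_FLUSH_SIZE: usize = 4;

    // Hypothetical stand-in for the database flush; just reports what it received.
    fn flush_batch(items: &[i64], markers: &[(String, i64)]) {
        println!("flushing {} items, {} backfill markers", items.len(), markers.len());
    }

    fn main() {
        // Allocate once, outside the paging loop.
        let mut items: Vec<i64> = Vec::with_capacity(BATCH_FLUSH_SIZE);
        let mut last_times: HashMap<String, i64> = HashMap::new();

        // Simulated pages of (symbol, value) records.
        let pages = vec![
            vec![("AAPL".to_string(), 1), ("MSFT".to_string(), 2)],
            vec![("AAPL".to_string(), 3), ("GOOG".to_string(), 4)],
        ];

        for page in pages {
            for (symbol, value) in page {
                // Only symbols actually seen in this page end up in the map.
                last_times.insert(symbol, value);
                items.push(value);
            }

            // drain() empties the map but keeps its capacity for the next page.
            let markers: Vec<(String, i64)> = last_times.drain().collect();
            flush_batch(&items, &markers);

            // clear() keeps the Vec's capacity, so no reallocation on the next page.
            items.clear();
        }
    }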