diff --git a/src/bin/qrust/threads/data/backfill/mod.rs b/src/bin/qrust/threads/data/backfill/mod.rs index 8627022..c2ae668 100644 --- a/src/bin/qrust/threads/data/backfill/mod.rs +++ b/src/bin/qrust/threads/data/backfill/mod.rs @@ -178,26 +178,25 @@ async fn handle_message( }); } - let jobs = jobs + let mut current_minutes = 0; + let job_groups = jobs .into_iter() .sorted_unstable_by_key(|job| job.fetch_from) - .collect::>(); + .fold(Vec::>::new(), |mut job_groups, job| { + let minutes = (job.fetch_to - job.fetch_from).whole_minutes(); - let mut job_groups: Vec> = vec![]; - let mut current_minutes = 0; + if let Some(job_group) = job_groups.last_mut() { + if current_minutes + minutes <= max_limit { + job_group.push(job); + current_minutes += minutes; + return job_groups; + } + } - for job in jobs { - let minutes = (job.fetch_to - job.fetch_from).whole_minutes(); - - if job_groups.last().is_some() && current_minutes + minutes <= max_limit { - let job_group = job_groups.last_mut().unwrap(); - job_group.push(job); - current_minutes += minutes; - } else { job_groups.push(nonempty![job]); current_minutes = minutes; - } - } + job_groups + }); for job_group in job_groups { let symbols = job_group diff --git a/src/bin/qrust/threads/data/mod.rs b/src/bin/qrust/threads/data/mod.rs index 2fbc46f..5814564 100644 --- a/src/bin/qrust/threads/data/mod.rs +++ b/src/bin/qrust/threads/data/mod.rs @@ -236,16 +236,18 @@ async fn handle_message( let (mut assets, mut positions) = join!(assets, positions); - let mut batch = Vec::with_capacity(symbols.len()); - - for symbol in &symbols { - if let Some(asset) = assets.remove(symbol) { - let position = positions.remove(symbol); - batch.push(Asset::from((asset, position))); - } else { - error!("Failed to find asset for symbol: {}.", symbol); - } - } + let batch = + symbols + .iter() + .fold(Vec::with_capacity(symbols.len()), |mut batch, symbol| { + if let Some(asset) = assets.remove(symbol) { + let position = positions.remove(symbol); + batch.push(Asset::from((asset, position))); + } else { + error!("Failed to find asset for symbol: {}.", symbol); + } + batch + }); database::assets::upsert_batch( &config.clickhouse_client,