Foldify for loops
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
@@ -178,26 +178,25 @@ async fn handle_message(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let jobs = jobs
|
let mut current_minutes = 0;
|
||||||
|
let job_groups = jobs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.sorted_unstable_by_key(|job| job.fetch_from)
|
.sorted_unstable_by_key(|job| job.fetch_from)
|
||||||
.collect::<Vec<_>>();
|
.fold(Vec::<NonEmpty<Job>>::new(), |mut job_groups, job| {
|
||||||
|
let minutes = (job.fetch_to - job.fetch_from).whole_minutes();
|
||||||
|
|
||||||
let mut job_groups: Vec<NonEmpty<Job>> = vec![];
|
if let Some(job_group) = job_groups.last_mut() {
|
||||||
let mut current_minutes = 0;
|
if current_minutes + minutes <= max_limit {
|
||||||
|
job_group.push(job);
|
||||||
|
current_minutes += minutes;
|
||||||
|
return job_groups;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for job in jobs {
|
|
||||||
let minutes = (job.fetch_to - job.fetch_from).whole_minutes();
|
|
||||||
|
|
||||||
if job_groups.last().is_some() && current_minutes + minutes <= max_limit {
|
|
||||||
let job_group = job_groups.last_mut().unwrap();
|
|
||||||
job_group.push(job);
|
|
||||||
current_minutes += minutes;
|
|
||||||
} else {
|
|
||||||
job_groups.push(nonempty![job]);
|
job_groups.push(nonempty![job]);
|
||||||
current_minutes = minutes;
|
current_minutes = minutes;
|
||||||
}
|
job_groups
|
||||||
}
|
});
|
||||||
|
|
||||||
for job_group in job_groups {
|
for job_group in job_groups {
|
||||||
let symbols = job_group
|
let symbols = job_group
|
||||||
|
@@ -236,16 +236,18 @@ async fn handle_message(
|
|||||||
|
|
||||||
let (mut assets, mut positions) = join!(assets, positions);
|
let (mut assets, mut positions) = join!(assets, positions);
|
||||||
|
|
||||||
let mut batch = Vec::with_capacity(symbols.len());
|
let batch =
|
||||||
|
symbols
|
||||||
for symbol in &symbols {
|
.iter()
|
||||||
if let Some(asset) = assets.remove(symbol) {
|
.fold(Vec::with_capacity(symbols.len()), |mut batch, symbol| {
|
||||||
let position = positions.remove(symbol);
|
if let Some(asset) = assets.remove(symbol) {
|
||||||
batch.push(Asset::from((asset, position)));
|
let position = positions.remove(symbol);
|
||||||
} else {
|
batch.push(Asset::from((asset, position)));
|
||||||
error!("Failed to find asset for symbol: {}.", symbol);
|
} else {
|
||||||
}
|
error!("Failed to find asset for symbol: {}.", symbol);
|
||||||
}
|
}
|
||||||
|
batch
|
||||||
|
});
|
||||||
|
|
||||||
database::assets::upsert_batch(
|
database::assets::upsert_batch(
|
||||||
&config.clickhouse_client,
|
&config.clickhouse_client,
|
||||||
|
Reference in New Issue
Block a user