diff --git a/src/lib/types/alpaca/api/incoming/bar.rs b/src/lib/types/alpaca/api/incoming/bar.rs index e77afdf..ece3265 100644 --- a/src/lib/types/alpaca/api/incoming/bar.rs +++ b/src/lib/types/alpaca/api/incoming/bar.rs @@ -7,7 +7,6 @@ use serde::Deserialize; use std::{collections::HashMap, time::Duration}; use time::OffsetDateTime; - #[derive(Deserialize)] pub struct Bar { #[serde(rename = "t")] diff --git a/src/lib/types/alpaca/mod.rs b/src/lib/types/alpaca/mod.rs index eaf5691..e4e0157 100644 --- a/src/lib/types/alpaca/mod.rs +++ b/src/lib/types/alpaca/mod.rs @@ -1,4 +1,3 @@ pub mod api; pub mod shared; pub mod websocket; - diff --git a/src/routes/assets.rs b/src/routes/assets.rs index f24170b..4b2e057 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -87,8 +87,9 @@ pub async fn add( .map(|asset| (asset.symbol.clone(), asset)) .collect::>(); + let num_symbols = request.symbols.len(); let (assets, skipped, failed) = request.symbols.into_iter().fold( - (vec![], vec![], vec![]), + (Vec::with_capacity(num_symbols), vec![], vec![]), |(mut assets, mut skipped, mut failed), symbol| { if database_symbols.contains(&symbol) { skipped.push(symbol); diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs index 0767f20..61ec584 100644 --- a/src/threads/data/backfill/bars.rs +++ b/src/threads/data/backfill/bars.rs @@ -4,7 +4,6 @@ use crate::{ database, }; use async_trait::async_trait; -use itertools::{Either, Itertools}; use log::{error, info}; use qrust::{ types::{ @@ -116,7 +115,7 @@ impl super::Handler for Handler { info!("Backfilling bars for {:?}.", symbols); - let mut bars = vec![]; + let mut bars = Vec::with_capacity(database::bars::BATCH_FLUSH_SIZE); let mut last_times = symbols .iter() .map(|symbol| (symbol.clone(), None)) @@ -163,44 +162,39 @@ impl super::Handler for Handler { ) .await .unwrap(); - bars = vec![]; - } - if message.next_page_token.is_none() { - break; - } - next_page_token = message.next_page_token; - } + let backfilled = last_times + .into_iter() + .filter_map(|(symbol, time)| { + if let Some(time) = time { + return Some(Backfill { symbol, time }); + } + None + }) + .collect::>(); - let (backfilled, skipped): (Vec<_>, Vec<_>) = - last_times.into_iter().partition_map(|(symbol, time)| { - if let Some(time) = time { - Either::Left(Backfill { symbol, time }) - } else { - Either::Right(symbol) + database::backfills_bars::upsert_batch( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &backfilled, + ) + .await + .unwrap(); + + if message.next_page_token.is_none() { + break; } - }); - database::backfills_bars::upsert_batch( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &backfilled, - ) - .await - .unwrap(); - - let backfilled = backfilled - .into_iter() - .map(|backfill| backfill.symbol) - .collect::>(); - - if !skipped.is_empty() { - info!("No bars to backfill for {:?}.", skipped); + next_page_token = message.next_page_token; + bars = Vec::with_capacity(database::bars::BATCH_FLUSH_SIZE); + last_times = symbols + .iter() + .map(|symbol| (symbol.clone(), None)) + .collect::>(); + } } - if !backfilled.is_empty() { - info!("Backfilled bars for {:?}.", backfilled); - } + info!("Backfilled bars for {:?}.", symbols); } fn max_limit(&self) -> i64 { diff --git a/src/threads/data/backfill/mod.rs b/src/threads/data/backfill/mod.rs index ff32276..473e23f 100644 --- a/src/threads/data/backfill/mod.rs +++ b/src/threads/data/backfill/mod.rs @@ -13,7 +13,7 @@ use qrust::{ }, utils::{last_minute, ONE_SECOND}, }; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, hash::Hash, sync::Arc}; use time::OffsetDateTime; use tokio::{ spawn, @@ -82,10 +82,8 @@ impl Jobs { self.uuid_to_job.insert(uuid, fut); } - pub fn get(&self, symbol: &str) -> Option<&JoinHandle<()>> { - self.symbol_to_uuid - .get(symbol) - .and_then(|uuid| self.uuid_to_job.get(uuid)) + pub fn contains_key(&self, symbol: &str) -> bool { + self.symbol_to_uuid.contains_key(symbol) } pub fn remove(&mut self, symbol: &str) -> Option> { @@ -93,6 +91,17 @@ impl Jobs { .remove(symbol) .and_then(|uuid| self.uuid_to_job.remove(&uuid)) } + + pub fn remove_many(&mut self, symbols: &[T]) + where + T: AsRef + Hash + Eq, + { + for symbol in symbols { + self.symbol_to_uuid + .remove(symbol.as_ref()) + .and_then(|uuid| self.uuid_to_job.remove(&uuid)); + } + } } pub async fn run(handler: Arc>, mut receiver: mpsc::Receiver) { @@ -116,6 +125,7 @@ async fn handle_backfill_message( backfill_jobs: Arc>, message: Message, ) { + let backfill_jobs_clone = backfill_jobs.clone(); let mut backfill_jobs = backfill_jobs.lock().await; match message.action { @@ -131,17 +141,15 @@ async fn handle_backfill_message( .map(|backfill| (backfill.symbol.clone(), backfill)) .collect::>(); - let mut jobs = vec![]; + let mut jobs = Vec::with_capacity(message.symbols.len()); for symbol in message.symbols { - if let Some(job) = backfill_jobs.get(&symbol) { - if !job.is_finished() { - warn!( - "Backfill for {} {} is already running, skipping.", - symbol, log_string - ); - continue; - } + if backfill_jobs.contains_key(&symbol) { + warn!( + "Backfill for {} {} is already running, skipping.", + symbol, log_string + ); + continue; } let fetch_from = backfills @@ -168,7 +176,7 @@ async fn handle_backfill_message( let jobs = jobs .into_iter() - .sorted_by_key(|job| job.1.fetch_from) + .sorted_unstable_by_key(|job| job.1.fetch_from) .collect::>(); let mut job_groups = vec![HashMap::new()]; @@ -194,9 +202,13 @@ async fn handle_backfill_message( let symbols = job_group.keys().cloned().collect::>(); let handler = handler.clone(); + let symbols_clone = symbols.clone(); + let backfill_jobs_clone = backfill_jobs_clone.clone(); + let fut = spawn(async move { handler.queue_backfill(&job_group).await; handler.backfill(job_group).await; + backfill_jobs_clone.lock().await.remove_many(&symbols_clone); }); backfill_jobs.insert(symbols, fut); @@ -205,9 +217,7 @@ async fn handle_backfill_message( Action::Purge => { for symbol in &message.symbols { if let Some(job) = backfill_jobs.remove(symbol) { - if !job.is_finished() { - job.abort(); - } + job.abort(); let _ = job.await; } } diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs index e57e654..ffc2486 100644 --- a/src/threads/data/backfill/news.rs +++ b/src/threads/data/backfill/news.rs @@ -5,7 +5,6 @@ use crate::{ }; use async_trait::async_trait; use futures_util::future::join_all; -use itertools::{Either, Itertools}; use log::{error, info}; use qrust::{ types::{ @@ -78,7 +77,7 @@ impl super::Handler for Handler { info!("Backfilling news for {:?}.", symbols); - let mut news = vec![]; + let mut news = Vec::with_capacity(database::news::BATCH_FLUSH_SIZE); let mut last_times = symbols .iter() .map(|symbol| (symbol.clone(), None)) @@ -160,44 +159,39 @@ impl super::Handler for Handler { ) .await .unwrap(); - news = vec![]; - } - if message.next_page_token.is_none() { - break; - } - next_page_token = message.next_page_token; - } + let backfilled = last_times + .into_iter() + .filter_map(|(symbol, time)| { + if let Some(time) = time { + return Some(Backfill { symbol, time }); + } + None + }) + .collect::>(); - let (backfilled, skipped): (Vec<_>, Vec<_>) = - last_times.into_iter().partition_map(|(symbol, time)| { - if let Some(time) = time { - Either::Left(Backfill { symbol, time }) - } else { - Either::Right(symbol) + database::backfills_news::upsert_batch( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &backfilled, + ) + .await + .unwrap(); + + if message.next_page_token.is_none() { + break; } - }); - database::backfills_news::upsert_batch( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &backfilled, - ) - .await - .unwrap(); - - let backfilled = backfilled - .into_iter() - .map(|backfill| backfill.symbol) - .collect::>(); - - if !skipped.is_empty() { - info!("No news to backfill for {:?}.", skipped); + next_page_token = message.next_page_token; + news = Vec::with_capacity(database::news::BATCH_FLUSH_SIZE); + last_times = symbols + .iter() + .map(|symbol| (symbol.clone(), None)) + .collect::>(); + } } - if !backfilled.is_empty() { - info!("Backfilled news for {:?}.", backfilled); - } + info!("Backfilled news for {:?}.", symbols); } fn max_limit(&self) -> i64 { diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 15fc7f7..1b99bda 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -245,7 +245,7 @@ async fn handle_message( let (mut assets, mut positions) = join!(assets, positions); - let mut batch = vec![]; + let mut batch = Vec::with_capacity(symbols.len()); for symbol in &symbols { if let Some(asset) = assets.remove(symbol) {