diff --git a/Cargo.lock b/Cargo.lock index 23619dc..55e34aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1098,6 +1098,15 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" +[[package]] +name = "nonempty" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "303e8749c804ccd6ca3b428de7fe0d86cb86bc7606bc15291f100fd487960bb8" +dependencies = [ + "serde", +] + [[package]] name = "nonzero_ext" version = "0.3.0" @@ -1316,6 +1325,7 @@ dependencies = [ "lazy_static", "log", "log4rs", + "nonempty", "regex", "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index 50faff8..0d64967 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,3 +68,6 @@ regex = "1.10.3" async-trait = "0.1.77" itertools = "0.12.1" lazy_static = "1.4.0" +nonempty = { version = "0.10.0", features = [ + "serialize", +] } diff --git a/src/init.rs b/src/init.rs index 524d8c7..359fc35 100644 --- a/src/init.rs +++ b/src/init.rs @@ -44,21 +44,25 @@ pub async fn rehydrate_orders(config: &Arc) { let mut orders = vec![]; let mut after = OffsetDateTime::UNIX_EPOCH; - while let Some(message) = alpaca::orders::get( - &config.alpaca_client, - &config.alpaca_rate_limiter, - &types::alpaca::api::outgoing::order::Order { - status: Some(types::alpaca::api::outgoing::order::Status::All), - after: Some(after), - ..Default::default() - }, - None, - &ALPACA_API_BASE, - ) - .await - .ok() - .filter(|message| !message.is_empty()) - { + loop { + let message = alpaca::orders::get( + &config.alpaca_client, + &config.alpaca_rate_limiter, + &types::alpaca::api::outgoing::order::Order { + status: Some(types::alpaca::api::outgoing::order::Status::All), + after: Some(after), + ..Default::default() + }, + None, + &ALPACA_API_BASE, + ) + .await + .unwrap(); + + if message.is_empty() { + break; + } + orders.extend(message); after = orders.last().unwrap().submitted_at; } diff --git a/src/lib/alpaca/account.rs b/src/lib/alpaca/account.rs index 6d36936..92a5588 100644 --- a/src/lib/alpaca/account.rs +++ b/src/lib/alpaca/account.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::incoming::account::Account; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -22,7 +22,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::() .await .map_err(error_to_backoff) diff --git a/src/lib/alpaca/assets.rs b/src/lib/alpaca/assets.rs index 8310ee3..3de0fbf 100644 --- a/src/lib/alpaca/assets.rs +++ b/src/lib/alpaca/assets.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::{ incoming::asset::{Asset, Class}, outgoing, @@ -29,7 +29,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::>() .await .map_err(error_to_backoff) @@ -65,7 +65,7 @@ pub async fn get_by_symbol( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::() .await .map_err(error_to_backoff) @@ -88,6 +88,10 @@ pub async fn get_by_symbols( backoff: Option, api_base: &str, ) -> Result, Error> { + if symbols.is_empty() { + return Ok(vec![]); + } + if symbols.len() == 1 { let asset = get_by_symbol(client, rate_limiter, &symbols[0], backoff, api_base).await?; return Ok(vec![asset]); diff --git a/src/lib/alpaca/bars.rs b/src/lib/alpaca/bars.rs index b5f72e9..48d49ab 100644 --- a/src/lib/alpaca/bars.rs +++ b/src/lib/alpaca/bars.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::{incoming::bar::Bar, outgoing}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -33,7 +33,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::() .await .map_err(error_to_backoff) diff --git a/src/lib/alpaca/calendar.rs b/src/lib/alpaca/calendar.rs index 789b219..0b29395 100644 --- a/src/lib/alpaca/calendar.rs +++ b/src/lib/alpaca/calendar.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::{incoming::calendar::Calendar, outgoing}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -24,7 +24,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::>() .await .map_err(error_to_backoff) diff --git a/src/lib/alpaca/clock.rs b/src/lib/alpaca/clock.rs index 2423c43..925f59c 100644 --- a/src/lib/alpaca/clock.rs +++ b/src/lib/alpaca/clock.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::incoming::clock::Clock; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -22,7 +22,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::() .await .map_err(error_to_backoff) diff --git a/src/lib/alpaca/mod.rs b/src/lib/alpaca/mod.rs index 65477a9..26d431e 100644 --- a/src/lib/alpaca/mod.rs +++ b/src/lib/alpaca/mod.rs @@ -9,18 +9,13 @@ pub mod positions; use reqwest::StatusCode; -pub fn status_error_to_backoff(err: reqwest::Error) -> backoff::Error { - match err.status() { - Some(StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND) | None => { - backoff::Error::Permanent(err) - } - _ => err.into(), - } -} - pub fn error_to_backoff(err: reqwest::Error) -> backoff::Error { if err.is_status() { - return status_error_to_backoff(err); + return match err.status() { + Some(StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND) + | None => backoff::Error::Permanent(err), + _ => err.into(), + }; } if err.is_builder() || err.is_request() || err.is_redirect() || err.is_decode() || err.is_body() diff --git a/src/lib/alpaca/news.rs b/src/lib/alpaca/news.rs index be1812b..86a3ed2 100644 --- a/src/lib/alpaca/news.rs +++ b/src/lib/alpaca/news.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::{incoming::news::News, outgoing, ALPACA_NEWS_DATA_API_URL}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -32,7 +32,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::() .await .map_err(error_to_backoff) diff --git a/src/lib/alpaca/orders.rs b/src/lib/alpaca/orders.rs index 04eccc3..37437b1 100644 --- a/src/lib/alpaca/orders.rs +++ b/src/lib/alpaca/orders.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::{api::outgoing, shared::order}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -26,7 +26,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::>() .await .map_err(error_to_backoff) diff --git a/src/lib/alpaca/positions.rs b/src/lib/alpaca/positions.rs index 6b1da18..143826c 100644 --- a/src/lib/alpaca/positions.rs +++ b/src/lib/alpaca/positions.rs @@ -1,4 +1,4 @@ -use super::{error_to_backoff, status_error_to_backoff}; +use super::error_to_backoff; use crate::types::alpaca::api::incoming::position::Position; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -22,7 +22,7 @@ pub async fn get( .await .map_err(error_to_backoff)? .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::>() .await .map_err(error_to_backoff) @@ -64,7 +64,7 @@ pub async fn get_by_symbol( response .error_for_status() - .map_err(status_error_to_backoff)? + .map_err(error_to_backoff)? .json::() .await .map_err(error_to_backoff) @@ -88,6 +88,10 @@ pub async fn get_by_symbols( backoff: Option, api_base: &str, ) -> Result, reqwest::Error> { + if symbols.is_empty() { + return Ok(vec![]); + } + if symbols.len() == 1 { let position = get_by_symbol(client, rate_limiter, &symbols[0], backoff, api_base).await?; return Ok(position.into_iter().collect()); diff --git a/src/lib/database/calendar.rs b/src/lib/database/calendar.rs index 7001be1..ff6331e 100644 --- a/src/lib/database/calendar.rs +++ b/src/lib/database/calendar.rs @@ -6,14 +6,14 @@ use tokio::{sync::Semaphore, try_join}; optimize!("calendar"); -pub async fn upsert_batch_and_delete<'a, T>( +pub async fn upsert_batch_and_delete<'a, I>( client: &Client, concurrency_limiter: &Arc, - records: T, + records: I, ) -> Result<(), Error> where - T: IntoIterator + Send + Sync + Clone, - T::IntoIter: Send, + I: IntoIterator + Send + Sync + Clone, + I::IntoIter: Send, { let upsert_future = async { let mut insert = client.insert("calendar")?; diff --git a/src/lib/database/mod.rs b/src/lib/database/mod.rs index c4816fd..247de1d 100644 --- a/src/lib/database/mod.rs +++ b/src/lib/database/mod.rs @@ -92,14 +92,14 @@ macro_rules! upsert { #[macro_export] macro_rules! upsert_batch { ($record:ty, $table_name:expr) => { - pub async fn upsert_batch<'a, T>( + pub async fn upsert_batch<'a, I>( client: &clickhouse::Client, concurrency_limiter: &std::sync::Arc, - records: T, + records: I, ) -> Result<(), clickhouse::error::Error> where - T: IntoIterator + Send + Sync, - T::IntoIter: Send, + I: IntoIterator + Send + Sync, + I::IntoIter: Send, { let _ = concurrency_limiter.acquire().await.unwrap(); let mut insert = client.insert($table_name)?; diff --git a/src/lib/types/alpaca/websocket/data/outgoing/subscribe.rs b/src/lib/types/alpaca/websocket/data/outgoing/subscribe.rs index e942d4b..85981f9 100644 --- a/src/lib/types/alpaca/websocket/data/outgoing/subscribe.rs +++ b/src/lib/types/alpaca/websocket/data/outgoing/subscribe.rs @@ -1,4 +1,5 @@ use crate::utils::ser; +use nonempty::NonEmpty; use serde::Serialize; #[derive(Serialize)] @@ -6,14 +7,14 @@ use serde::Serialize; pub enum Market { #[serde(rename_all = "camelCase")] UsEquity { - bars: Vec, - updated_bars: Vec, - statuses: Vec, + bars: NonEmpty, + updated_bars: NonEmpty, + statuses: NonEmpty, }, #[serde(rename_all = "camelCase")] Crypto { - bars: Vec, - updated_bars: Vec, + bars: NonEmpty, + updated_bars: NonEmpty, }, } @@ -23,12 +24,12 @@ pub enum Message { Market(Market), News { #[serde(serialize_with = "ser::remove_slash_from_symbols")] - news: Vec, + news: NonEmpty, }, } impl Message { - pub fn new_market_us_equity(symbols: Vec) -> Self { + pub fn new_market_us_equity(symbols: NonEmpty) -> Self { Self::Market(Market::UsEquity { bars: symbols.clone(), updated_bars: symbols.clone(), @@ -36,14 +37,14 @@ impl Message { }) } - pub fn new_market_crypto(symbols: Vec) -> Self { + pub fn new_market_crypto(symbols: NonEmpty) -> Self { Self::Market(Market::Crypto { bars: symbols.clone(), updated_bars: symbols, }) } - pub fn new_news(symbols: Vec) -> Self { + pub fn new_news(symbols: NonEmpty) -> Self { Self::News { news: symbols } } } diff --git a/src/lib/utils/ser.rs b/src/lib/utils/ser.rs index 42053d9..09a3499 100644 --- a/src/lib/utils/ser.rs +++ b/src/lib/utils/ser.rs @@ -58,12 +58,13 @@ where } } -pub fn remove_slash_from_symbols(pairs: &[String], serializer: S) -> Result +pub fn remove_slash_from_symbols<'a, S, I>(pairs: I, serializer: S) -> Result where S: Serializer, + I: IntoIterator, { let symbols = pairs - .iter() + .into_iter() .map(|pair| remove_slash(pair)) .collect::>(); diff --git a/src/main.rs b/src/main.rs index 3dba781..8012228 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use config::{ use dotenv::dotenv; use log::info; use log4rs::config::Deserializers; +use nonempty::NonEmpty; use qrust::{create_send_await, database}; use tokio::{join, spawn, sync::mpsc, try_join}; @@ -101,12 +102,14 @@ async fn main() { spawn(threads::clock::run(config.clone(), clock_sender)); - create_send_await!( - data_sender, - threads::data::Message::new, - threads::data::Action::Enable, - assets - ); + if let Some(assets) = NonEmpty::from_vec(assets) { + create_send_await!( + data_sender, + threads::data::Message::new, + threads::data::Action::Enable, + assets + ); + } routes::run(config, data_sender).await; } diff --git a/src/routes/assets.rs b/src/routes/assets.rs index 64b1aa2..113afdd 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -4,6 +4,7 @@ use crate::{ }; use axum::{extract::Path, Extension, Json}; use http::StatusCode; +use nonempty::{nonempty, NonEmpty}; use qrust::{ alpaca, types::{self, Asset}, @@ -113,15 +114,17 @@ pub async fn add( }, ); - create_send_await!( - data_sender, - threads::data::Message::new, - threads::data::Action::Add, - assets.clone() - ); + if let Some(assets) = NonEmpty::from_vec(assets.clone()) { + create_send_await!( + data_sender, + threads::data::Message::new, + threads::data::Action::Add, + assets + ); + } Ok(( - StatusCode::CREATED, + StatusCode::OK, Json(AddAssetsResponse { added: assets.into_iter().map(|asset| asset.0).collect(), skipped, @@ -173,7 +176,7 @@ pub async fn add_symbol( data_sender, threads::data::Message::new, threads::data::Action::Add, - vec![(asset.symbol, asset.class.into())] + nonempty![(asset.symbol, asset.class.into())] ); Ok(StatusCode::CREATED) @@ -197,7 +200,7 @@ pub async fn delete( data_sender, threads::data::Message::new, threads::data::Action::Remove, - vec![(asset.symbol, asset.class)] + nonempty![(asset.symbol, asset.class)] ); Ok(StatusCode::NO_CONTENT) diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs index 1f03994..c5afc92 100644 --- a/src/threads/data/backfill/bars.rs +++ b/src/threads/data/backfill/bars.rs @@ -6,6 +6,7 @@ use crate::{ }; use async_trait::async_trait; use log::{error, info}; +use nonempty::NonEmpty; use qrust::{ alpaca, types::{ @@ -98,27 +99,27 @@ impl super::Handler for Handler { .await } - async fn queue_backfill(&self, jobs: &HashMap) { - if jobs.is_empty() || *ALPACA_SOURCE == Source::Sip { + async fn queue_backfill(&self, jobs: &NonEmpty) { + if *ALPACA_SOURCE == Source::Sip { return; } - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to; let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); - let symbols = jobs.keys().collect::>(); + let symbols = jobs.iter().map(|job| &job.symbol).collect::>(); info!("Queing bar backfill for {:?} in {:?}.", symbols, run_delay); sleep(run_delay).await; } - async fn backfill(&self, jobs: HashMap) { - if jobs.is_empty() { - return; - } - - let symbols = jobs.keys().cloned().collect::>(); - let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + async fn backfill(&self, jobs: NonEmpty) { + let symbols = Vec::from(jobs.clone().map(|job| job.symbol)); + let fetch_from = jobs.minimum_by_key(|job| job.fetch_from).fetch_from; + let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to; + let freshness = jobs + .into_iter() + .map(|job| (job.symbol, job.fresh)) + .collect::>(); let mut bars = Vec::with_capacity(*CLICKHOUSE_BATCH_BARS_SIZE); let mut last_times = HashMap::new(); @@ -173,7 +174,7 @@ impl super::Handler for Handler { let backfilled = last_times .drain() .map(|(symbol, time)| Backfill { - fresh: jobs[&symbol].fresh, + fresh: freshness[&symbol], symbol, time, }) diff --git a/src/threads/data/backfill/mod.rs b/src/threads/data/backfill/mod.rs index 3f599f7..b684173 100644 --- a/src/threads/data/backfill/mod.rs +++ b/src/threads/data/backfill/mod.rs @@ -4,6 +4,7 @@ pub mod news; use async_trait::async_trait; use itertools::Itertools; use log::{info, warn}; +use nonempty::{nonempty, NonEmpty}; use qrust::{ types::Backfill, utils::{last_minute, ONE_SECOND}, @@ -25,12 +26,12 @@ pub enum Action { pub struct Message { pub action: Action, - pub symbols: Vec, + pub symbols: NonEmpty, pub response: oneshot::Sender<()>, } impl Message { - pub fn new(action: Action, symbols: Vec) -> (Self, oneshot::Receiver<()>) { + pub fn new(action: Action, symbols: NonEmpty) -> (Self, oneshot::Receiver<()>) { let (sender, receiver) = oneshot::channel::<()>(); ( Self { @@ -45,6 +46,7 @@ impl Message { #[derive(Clone)] pub struct Job { + pub symbol: String, pub fetch_from: OffsetDateTime, pub fetch_to: OffsetDateTime, pub fresh: bool, @@ -58,8 +60,8 @@ pub trait Handler: Send + Sync { ) -> Result, clickhouse::error::Error>; async fn delete_backfills(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>; async fn delete_data(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>; - async fn queue_backfill(&self, jobs: &HashMap); - async fn backfill(&self, jobs: HashMap); + async fn queue_backfill(&self, jobs: &NonEmpty); + async fn backfill(&self, jobs: NonEmpty); fn max_limit(&self) -> i64; fn log_string(&self) -> &'static str; } @@ -108,7 +110,7 @@ pub async fn run(handler: Arc>, mut receiver: mpsc::Receiver>, mut receiver: mpsc::Receiver>, backfill_jobs: Arc>, message: Message, ) { let backfill_jobs_clone = backfill_jobs.clone(); let mut backfill_jobs = backfill_jobs.lock().await; + let symbols = Vec::from(message.symbols); match message.action { Action::Backfill => { @@ -130,16 +133,16 @@ async fn handle_backfill_message( let max_limit = handler.max_limit(); let backfills = handler - .select_latest_backfills(&message.symbols) + .select_latest_backfills(&symbols) .await .unwrap() .into_iter() .map(|backfill| (backfill.symbol.clone(), backfill)) .collect::>(); - let mut jobs = Vec::with_capacity(message.symbols.len()); + let mut jobs = Vec::with_capacity(symbols.len()); - for symbol in message.symbols { + for symbol in symbols { if backfill_jobs.contains_key(&symbol) { warn!( "Backfill for {} {} is already running, skipping.", @@ -148,11 +151,11 @@ async fn handle_backfill_message( continue; } - let fetch_from = backfills - .get(&symbol) - .map_or(OffsetDateTime::UNIX_EPOCH, |backfill| { - backfill.time + ONE_SECOND - }); + let backfill = backfills.get(&symbol); + + let fetch_from = backfill.map_or(OffsetDateTime::UNIX_EPOCH, |backfill| { + backfill.time + ONE_SECOND + }); let fetch_to = last_minute(); @@ -161,50 +164,42 @@ async fn handle_backfill_message( return; } - let fresh = backfills - .get(&symbol) - .map_or(false, |backfill| backfill.fresh); + let fresh = backfill.map_or(false, |backfill| backfill.fresh); - jobs.push(( + jobs.push(Job { symbol, - Job { - fetch_from, - fetch_to, - fresh, - }, - )); - } - - if jobs.is_empty() { - return; + fetch_from, + fetch_to, + fresh, + }); } let jobs = jobs .into_iter() - .sorted_unstable_by_key(|job| job.1.fetch_from) + .sorted_unstable_by_key(|job| job.fetch_from) .collect::>(); - let mut job_groups = vec![HashMap::new()]; + let mut job_groups: Vec> = vec![]; let mut current_minutes = 0; for job in jobs { - let minutes = (job.1.fetch_to - job.1.fetch_from).whole_minutes(); + let minutes = (job.fetch_to - job.fetch_from).whole_minutes(); - if job_groups.last().unwrap().is_empty() || (current_minutes + minutes) <= max_limit - { + if job_groups.last().is_some() && current_minutes + minutes <= max_limit { let job_group = job_groups.last_mut().unwrap(); - job_group.insert(job.0, job.1); + job_group.push(job); current_minutes += minutes; } else { - let mut job_group = HashMap::new(); - job_group.insert(job.0, job.1); - job_groups.push(job_group); + job_groups.push(nonempty![job]); current_minutes = minutes; } } for job_group in job_groups { - let symbols = job_group.keys().cloned().collect::>(); + let symbols = job_group + .iter() + .map(|job| job.symbol.clone()) + .collect::>(); let handler = handler.clone(); let symbols_clone = symbols.clone(); @@ -220,7 +215,7 @@ async fn handle_backfill_message( } } Action::Purge => { - for symbol in &message.symbols { + for symbol in &symbols { if let Some(job) = backfill_jobs.remove(symbol) { job.abort(); let _ = job.await; @@ -228,8 +223,8 @@ async fn handle_backfill_message( } try_join!( - handler.delete_backfills(&message.symbols), - handler.delete_data(&message.symbols) + handler.delete_backfills(&symbols), + handler.delete_data(&symbols) ) .unwrap(); } diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs index cd07b5d..4661a64 100644 --- a/src/threads/data/backfill/news.rs +++ b/src/threads/data/backfill/news.rs @@ -5,6 +5,7 @@ use crate::{ }; use async_trait::async_trait; use log::{error, info}; +use nonempty::NonEmpty; use qrust::{ alpaca, types::{ @@ -56,14 +57,14 @@ impl super::Handler for Handler { .await } - async fn queue_backfill(&self, jobs: &HashMap) { - if jobs.is_empty() || *ALPACA_SOURCE == Source::Sip { + async fn queue_backfill(&self, jobs: &NonEmpty) { + if *ALPACA_SOURCE == Source::Sip { return; } - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to; let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); - let symbols = jobs.keys().cloned().collect::>(); + let symbols = jobs.iter().map(|job| &job.symbol).collect::>(); info!("Queing news backfill for {:?} in {:?}.", symbols, run_delay); sleep(run_delay).await; @@ -71,15 +72,15 @@ impl super::Handler for Handler { #[allow(clippy::too_many_lines)] #[allow(clippy::iter_with_drain)] - async fn backfill(&self, jobs: HashMap) { - if jobs.is_empty() { - return; - } - - let symbols = jobs.keys().cloned().collect::>(); + async fn backfill(&self, jobs: NonEmpty) { + let symbols = Vec::from(jobs.clone().map(|job| job.symbol)); let symbols_set = symbols.clone().into_iter().collect::>(); - let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + let fetch_from = jobs.minimum_by_key(|job| job.fetch_from).fetch_from; + let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to; + let freshness = jobs + .into_iter() + .map(|job| (job.symbol, job.fresh)) + .collect::>(); let mut news = Vec::with_capacity(*CLICKHOUSE_BATCH_NEWS_SIZE); let mut last_times = HashMap::new(); @@ -137,7 +138,7 @@ impl super::Handler for Handler { let backfilled = last_times .drain() .map(|(symbol, time)| Backfill { - fresh: jobs[&symbol].fresh, + fresh: freshness[&symbol], symbol, time, }) diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 7ff0587..2fbc46f 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -8,6 +8,7 @@ use crate::{ }; use itertools::{Either, Itertools}; use log::error; +use nonempty::NonEmpty; use qrust::{ alpaca, types::{ @@ -35,12 +36,12 @@ pub enum Action { pub struct Message { pub action: Action, - pub assets: Vec<(String, Class)>, + pub assets: NonEmpty<(String, Class)>, pub response: oneshot::Sender<()>, } impl Message { - pub fn new(action: Action, assets: Vec<(String, Class)>) -> (Self, oneshot::Receiver<()>) { + pub fn new(action: Action, assets: NonEmpty<(String, Class)>) -> (Self, oneshot::Receiver<()>) { let (sender, receiver) = oneshot::channel(); ( Self { @@ -150,11 +151,6 @@ async fn handle_message( news_backfill_sender: mpsc::Sender, message: Message, ) { - if message.assets.is_empty() { - message.response.send(()).unwrap(); - return; - } - let (us_equity_symbols, crypto_symbols): (Vec<_>, Vec<_>) = message .assets .clone() @@ -164,36 +160,28 @@ async fn handle_message( Class::Crypto => Either::Right(asset.0), }); - let symbols = message - .assets - .into_iter() - .map(|(symbol, _)| symbol) - .collect::>(); + let symbols = message.assets.map(|(symbol, _)| symbol); let bars_us_equity_future = async { - if us_equity_symbols.is_empty() { - return; + if let Some(us_equity_symbols) = NonEmpty::from_vec(us_equity_symbols.clone()) { + create_send_await!( + bars_us_equity_websocket_sender, + websocket::Message::new, + message.action.into(), + us_equity_symbols + ); } - - create_send_await!( - bars_us_equity_websocket_sender, - websocket::Message::new, - message.action.into(), - us_equity_symbols.clone() - ); }; let bars_crypto_future = async { - if crypto_symbols.is_empty() { - return; + if let Some(crypto_symbols) = NonEmpty::from_vec(crypto_symbols.clone()) { + create_send_await!( + bars_crypto_websocket_sender, + websocket::Message::new, + message.action.into(), + crypto_symbols + ); } - - create_send_await!( - bars_crypto_websocket_sender, - websocket::Message::new, - message.action.into(), - crypto_symbols.clone() - ); }; let news_future = async { @@ -207,8 +195,15 @@ async fn handle_message( join!(bars_us_equity_future, bars_crypto_future, news_future); + if message.action == Action::Disable { + message.response.send(()).unwrap(); + return; + } + match message.action { - Action::Add => { + Action::Add | Action::Enable => { + let symbols = Vec::from(symbols.clone()); + let assets = async { alpaca::assets::get_by_symbols( &config.alpaca_client, @@ -264,51 +259,42 @@ async fn handle_message( database::assets::delete_where_symbols( &config.clickhouse_client, &config.clickhouse_concurrency_limiter, - &symbols, + &Vec::from(symbols.clone()), ) .await .unwrap(); } - _ => {} - } - - if message.action == Action::Disable { - message.response.send(()).unwrap(); - return; + Action::Disable => unreachable!(), } let bars_us_equity_future = async { - if us_equity_symbols.is_empty() { - return; + if let Some(us_equity_symbols) = NonEmpty::from_vec(us_equity_symbols) { + create_send_await!( + bars_us_equity_backfill_sender, + backfill::Message::new, + match message.action { + Action::Add | Action::Enable => backfill::Action::Backfill, + Action::Remove => backfill::Action::Purge, + Action::Disable => unreachable!(), + }, + us_equity_symbols + ); } - - create_send_await!( - bars_us_equity_backfill_sender, - backfill::Message::new, - match message.action { - Action::Add | Action::Enable => backfill::Action::Backfill, - Action::Remove => backfill::Action::Purge, - Action::Disable => unreachable!(), - }, - us_equity_symbols - ); }; let bars_crypto_future = async { - if crypto_symbols.is_empty() { - return; + if let Some(crypto_symbols) = NonEmpty::from_vec(crypto_symbols) { + create_send_await!( + bars_crypto_backfill_sender, + backfill::Message::new, + match message.action { + Action::Add | Action::Enable => backfill::Action::Backfill, + Action::Remove => backfill::Action::Purge, + Action::Disable => unreachable!(), + }, + crypto_symbols + ); } - - create_send_await!( - bars_crypto_backfill_sender, - backfill::Message::new, - match message.action { - Action::Add | Action::Enable => backfill::Action::Backfill, - Action::Remove => backfill::Action::Purge, - Action::Disable => unreachable!(), - }, - crypto_symbols - ); }; let news_future = async { @@ -363,30 +349,36 @@ async fn handle_clock_message( .collect::>(); let bars_us_equity_future = async { - create_send_await!( - bars_us_equity_backfill_sender, - backfill::Message::new, - backfill::Action::Backfill, - us_equity_symbols - ); + if let Some(us_equity_symbols) = NonEmpty::from_vec(us_equity_symbols) { + create_send_await!( + bars_us_equity_backfill_sender, + backfill::Message::new, + backfill::Action::Backfill, + us_equity_symbols + ); + } }; let bars_crypto_future = async { - create_send_await!( - bars_crypto_backfill_sender, - backfill::Message::new, - backfill::Action::Backfill, - crypto_symbols - ); + if let Some(crypto_symbols) = NonEmpty::from_vec(crypto_symbols) { + create_send_await!( + bars_crypto_backfill_sender, + backfill::Message::new, + backfill::Action::Backfill, + crypto_symbols + ); + } }; let news_future = async { - create_send_await!( - news_backfill_sender, - backfill::Message::new, - backfill::Action::Backfill, - symbols - ); + if let Some(symbols) = NonEmpty::from_vec(symbols) { + create_send_await!( + news_backfill_sender, + backfill::Message::new, + backfill::Action::Backfill, + symbols + ); + } }; join!(bars_us_equity_future, bars_crypto_future, news_future); diff --git a/src/threads/data/websocket/bars.rs b/src/threads/data/websocket/bars.rs index 166c1f4..255c6b8 100644 --- a/src/threads/data/websocket/bars.rs +++ b/src/threads/data/websocket/bars.rs @@ -7,6 +7,7 @@ use crate::{ use async_trait::async_trait; use clickhouse::inserter::Inserter; use log::{debug, error, info}; +use nonempty::NonEmpty; use qrust::{ types::{alpaca::websocket, Bar, Class}, utils::ONE_SECOND, @@ -21,14 +22,14 @@ pub struct Handler { pub config: Arc, pub inserter: Arc>>, pub subscription_message_constructor: - fn(Vec) -> websocket::data::outgoing::subscribe::Message, + fn(NonEmpty) -> websocket::data::outgoing::subscribe::Message, } #[async_trait] impl super::Handler for Handler { fn create_subscription_message( &self, - symbols: Vec, + symbols: NonEmpty, ) -> websocket::data::outgoing::subscribe::Message { (self.subscription_message_constructor)(symbols) } diff --git a/src/threads/data/websocket/mod.rs b/src/threads/data/websocket/mod.rs index b9818ad..18a2615 100644 --- a/src/threads/data/websocket/mod.rs +++ b/src/threads/data/websocket/mod.rs @@ -7,6 +7,7 @@ use backoff::{future::retry_notify, ExponentialBackoff}; use clickhouse::{inserter::Inserter, Row}; use futures_util::{future::join_all, SinkExt, StreamExt}; use log::error; +use nonempty::NonEmpty; use qrust::types::alpaca::{self, websocket}; use serde::Serialize; use serde_json::{from_str, to_string}; @@ -38,12 +39,12 @@ impl From for Option { pub struct Message { pub action: Option, - pub symbols: Vec, + pub symbols: NonEmpty, pub response: oneshot::Sender<()>, } impl Message { - pub fn new(action: Option, symbols: Vec) -> (Self, oneshot::Receiver<()>) { + pub fn new(action: Option, symbols: NonEmpty) -> (Self, oneshot::Receiver<()>) { let (sender, receiver) = oneshot::channel(); ( Self { @@ -66,7 +67,7 @@ pub struct State { pub trait Handler: Send + Sync + 'static { fn create_subscription_message( &self, - symbols: Vec, + symbols: NonEmpty, ) -> websocket::data::outgoing::subscribe::Message; async fn handle_websocket_message( &self, @@ -206,7 +207,7 @@ async fn run_connection( drop(state); - if !pending_subscriptions.is_empty() { + if let Some(pending_subscriptions) = NonEmpty::from_vec(pending_subscriptions) { if let Err(err) = sink .send(tungstenite::Message::Text( to_string(&websocket::data::outgoing::Message::Subscribe( @@ -277,11 +278,6 @@ async fn handle_message( sink_sender: mpsc::Sender, message: Message, ) { - if message.symbols.is_empty() { - message.response.send(()).unwrap(); - return; - } - match message.action { Some(Action::Subscribe) => { let (pending_subscriptions, receivers) = message diff --git a/src/threads/data/websocket/news.rs b/src/threads/data/websocket/news.rs index 2b36300..dff8762 100644 --- a/src/threads/data/websocket/news.rs +++ b/src/threads/data/websocket/news.rs @@ -3,6 +3,7 @@ use crate::config::{Config, CLICKHOUSE_BATCH_NEWS_SIZE}; use async_trait::async_trait; use clickhouse::inserter::Inserter; use log::{debug, error, info}; +use nonempty::NonEmpty; use qrust::{ types::{alpaca::websocket, News}, utils::ONE_SECOND, @@ -19,7 +20,7 @@ pub struct Handler { impl super::Handler for Handler { fn create_subscription_message( &self, - symbols: Vec, + symbols: NonEmpty, ) -> websocket::data::outgoing::subscribe::Message { websocket::data::outgoing::subscribe::Message::new_news(symbols) }