diff --git a/src/data/clock.rs b/src/data/clock.rs new file mode 100644 index 0000000..ac504e1 --- /dev/null +++ b/src/data/clock.rs @@ -0,0 +1,49 @@ +use crate::{ + config::{Config, ALPACA_CLOCK_API_URL}, + types::{ + alpaca, + state::{self, BroadcastMessage}, + }, + utils::duration_until, +}; +use backoff::{future::retry, ExponentialBackoff}; +use log::info; +use std::sync::Arc; +use tokio::{sync::broadcast::Sender, time::sleep}; + +pub async fn run(app_config: Arc, broadcast_bus_sender: Sender) { + loop { + let clock = retry(ExponentialBackoff::default(), || async { + app_config.alpaca_rate_limit.until_ready().await; + app_config + .alpaca_client + .get(ALPACA_CLOCK_API_URL) + .send() + .await? + .error_for_status()? + .json::() + .await + .map_err(backoff::Error::Permanent) + }) + .await + .unwrap(); + + let sleep_until = duration_until(if clock.is_open { + info!("Market is open, will close at {}.", clock.next_close); + clock.next_close + } else { + info!("Market is closed, will reopen at {}.", clock.next_open); + clock.next_open + }); + + sleep(sleep_until).await; + + broadcast_bus_sender + .send(BroadcastMessage::Clock(if clock.is_open { + state::clock::BroadcastMessage::Open + } else { + state::clock::BroadcastMessage::Close + })) + .unwrap(); + } +} diff --git a/src/data/market.rs b/src/data/market.rs index 15445d2..1420d50 100644 --- a/src/data/market.rs +++ b/src/data/market.rs @@ -1,13 +1,10 @@ use crate::{ - config::{ - Config, ALPACA_CLOCK_API_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL, - }, + config::{Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL}, data::authenticate_websocket, database, - state::{self, BroadcastMessage}, types::{ alpaca::{api, websocket, Source}, - Asset, Backfill, Bar, Class, + state, Asset, Backfill, Bar, BroadcastMessage, Class, }, utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, }; @@ -26,10 +23,7 @@ use time::OffsetDateTime; use tokio::{ net::TcpStream, spawn, - sync::{ - broadcast::{Receiver, Sender}, - Mutex, RwLock, - }, + sync::{broadcast::Sender, Mutex, RwLock}, task::JoinHandle, time::sleep, }; @@ -47,7 +41,7 @@ pub async fn run( class: Class, broadcast_bus_sender: Sender, ) { - info!("Running live data threads for {:?}.", class); + info!("Running live threads for {:?}.", class); let websocket_url = match class { Class::UsEquity => format!( @@ -73,7 +67,7 @@ pub async fn run( app_config.clone(), class, sink.clone(), - broadcast_bus_sender.subscribe(), + broadcast_bus_sender.clone(), guard.clone(), )); @@ -85,12 +79,6 @@ pub async fn run( guard.clone(), )); - spawn(clock_handler( - app_config.clone(), - class, - broadcast_bus_sender.clone(), - )); - let assets = database::assets::select_where_class(&app_config.clickhouse_client, &class).await; broadcast_bus_sender .send(BroadcastMessage::Asset(( @@ -100,184 +88,159 @@ pub async fn run( .unwrap(); } -#[allow(clippy::too_many_lines)] -#[allow(clippy::significant_drop_tightening)] pub async fn broadcast_bus_handler( app_config: Arc, class: Class, sink: Arc>, Message>>>, - mut broadcast_bus_receiver: Receiver, + broadcast_bus_sender: Sender, guard: Arc>, ) { + let mut broadcast_bus_receiver = broadcast_bus_sender.subscribe(); + loop { - match broadcast_bus_receiver.recv().await.unwrap() { - BroadcastMessage::Asset((action, mut assets)) => { - assets.retain(|asset| asset.class == class); + let app_config = app_config.clone(); + let sink = sink.clone(); + let broadcast_bus_sender = broadcast_bus_sender.clone(); + let guard = guard.clone(); + let message = broadcast_bus_receiver.recv().await.unwrap(); - if assets.is_empty() { - continue; - } - - let symbols = assets - .iter() - .map(|asset| asset.symbol.clone()) - .collect::>(); - - match action { - state::asset::BroadcastMessage::Add => { - database::assets::upsert_batch(&app_config.clickhouse_client, &assets) - .await; - - info!("Added {:?}.", symbols); - - let mut guard = guard.write().await; - - guard.pending_subscriptions.extend( - assets - .into_iter() - .map(|asset| (asset.symbol.clone(), asset)), - ); - - guard.symbols.extend(symbols.clone()); - - sink.lock() - .await - .send(Message::Text( - to_string(&websocket::data::outgoing::Message::Subscribe( - websocket::data::outgoing::subscribe::Message::new(symbols), - )) - .unwrap(), - )) - .await - .unwrap(); - } - state::asset::BroadcastMessage::Delete => { - database::assets::delete_where_symbols( - &app_config.clickhouse_client, - &symbols, - ) - .await; - - info!("Deleted {:?}.", symbols); - - let mut guard = guard.write().await; - - guard.pending_unsubscriptions.extend( - assets - .into_iter() - .map(|asset| (asset.symbol.clone(), asset)), - ); - - guard.symbols.retain(|symbol| !symbols.contains(symbol)); - - sink.lock() - .await - .send(Message::Text( - to_string(&websocket::data::outgoing::Message::Unsubscribe( - websocket::data::outgoing::subscribe::Message::new(symbols), - )) - .unwrap(), - )) - .await - .unwrap(); - } - state::asset::BroadcastMessage::Backfill => { - let guard_clone = guard.clone(); - let mut guard = guard.write().await; - - info!("Creating backfill jobs for {:?}.", symbols); - - for asset in assets { - if let Some(backfill_job) = guard.backfill_jobs.remove(&asset.symbol) { - backfill_job.abort(); - backfill_job.await.unwrap_err(); - } - - guard.backfill_jobs.insert(asset.symbol.clone(), { - let guard = guard_clone.clone(); - let app_config = app_config.clone(); - - spawn(async move { - backfill(app_config, class, asset.clone()).await; - - let mut guard = guard.write().await; - guard.backfill_jobs.remove(&asset.symbol); - }) - }); - } - } - state::asset::BroadcastMessage::Purge => { - let mut guard = guard.write().await; - - info!("Purging {:?}.", symbols); - - for asset in assets { - if let Some(backfill_job) = guard.backfill_jobs.remove(&asset.symbol) { - backfill_job.abort(); - backfill_job.await.unwrap_err(); - } - } - - database::backfills::delete_where_symbols( - &app_config.clickhouse_client, - &symbols, - ) - .await; - - database::bars::delete_where_symbols( - &app_config.clickhouse_client, - &symbols, - ) - .await; - } - } - } - } + spawn(broadcast_bus_handle_message( + app_config, + class, + sink, + broadcast_bus_sender, + guard, + message, + )); } } -pub async fn clock_handler( +#[allow(clippy::significant_drop_tightening)] +#[allow(clippy::too_many_lines)] +async fn broadcast_bus_handle_message( app_config: Arc, class: Class, + sink: Arc>, Message>>>, broadcast_bus_sender: Sender, + guard: Arc>, + message: BroadcastMessage, ) { - loop { - let clock = retry(ExponentialBackoff::default(), || async { - app_config.alpaca_rate_limit.until_ready().await; - app_config - .alpaca_client - .get(ALPACA_CLOCK_API_URL) - .send() - .await? - .error_for_status()? - .json::() - .await - .map_err(backoff::Error::Permanent) - }) - .await - .unwrap(); - - let sleep_until = duration_until(if clock.is_open { - if class == Class::UsEquity { - info!("Market is open, will close at {}.", clock.next_close); + match message { + BroadcastMessage::Asset((action, mut assets)) => { + assets.retain(|asset| asset.class == class); + if assets.is_empty() { + return; } - clock.next_close - } else { - if class == Class::UsEquity { - info!("Market is closed, will reopen at {}.", clock.next_open); + + let assets = assets + .into_iter() + .map(|asset| (asset.symbol.clone(), asset)) + .collect::>(); + + let symbols = assets.keys().cloned().collect::>(); + + match action { + state::asset::BroadcastMessage::Add => { + database::assets::upsert_batch( + &app_config.clickhouse_client, + assets.clone().into_values(), + ) + .await; + + let mut guard = guard.write().await; + guard.symbols.extend(symbols.clone()); + guard.pending_subscriptions.extend(assets); + + info!("Added {:?}.", symbols); + + sink.lock() + .await + .send(Message::Text( + to_string(&websocket::data::outgoing::Message::Subscribe( + websocket::data::outgoing::subscribe::Message::new(symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + } + state::asset::BroadcastMessage::Delete => { + database::assets::delete_where_symbols(&app_config.clickhouse_client, &symbols) + .await; + + let mut guard = guard.write().await; + guard.symbols.retain(|symbol| !assets.contains_key(symbol)); + guard.pending_unsubscriptions.extend(assets); + + info!("Deleted {:?}.", symbols); + + sink.lock() + .await + .send(Message::Text( + to_string(&websocket::data::outgoing::Message::Unsubscribe( + websocket::data::outgoing::subscribe::Message::new(symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + } + state::asset::BroadcastMessage::Backfill => { + let guard_clone = guard.clone(); + let mut guard = guard.write().await; + + info!("Creating backfill jobs for {:?}.", symbols); + + for (symbol, asset) in assets { + if let Some(backfill_job) = guard.backfill_jobs.remove(&symbol) { + backfill_job.abort(); + backfill_job.await.unwrap_err(); + } + + guard.backfill_jobs.insert(symbol.clone(), { + let guard = guard_clone.clone(); + let app_config = app_config.clone(); + + spawn(async move { + backfill(app_config, class, asset.clone()).await; + + let mut guard = guard.write().await; + guard.backfill_jobs.remove(&symbol); + }) + }); + } + } + state::asset::BroadcastMessage::Purge => { + let mut guard = guard.write().await; + + info!("Purging {:?}.", symbols); + + for (symbol, _) in assets { + if let Some(backfill_job) = guard.backfill_jobs.remove(&symbol) { + backfill_job.abort(); + backfill_job.await.unwrap_err(); + } + } + + database::backfills::delete_where_symbols( + &app_config.clickhouse_client, + &symbols, + ) + .await; + + database::bars::delete_where_symbols(&app_config.clickhouse_client, &symbols) + .await; + } } - clock.next_open - }); - - sleep(sleep_until).await; - - let assets = database::assets::select(&app_config.clickhouse_client).await; - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Backfill, - assets, - ))) - .unwrap(); + } + BroadcastMessage::Clock(_) => { + broadcast_bus_sender + .send(BroadcastMessage::Asset(( + state::asset::BroadcastMessage::Backfill, + database::assets::select(&app_config.clickhouse_client).await, + ))) + .unwrap(); + } } } @@ -343,22 +306,18 @@ async fn websocket_handle_message( let newly_subscribed_assets = guard .pending_subscriptions .extract_if(|symbol, _| symbols.contains(symbol)) - .map(|(_, asset)| asset) - .collect::>(); + .collect::>(); if !newly_subscribed_assets.is_empty() { info!( "Subscribed to {:?}.", - newly_subscribed_assets - .iter() - .map(|asset| asset.symbol.clone()) - .collect::>() + newly_subscribed_assets.keys().collect::>() ); broadcast_bus_sender .send(BroadcastMessage::Asset(( state::asset::BroadcastMessage::Backfill, - newly_subscribed_assets, + newly_subscribed_assets.into_values().collect::>(), ))) .unwrap(); } @@ -366,22 +325,18 @@ async fn websocket_handle_message( let newly_unsubscribed_assets = guard .pending_unsubscriptions .extract_if(|symbol, _| !symbols.contains(symbol)) - .map(|(_, asset)| asset) - .collect::>(); + .collect::>(); if !newly_unsubscribed_assets.is_empty() { info!( "Unsubscribed from {:?}.", - newly_unsubscribed_assets - .iter() - .map(|asset| asset.symbol.clone()) - .collect::>() + newly_unsubscribed_assets.keys().collect::>() ); broadcast_bus_sender .send(BroadcastMessage::Asset(( state::asset::BroadcastMessage::Purge, - newly_unsubscribed_assets, + newly_unsubscribed_assets.into_values().collect::>(), ))) .unwrap(); } @@ -429,13 +384,13 @@ pub async fn backfill(app_config: Arc, class: Class, asset: Asset) { if app_config.alpaca_source == Source::Iex { let task_run_delay = duration_until(fetch_until + FIFTEEN_MINUTES + ONE_MINUTE); info!( - "Queing historical data backfill for {} in {:?}.", + "Queing backfill for {} in {:?}.", asset.symbol, task_run_delay ); sleep(task_run_delay).await; } - info!("Running historical data backfill for {}.", asset.symbol); + info!("Running backfill for {}.", asset.symbol); let mut bars = Vec::new(); let mut next_page_token = None; @@ -466,10 +421,7 @@ pub async fn backfill(app_config: Arc, class: Class, asset: Asset) { let message = match message { Ok(message) => message, Err(e) => { - error!( - "Failed to backfill historical data for {}: {}.", - asset.symbol, e - ); + error!("Failed to backfill data for {}: {}.", asset.symbol, e); return; } }; @@ -486,12 +438,12 @@ pub async fn backfill(app_config: Arc, class: Class, asset: Asset) { next_page_token = message.next_page_token; } - database::bars::upsert_batch(&app_config.clickhouse_client, &bars).await; + database::bars::upsert_batch(&app_config.clickhouse_client, bars).await; database::backfills::upsert( &app_config.clickhouse_client, &Backfill::new(asset.symbol.clone(), fetch_until), ) .await; - info!("Backfilled historical data for {}.", asset.symbol); + info!("Backfilled data for {}.", asset.symbol); } diff --git a/src/data/mod.rs b/src/data/mod.rs index 8764a5d..cacd796 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,3 +1,4 @@ +pub mod clock; pub mod market; use crate::{config::Config, types::alpaca::websocket}; diff --git a/src/database/assets.rs b/src/database/assets.rs index 1c4ce83..19343c2 100644 --- a/src/database/assets.rs +++ b/src/database/assets.rs @@ -1,5 +1,6 @@ use crate::types::{Asset, Class}; use clickhouse::Client; +use serde::Serialize; pub async fn select(clickhouse_client: &Client) -> Vec { clickhouse_client @@ -18,7 +19,10 @@ pub async fn select_where_class(clickhouse_client: &Client, class: &Class) -> Ve .unwrap() } -pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &str) -> Option { +pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &T) -> Option +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("SELECT ?fields FROM assets FINAL WHERE symbol = ?") .bind(symbol) @@ -27,15 +31,22 @@ pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &str) -> Op .unwrap() } -pub async fn upsert_batch(clickhouse_client: &Client, assets: &Vec) { +pub async fn upsert_batch(clickhouse_client: &Client, assets: T) +where + T: IntoIterator + Send + Sync, + T::IntoIter: Send, +{ let mut insert = clickhouse_client.insert("assets").unwrap(); for asset in assets { - insert.write(asset).await.unwrap(); + insert.write(&asset).await.unwrap(); } insert.end().await.unwrap(); } -pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &[T]) +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("DELETE FROM assets WHERE symbol IN ?") .bind(symbols) diff --git a/src/database/backfills.rs b/src/database/backfills.rs index c879e66..e1751e9 100644 --- a/src/database/backfills.rs +++ b/src/database/backfills.rs @@ -1,10 +1,14 @@ use crate::types::Backfill; use clickhouse::Client; +use serde::Serialize; -pub async fn select_latest_where_symbol( +pub async fn select_latest_where_symbol( clickhouse_client: &Client, - symbol: &str, -) -> Option { + symbol: &T, +) -> Option +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("SELECT ?fields FROM backfills FINAL WHERE symbol = ? ORDER BY time DESC LIMIT 1") .bind(symbol) @@ -19,7 +23,10 @@ pub async fn upsert(clickhouse_client: &Client, backfill: &Backfill) { insert.end().await.unwrap(); } -pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &[T]) +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("DELETE FROM backfills WHERE symbol IN ?") .bind(symbols) @@ -28,7 +35,10 @@ pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { +pub async fn delete_where_not_symbols(clickhouse_client: &Client, symbols: &[T]) +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("DELETE FROM backfills WHERE symbol NOT IN ?") .bind(symbols) diff --git a/src/database/bars.rs b/src/database/bars.rs index 5ccff43..c67185b 100644 --- a/src/database/bars.rs +++ b/src/database/bars.rs @@ -1,5 +1,6 @@ use crate::types::Bar; use clickhouse::Client; +use serde::Serialize; pub async fn upsert(clickhouse_client: &Client, bar: &Bar) { let mut insert = clickhouse_client.insert("bars").unwrap(); @@ -7,15 +8,22 @@ pub async fn upsert(clickhouse_client: &Client, bar: &Bar) { insert.end().await.unwrap(); } -pub async fn upsert_batch(clickhouse_client: &Client, bars: &[Bar]) { +pub async fn upsert_batch(clickhouse_client: &Client, bars: T) +where + T: IntoIterator + Send + Sync, + T::IntoIter: Send, +{ let mut insert = clickhouse_client.insert("bars").unwrap(); for bar in bars { - insert.write(bar).await.unwrap(); + insert.write(&bar).await.unwrap(); } insert.end().await.unwrap(); } -pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &[T]) +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("DELETE FROM bars WHERE symbol IN ?") .bind(symbols) @@ -24,7 +32,10 @@ pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { +pub async fn delete_where_not_symbols(clickhouse_client: &Client, symbols: &[T]) +where + T: AsRef + Serialize + Send + Sync, +{ clickhouse_client .query("DELETE FROM bars WHERE symbol NOT IN ?") .bind(symbols) diff --git a/src/main.rs b/src/main.rs index a07df9e..499d507 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,6 @@ mod config; mod data; mod database; mod routes; -mod state; mod types; mod utils; @@ -14,9 +13,8 @@ use crate::utils::cleanup; use config::Config; use dotenv::dotenv; use log4rs::config::Deserializers; -use state::BroadcastMessage; use tokio::{spawn, sync::broadcast}; -use types::Class; +use types::{BroadcastMessage, Class}; #[tokio::main] async fn main() { @@ -40,5 +38,7 @@ async fn main() { broadcast_bus.clone(), )); + spawn(data::clock::run(app_config.clone(), broadcast_bus.clone())); + routes::run(app_config, broadcast_bus).await; } diff --git a/src/routes/assets.rs b/src/routes/assets.rs index 728b54f..7f79198 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -1,9 +1,9 @@ use crate::{ config::{Config, ALPACA_ASSET_API_URL}, database, - state::{self, BroadcastMessage}, types::{ alpaca::api::incoming::{self, asset::Status}, + state::{self, BroadcastMessage}, Asset, }, }; diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 311180b..d8e4593 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,4 +1,4 @@ -use crate::{config::Config, state::BroadcastMessage}; +use crate::{config::Config, types::BroadcastMessage}; use axum::{ routing::{delete, get, post}, serve, Extension, Router, diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/types/alpaca/api/outgoing/bar.rs index 5617593..124ef57 100644 --- a/src/types/alpaca/api/outgoing/bar.rs +++ b/src/types/alpaca/api/outgoing/bar.rs @@ -1,6 +1,5 @@ -use std::time::Duration; - use serde::{Serialize, Serializer}; +use std::time::Duration; use time::OffsetDateTime; fn serialize_symbols(symbols: &[String], serializer: S) -> Result diff --git a/src/types/mod.rs b/src/types/mod.rs index 266d69f..45494c5 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -2,7 +2,9 @@ pub mod alpaca; pub mod asset; pub mod backfill; pub mod bar; +pub mod state; pub use asset::{Asset, Class, Exchange}; pub use backfill::Backfill; pub use bar::Bar; +pub use state::BroadcastMessage; diff --git a/src/state/asset.rs b/src/types/state/asset.rs similarity index 100% rename from src/state/asset.rs rename to src/types/state/asset.rs diff --git a/src/types/state/clock.rs b/src/types/state/clock.rs new file mode 100644 index 0000000..5433a8b --- /dev/null +++ b/src/types/state/clock.rs @@ -0,0 +1,5 @@ +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum BroadcastMessage { + Open, + Close, +} diff --git a/src/state/mod.rs b/src/types/state/mod.rs similarity index 75% rename from src/state/mod.rs rename to src/types/state/mod.rs index e027a0f..c1f6023 100644 --- a/src/state/mod.rs +++ b/src/types/state/mod.rs @@ -1,8 +1,10 @@ use crate::types::Asset; pub mod asset; +pub mod clock; #[derive(Clone, Debug, PartialEq, Eq)] pub enum BroadcastMessage { Asset((asset::BroadcastMessage, Vec)), + Clock(clock::BroadcastMessage), } diff --git a/src/utils/cleanup.rs b/src/utils/cleanup.rs index f30b040..3e75110 100644 --- a/src/utils/cleanup.rs +++ b/src/utils/cleanup.rs @@ -3,7 +3,6 @@ use clickhouse::Client; pub async fn cleanup(clickhouse_client: &Client) { let assets = database::assets::select(clickhouse_client).await; - let symbols = assets .iter() .map(|asset| asset.symbol.clone())