diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index fc6aa7b..a64aa9b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -22,7 +22,7 @@ build: cache: <<: *global_cache script: - - cargo build --verbose + - cargo build test: image: registry.karaolidis.com/karaolidis/qrust/rust @@ -30,7 +30,7 @@ test: cache: <<: *global_cache script: - - cargo test --verbose + - cargo test lint: image: registry.karaolidis.com/karaolidis/qrust/rust @@ -38,6 +38,7 @@ lint: cache: <<: *global_cache script: + - cargo fmt --all -- --check - cargo clippy --all-targets --all-features depcheck: @@ -55,7 +56,7 @@ build-release: cache: <<: *global_cache script: - - cargo build --release --verbose + - cargo build --release after_script: - echo "JOB_ID=$CI_JOB_ID" >> job.env artifacts: diff --git a/Cargo.lock b/Cargo.lock index a85c95c..1a61c6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,9 +145,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "block-buffer" @@ -872,9 +872,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -1064,7 +1064,7 @@ version = "0.10.62" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cde4d2d9200ad5909f8dac647e29482e07c3a35de8a13fce7c9c7747ad9f671" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "cfg-if", "foreign-types", "libc", @@ -1174,9 +1174,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "powerfmt" @@ -1344,7 +1344,7 @@ version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", diff --git a/src/data/market.rs b/src/data/market.rs index 5769000..4c15323 100644 --- a/src/data/market.rs +++ b/src/data/market.rs @@ -5,10 +5,10 @@ use crate::{ }, data::authenticate_websocket, database, + state::{self, BroadcastMessage}, types::{ alpaca::{api, websocket, Source}, - asset::{self, Asset}, - Bar, BarValidity, BroadcastMessage, Class, + Asset, Backfill, Bar, Class, }, utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, }; @@ -18,11 +18,12 @@ use futures_util::{ SinkExt, StreamExt, }; use log::{error, info, warn}; -use serde_json::from_str; +use serde_json::{from_str, to_string}; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; +use time::OffsetDateTime; use tokio::{ net::TcpStream, spawn, @@ -30,14 +31,22 @@ use tokio::{ broadcast::{Receiver, Sender}, RwLock, }, + task::JoinHandle, time::sleep, }; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +pub struct Guard { + symbols: HashSet, + backfill_jobs: HashMap>, + pending_subscriptions: HashMap, + pending_unsubscriptions: HashMap, +} + pub async fn run( app_config: Arc, class: Class, - broadcast_sender: Sender, + broadcast_bus_sender: Sender, ) { info!("Running live data threads for {:?}.", class); @@ -54,208 +63,204 @@ pub async fn run( authenticate_websocket(&app_config, &mut stream, &mut sink).await; let sink = Arc::new(RwLock::new(sink)); - spawn(broadcast_handler( + let guard = Arc::new(RwLock::new(Guard { + symbols: HashSet::new(), + backfill_jobs: HashMap::new(), + pending_subscriptions: HashMap::new(), + pending_unsubscriptions: HashMap::new(), + })); + + spawn(broadcast_bus_handler( + app_config.clone(), class, sink.clone(), - broadcast_sender.subscribe(), + broadcast_bus_sender.subscribe(), + guard.clone(), )); - let assets = database::assets::select_where_class(&app_config.clickhouse_client, class).await; - broadcast_sender + spawn(websocket_handler( + app_config.clone(), + stream, + sink, + broadcast_bus_sender.clone(), + guard.clone(), + )); + + spawn(clock_handler( + app_config.clone(), + class, + broadcast_bus_sender.clone(), + )); + + let assets = database::assets::select_where_class(&app_config.clickhouse_client, &class).await; + broadcast_bus_sender .send(BroadcastMessage::Asset(( - asset::BroadcastMessage::Added, + state::asset::BroadcastMessage::Add, assets, ))) .unwrap(); - - websocket_handler(app_config, class, stream, sink).await; - - unreachable!() } -async fn broadcast_handler( +#[allow(clippy::too_many_lines)] +#[allow(clippy::significant_drop_tightening)] +pub async fn broadcast_bus_handler( + app_config: Arc, class: Class, sink: Arc>, Message>>>, - mut broadcast_receiver: Receiver, + mut broadcast_bus_receiver: Receiver, + guard: Arc>, ) { loop { - match broadcast_receiver.recv().await.unwrap() { + match broadcast_bus_receiver.recv().await.unwrap() { BroadcastMessage::Asset((action, assets)) => { - let symbols = assets + let assets = assets .into_iter() .filter(|asset| asset.class == class) - .map(|asset| asset.symbol) .collect::>(); - if symbols.is_empty() { + if assets.is_empty() { continue; } - sink.write() - .await - .send(Message::Text( - serde_json::to_string(&match action { - asset::BroadcastMessage::Added => { - websocket::data::outgoing::Message::Subscribe( - websocket::data::outgoing::subscribe::Message::new( - symbols.clone(), - ), - ) + let symbols = assets + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>(); + + match action { + state::asset::BroadcastMessage::Add => { + database::assets::upsert_batch(&app_config.clickhouse_client, &assets) + .await; + + info!("Added {:?}.", symbols); + + let mut guard = guard.write().await; + + guard.pending_subscriptions.extend( + assets + .into_iter() + .map(|asset| (asset.symbol.clone(), asset)), + ); + + guard.symbols.extend(symbols.clone()); + + sink.write() + .await + .send(Message::Text( + to_string(&websocket::data::outgoing::Message::Subscribe( + websocket::data::outgoing::subscribe::Message::new(symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + } + state::asset::BroadcastMessage::Delete => { + database::assets::delete_where_symbols( + &app_config.clickhouse_client, + &symbols, + ) + .await; + + info!("Deleted {:?}.", symbols); + + let mut guard = guard.write().await; + + guard.pending_unsubscriptions.extend( + assets + .into_iter() + .map(|asset| (asset.symbol.clone(), asset)), + ); + + guard.symbols = guard + .symbols + .clone() + .into_iter() + .filter(|symbol| !symbols.contains(symbol)) + .collect::>(); + + sink.write() + .await + .send(Message::Text( + to_string(&websocket::data::outgoing::Message::Unsubscribe( + websocket::data::outgoing::subscribe::Message::new(symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + } + state::asset::BroadcastMessage::Backfill => { + info!("Creating backfill jobs for {:?}.", symbols); + + let guard_clone = guard.clone(); + let mut guard = guard.write().await; + + for asset in assets { + let mut handles = Vec::new(); + if let Some(backfill_job) = guard.backfill_jobs.remove(&asset.symbol) { + backfill_job.abort(); + handles.push(backfill_job); } - asset::BroadcastMessage::Deleted => { - websocket::data::outgoing::Message::Unsubscribe( - websocket::data::outgoing::subscribe::Message::new( - symbols.clone(), - ), - ) + + for handle in handles { + handle.await.unwrap_err(); } - }) - .unwrap(), - )) - .await - .unwrap(); + + guard.backfill_jobs.insert(asset.symbol.clone(), { + let guard = guard_clone.clone(); + let app_config = app_config.clone(); + + spawn(async move { + backfill(app_config, class, asset.clone()).await; + + let mut guard = guard.write().await; + guard.backfill_jobs.remove(&asset.symbol); + }) + }); + } + } + state::asset::BroadcastMessage::Purge => { + let mut guard = guard.write().await; + + info!("Purging {:?}.", symbols); + + for asset in assets { + let mut handles = Vec::new(); + if let Some(backfill_job) = guard.backfill_jobs.remove(&asset.symbol) { + backfill_job.abort(); + handles.push(backfill_job); + } + + for handle in handles { + handle.await.unwrap_err(); + } + } + + database::backfills::delete_where_symbols( + &app_config.clickhouse_client, + &symbols, + ) + .await; + + database::bars::delete_where_symbols( + &app_config.clickhouse_client, + &symbols, + ) + .await; + } + } } } } } -async fn websocket_handler( +pub async fn clock_handler( app_config: Arc, class: Class, - mut stream: SplitStream>>, - sink: Arc>, Message>>>, + broadcast_bus_sender: Sender, ) { - let backfilled = Arc::new(RwLock::new(HashMap::new())); - loop { - match stream.next().await { - Some(Ok(Message::Text(data))) => { - let parsed_data = from_str::>(&data); - if let Err(e) = &parsed_data { - warn!( - "Unparsed websocket::data::incoming message: {:?}: {}", - data, e - ); - } - - for message in parsed_data.unwrap_or_default() { - spawn(websocket_handle_message( - app_config.clone(), - class, - backfilled.clone(), - message, - )); - } - } - Some(Ok(Message::Ping(_))) => sink - .write() - .await - .send(Message::Pong(vec![])) - .await - .unwrap(), - Some(unknown) => error!("Unknown websocket::data::incoming message: {:?}", unknown), - None => panic!(), - } - } -} - -async fn websocket_handle_message( - app_config: Arc, - class: Class, - backfilled: Arc>>, - message: websocket::data::incoming::Message, -) { - match message { - websocket::data::incoming::Message::Subscription(message) => { - let added_asset_symbols; - let deleted_asset_symbols; - - { - let mut backfilled = backfilled.write().await; - - let old_asset_sybols = backfilled.keys().cloned().collect::>(); - let new_asset_symbols = message.bars.into_iter().collect::>(); - - added_asset_symbols = new_asset_symbols - .difference(&old_asset_sybols) - .cloned() - .collect::>(); - - for asset_symbol in &added_asset_symbols { - backfilled.insert(asset_symbol.clone(), false); - } - - deleted_asset_symbols = old_asset_sybols - .difference(&new_asset_symbols) - .cloned() - .collect::>(); - - for asset_symbol in &deleted_asset_symbols { - backfilled.remove(asset_symbol); - } - - drop(backfilled); - - info!( - "Subscription update for {:?}: {:?} added, {:?} deleted.", - class, added_asset_symbols, deleted_asset_symbols - ); - } - - for asset_symbol in added_asset_symbols { - let asset = database::assets::select_where_symbol( - &app_config.clickhouse_client, - &asset_symbol, - ) - .await - .unwrap(); - - database::bars::insert_validity_if_not_exists( - &app_config.clickhouse_client, - &BarValidity::none(asset.symbol.clone()), - ) - .await; - - spawn(backfill(app_config.clone(), backfilled.clone(), asset)); - } - - for asset_symbol in deleted_asset_symbols { - database::bars::delete_validity_where_symbol( - &app_config.clickhouse_client, - &asset_symbol, - ) - .await; - - database::bars::delete_where_symbol(&app_config.clickhouse_client, &asset_symbol) - .await; - } - } - websocket::data::incoming::Message::Bars(bar_message) - | websocket::data::incoming::Message::UpdatedBars(bar_message) => { - let bar = Bar::from(bar_message); - info!("websocket::Incoming bar for {}: {}", bar.symbol, bar.time); - - database::bars::upsert(&app_config.clickhouse_client, &bar).await; - if *backfilled.read().await.get(&bar.symbol).unwrap() { - database::bars::upsert_validity(&app_config.clickhouse_client, &bar.into()).await; - } - } - websocket::data::incoming::Message::Success(_) => {} - } -} - -pub async fn backfill( - app_config: Arc, - backfilled: Arc>>, - asset: Asset, -) { - let bar_validity = - database::bars::select_validity_where_symbol(&app_config.clickhouse_client, &asset.symbol) - .await - .unwrap(); - - let fetch_from = bar_validity.time_last + ONE_MINUTE; - let fetch_until = if app_config.alpaca_source == Source::Iex { app_config.alpaca_rate_limit.until_ready().await; let clock = app_config .alpaca_client @@ -267,15 +272,177 @@ pub async fn backfill( .await .unwrap(); - if clock.is_open { - last_minute() + let sleep_until = duration_until(if clock.is_open { + if class == Class::UsEquity { + info!("Market is open, will close at {}.", clock.next_close); + } + clock.next_close } else { + if class == Class::UsEquity { + info!("Market is closed, will reopen at {}.", clock.next_open); + } clock.next_open + }); + + sleep(sleep_until).await; + + let assets = database::assets::select(&app_config.clickhouse_client).await; + broadcast_bus_sender + .send(BroadcastMessage::Asset(( + state::asset::BroadcastMessage::Backfill, + assets, + ))) + .unwrap(); + } +} + +async fn websocket_handler( + app_config: Arc, + mut stream: SplitStream>>, + sink: Arc>, Message>>>, + broadcast_bus_sender: Sender, + guard: Arc>, +) { + loop { + let app_config = app_config.clone(); + let sink = sink.clone(); + let broadcast_bus_sender = broadcast_bus_sender.clone(); + let guard = guard.clone(); + let message = stream.next().await; + + spawn(async move { + match message { + Some(Ok(Message::Text(data))) => { + let parsed_data = from_str::>(&data); + + if let Ok(messages) = parsed_data { + for message in messages { + websocket_handle_message( + app_config.clone(), + broadcast_bus_sender.clone(), + guard.clone(), + message, + ) + .await; + } + } else { + error!( + "Unparsed websocket::data::incoming message: {:?}: {}", + data, + parsed_data.err().unwrap() + ); + } + } + Some(Ok(Message::Ping(_))) => sink + .write() + .await + .send(Message::Pong(vec![])) + .await + .unwrap(), + Some(unknown) => error!("Unknown websocket::data::incoming message: {:?}", unknown), + _ => panic!(), + } + }); + } +} + +#[allow(clippy::significant_drop_tightening)] +async fn websocket_handle_message( + app_config: Arc, + broadcast_bus_sender: Sender, + guard: Arc>, + message: websocket::data::incoming::Message, +) { + match message { + websocket::data::incoming::Message::Subscription(message) => { + let symbols = message.bars.into_iter().collect::>(); + + let mut guard = guard.write().await; + + let newly_subscribed_assets = guard + .pending_subscriptions + .drain() + .filter(|(symbol, _)| symbols.contains(symbol)) + .map(|(_, asset)| asset) + .collect::>(); + + if !newly_subscribed_assets.is_empty() { + info!( + "Subscribed to {:?}.", + newly_subscribed_assets + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>() + ); + + broadcast_bus_sender + .send(BroadcastMessage::Asset(( + state::asset::BroadcastMessage::Backfill, + newly_subscribed_assets, + ))) + .unwrap(); + } + + let newly_unsubscribed_assets = guard + .pending_unsubscriptions + .drain() + .filter(|(symbol, _)| !symbols.contains(symbol)) + .map(|(_, asset)| asset) + .collect::>(); + + if !newly_unsubscribed_assets.is_empty() { + info!( + "Unsubscribed from {:?}.", + newly_unsubscribed_assets + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>() + ); + + broadcast_bus_sender + .send(BroadcastMessage::Asset(( + state::asset::BroadcastMessage::Purge, + newly_unsubscribed_assets, + ))) + .unwrap(); + } } + websocket::data::incoming::Message::Bars(bar_message) + | websocket::data::incoming::Message::UpdatedBars(bar_message) => { + let bar = Bar::from(bar_message); + + let guard = guard.read().await; + let symbol_status = guard.symbols.get(&bar.symbol); + + if symbol_status.is_none() { + warn!( + "Race condition: received bar for unsubscribed symbol: {:?}.", + bar.symbol + ); + return; + } + + info!("Received bar for {}: {}", bar.symbol, bar.time); + database::bars::upsert(&app_config.clickhouse_client, &bar).await; + } + websocket::data::incoming::Message::Success(_) => {} + } +} + +pub async fn backfill(app_config: Arc, class: Class, asset: Asset) { + let latest_backfill = database::backfills::select_latest_where_symbol( + &app_config.clickhouse_client, + &asset.symbol, + ) + .await; + + let fetch_from = if let Some(backfill) = latest_backfill { + backfill.time + ONE_MINUTE } else { - last_minute() + OffsetDateTime::UNIX_EPOCH }; + let fetch_until = last_minute(); if fetch_from > fetch_until { return; } @@ -289,7 +456,7 @@ pub async fn backfill( sleep(task_run_delay).await; } - info!("Running historical data backfill for {}...", asset.symbol); + info!("Running historical data backfill for {}.", asset.symbol); let mut bars = Vec::new(); let mut next_page_token = None; @@ -298,7 +465,7 @@ pub async fn backfill( app_config.alpaca_rate_limit.until_ready().await; let message = app_config .alpaca_client - .get(match asset.class { + .get(match class { Class::UsEquity => ALPACA_STOCK_DATA_URL, Class::Crypto => ALPACA_CRYPTO_DATA_URL, }) @@ -330,12 +497,11 @@ pub async fn backfill( } database::bars::upsert_batch(&app_config.clickhouse_client, &bars).await; - if let Some(last_bar) = bars.last() { - database::bars::upsert_validity(&app_config.clickhouse_client, &last_bar.clone().into()) - .await; - } - - backfilled.write().await.insert(asset.symbol.clone(), true); + database::backfills::upsert( + &app_config.clickhouse_client, + &Backfill::new(asset.symbol.clone(), fetch_until), + ) + .await; info!("Backfilled historical data for {}.", asset.symbol); } diff --git a/src/database/assets.rs b/src/database/assets.rs index 9d39b41..1c4ce83 100644 --- a/src/database/assets.rs +++ b/src/database/assets.rs @@ -3,15 +3,15 @@ use clickhouse::Client; pub async fn select(clickhouse_client: &Client) -> Vec { clickhouse_client - .query("SELECT ?fields FROM assets") + .query("SELECT ?fields FROM assets FINAL") .fetch_all::() .await .unwrap() } -pub async fn select_where_class(clickhouse_client: &Client, class: Class) -> Vec { +pub async fn select_where_class(clickhouse_client: &Client, class: &Class) -> Vec { clickhouse_client - .query("SELECT ?fields FROM assets WHERE class = ?") + .query("SELECT ?fields FROM assets FINAL WHERE class = ?") .bind(class) .fetch_all::() .await @@ -20,23 +20,25 @@ pub async fn select_where_class(clickhouse_client: &Client, class: Class) -> Vec pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &str) -> Option { clickhouse_client - .query("SELECT ?fields FROM assets WHERE symbol = ?") + .query("SELECT ?fields FROM assets FINAL WHERE symbol = ?") .bind(symbol) .fetch_optional::() .await .unwrap() } -pub async fn upsert(clickhouse_client: &Client, asset: &Asset) { +pub async fn upsert_batch(clickhouse_client: &Client, assets: &Vec) { let mut insert = clickhouse_client.insert("assets").unwrap(); - insert.write(asset).await.unwrap(); + for asset in assets { + insert.write(asset).await.unwrap(); + } insert.end().await.unwrap(); } -pub async fn delete_where_symbol(clickhouse_client: &Client, symbol: &str) { +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { clickhouse_client - .query("DELETE FROM assets WHERE symbol = ?") - .bind(symbol) + .query("DELETE FROM assets WHERE symbol IN ?") + .bind(symbols) .execute() .await .unwrap(); diff --git a/src/database/backfills.rs b/src/database/backfills.rs new file mode 100644 index 0000000..c879e66 --- /dev/null +++ b/src/database/backfills.rs @@ -0,0 +1,38 @@ +use crate::types::Backfill; +use clickhouse::Client; + +pub async fn select_latest_where_symbol( + clickhouse_client: &Client, + symbol: &str, +) -> Option { + clickhouse_client + .query("SELECT ?fields FROM backfills FINAL WHERE symbol = ? ORDER BY time DESC LIMIT 1") + .bind(symbol) + .fetch_optional::() + .await + .unwrap() +} + +pub async fn upsert(clickhouse_client: &Client, backfill: &Backfill) { + let mut insert = clickhouse_client.insert("backfills").unwrap(); + insert.write(backfill).await.unwrap(); + insert.end().await.unwrap(); +} + +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { + clickhouse_client + .query("DELETE FROM backfills WHERE symbol IN ?") + .bind(symbols) + .execute() + .await + .unwrap(); +} + +pub async fn delete_where_not_symbols(clickhouse_client: &Client, symbols: &Vec) { + clickhouse_client + .query("DELETE FROM backfills WHERE symbol NOT IN ?") + .bind(symbols) + .execute() + .await + .unwrap(); +} diff --git a/src/database/bars.rs b/src/database/bars.rs index fcf8866..5ccff43 100644 --- a/src/database/bars.rs +++ b/src/database/bars.rs @@ -1,4 +1,4 @@ -use crate::types::{Bar, BarValidity}; +use crate::types::Bar; use clickhouse::Client; pub async fn upsert(clickhouse_client: &Client, bar: &Bar) { @@ -15,46 +15,19 @@ pub async fn upsert_batch(clickhouse_client: &Client, bars: &[Bar]) { insert.end().await.unwrap(); } -pub async fn delete_where_symbol(clickhouse_client: &Client, symbol: &str) { +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &Vec) { clickhouse_client - .query("DELETE FROM bars WHERE symbol = ?") - .bind(symbol) + .query("DELETE FROM bars WHERE symbol IN ?") + .bind(symbols) .execute() .await .unwrap(); } -pub async fn select_validity_where_symbol( - clickhouse_client: &Client, - symbol: &str, -) -> Option { +pub async fn delete_where_not_symbols(clickhouse_client: &Client, symbols: &Vec) { clickhouse_client - .query("SELECT ?fields FROM bars_validity FINAL WHERE symbol = ?") - .bind(symbol) - .fetch_optional::() - .await - .unwrap() -} - -pub async fn upsert_validity(clickhouse_client: &Client, bar_validity: &BarValidity) { - let mut insert = clickhouse_client.insert("bars_validity").unwrap(); - insert.write(bar_validity).await.unwrap(); - insert.end().await.unwrap(); -} - -pub async fn insert_validity_if_not_exists(clickhouse_client: &Client, bar_validity: &BarValidity) { - if select_validity_where_symbol(clickhouse_client, &bar_validity.symbol) - .await - .is_none() - { - upsert_validity(clickhouse_client, bar_validity).await; - } -} - -pub async fn delete_validity_where_symbol(clickhouse_client: &Client, symbol: &str) { - clickhouse_client - .query("DELETE FROM bars_validity WHERE symbol = ?") - .bind(symbol) + .query("DELETE FROM bars WHERE symbol NOT IN ?") + .bind(symbols) .execute() .await .unwrap(); diff --git a/src/database/mod.rs b/src/database/mod.rs index 5ac2df4..4768255 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,2 +1,3 @@ pub mod assets; +pub mod backfills; pub mod bars; diff --git a/src/main.rs b/src/main.rs index 4771ec4..136df19 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,15 +5,18 @@ mod config; mod data; mod database; mod routes; +mod state; mod types; mod utils; +use crate::utils::cleanup; use config::Config; use dotenv::dotenv; use log4rs::config::Deserializers; +use state::BroadcastMessage; use std::error::Error; use tokio::{spawn, sync::broadcast}; -use types::{BroadcastMessage, Class}; +use types::Class; #[tokio::main] async fn main() -> Result<(), Box> { @@ -22,21 +25,23 @@ async fn main() -> Result<(), Box> { let app_config = Config::arc_from_env(); let mut threads = Vec::new(); - let (broadcast_sender, _) = broadcast::channel::(100); + cleanup(&app_config.clickhouse_client).await; + + let (broadcast_bus, _) = broadcast::channel::(100); threads.push(spawn(data::market::run( app_config.clone(), Class::UsEquity, - broadcast_sender.clone(), + broadcast_bus.clone(), ))); threads.push(spawn(data::market::run( app_config.clone(), Class::Crypto, - broadcast_sender.clone(), + broadcast_bus.clone(), ))); - threads.push(spawn(routes::run(app_config.clone(), broadcast_sender))); + threads.push(spawn(routes::run(app_config.clone(), broadcast_bus))); for thread in threads { thread.await?; diff --git a/src/routes/assets.rs b/src/routes/assets.rs index 657cb10..ee31814 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -1,12 +1,14 @@ -use crate::config::{Config, ALPACA_ASSET_API_URL}; -use crate::database; -use crate::types::{ - alpaca::api::incoming::{self, asset::Status}, - asset, Asset, BroadcastMessage, +use crate::{ + config::{Config, ALPACA_ASSET_API_URL}, + database, + state::{self, BroadcastMessage}, + types::{ + alpaca::api::incoming::{self, asset::Status}, + Asset, + }, }; use axum::{extract::Path, Extension, Json}; use http::StatusCode; -use log::info; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast::Sender; @@ -35,7 +37,7 @@ pub struct AddAssetRequest { pub async fn add( Extension(app_config): Extension>, - Extension(broadcast_sender): Extension>, + Extension(broadcast_bus_sender): Extension>, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { if database::assets::select_where_symbol(&app_config.clickhouse_client, &request.symbol) @@ -66,22 +68,19 @@ pub async fn add( } let asset = Asset::from(asset); - database::assets::upsert(&app_config.clickhouse_client, &asset).await; - - broadcast_sender + broadcast_bus_sender .send(BroadcastMessage::Asset(( - asset::BroadcastMessage::Added, + state::asset::BroadcastMessage::Add, vec![asset.clone()], ))) .unwrap(); - info!("Added asset {}.", asset.symbol); Ok((StatusCode::CREATED, Json(asset))) } pub async fn delete( Extension(app_config): Extension>, - Extension(broadcast_sender): Extension>, + Extension(broadcast_bus_sender): Extension>, Path(symbol): Path, ) -> Result { let asset = database::assets::select_where_symbol(&app_config.clickhouse_client, &symbol) @@ -89,15 +88,12 @@ pub async fn delete( .ok_or(StatusCode::NOT_FOUND) .unwrap(); - broadcast_sender + broadcast_bus_sender .send(BroadcastMessage::Asset(( - asset::BroadcastMessage::Deleted, - vec![asset.clone()], + state::asset::BroadcastMessage::Delete, + vec![asset], ))) .unwrap(); - database::assets::delete_where_symbol(&app_config.clickhouse_client, &symbol).await; - - info!("Deleted asset {}.", symbol); Ok(StatusCode::NO_CONTENT) } diff --git a/src/routes/mod.rs b/src/routes/mod.rs index c53e4c1..34f6b65 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,4 +1,4 @@ -use crate::{config::Config, types::BroadcastMessage}; +use crate::{config::Config, state::BroadcastMessage}; use axum::{ routing::{delete, get, post}, serve, Extension, Router, diff --git a/src/state/asset.rs b/src/state/asset.rs new file mode 100644 index 0000000..5bf5370 --- /dev/null +++ b/src/state/asset.rs @@ -0,0 +1,7 @@ +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum BroadcastMessage { + Add, + Backfill, + Delete, + Purge, +} diff --git a/src/state/mod.rs b/src/state/mod.rs new file mode 100644 index 0000000..e027a0f --- /dev/null +++ b/src/state/mod.rs @@ -0,0 +1,8 @@ +use crate::types::Asset; + +pub mod asset; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum BroadcastMessage { + Asset((asset::BroadcastMessage, Vec)), +} diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index 810d0e8..03cfe04 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -1,5 +1,3 @@ -#![allow(clippy::struct_excessive_bools)] - use crate::types::alpaca::api::impl_from_enum; use serde::{Deserialize, Serialize}; @@ -45,6 +43,7 @@ pub enum Status { Inactive, } +#[allow(clippy::struct_excessive_bools)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Asset { pub id: String, diff --git a/src/types/alpaca/api/incoming/bar.rs b/src/types/alpaca/api/incoming/bar.rs index 78045b4..ae66841 100644 --- a/src/types/alpaca/api/incoming/bar.rs +++ b/src/types/alpaca/api/incoming/bar.rs @@ -16,7 +16,7 @@ pub struct Bar { #[serde(rename = "c")] pub close: f64, #[serde(rename = "v")] - pub volume: i64, + pub volume: f64, #[serde(rename = "n")] pub trades: i64, #[serde(rename = "vw")] diff --git a/src/types/alpaca/websocket/data/incoming/bar.rs b/src/types/alpaca/websocket/data/incoming/bar.rs index cea2023..335dc59 100644 --- a/src/types/alpaca/websocket/data/incoming/bar.rs +++ b/src/types/alpaca/websocket/data/incoming/bar.rs @@ -17,7 +17,7 @@ pub struct Message { #[serde(rename = "c")] pub close: f64, #[serde(rename = "v")] - pub volume: i64, + pub volume: f64, #[serde(rename = "n")] pub trades: i64, #[serde(rename = "vw")] diff --git a/src/types/asset.rs b/src/types/asset.rs index 1f53ebc..6fd878e 100644 --- a/src/types/asset.rs +++ b/src/types/asset.rs @@ -31,9 +31,3 @@ pub struct Asset { #[serde(with = "clickhouse::serde::time::datetime")] pub time_added: OffsetDateTime, } - -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum BroadcastMessage { - Added, - Deleted, -} diff --git a/src/types/backfill.rs b/src/types/backfill.rs new file mode 100644 index 0000000..89e233a --- /dev/null +++ b/src/types/backfill.rs @@ -0,0 +1,23 @@ +use super::Bar; +use clickhouse::Row; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)] +pub struct Backfill { + pub symbol: String, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time: OffsetDateTime, +} + +impl Backfill { + pub const fn new(symbol: String, time: OffsetDateTime) -> Self { + Self { symbol, time } + } +} + +impl From for Backfill { + fn from(bar: Bar) -> Self { + Self::new(bar.symbol, bar.time) + } +} diff --git a/src/types/bar.rs b/src/types/bar.rs index d7f5616..7d60a4b 100644 --- a/src/types/bar.rs +++ b/src/types/bar.rs @@ -1,5 +1,3 @@ -#![allow(clippy::module_name_repetitions)] - use clickhouse::Row; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -13,32 +11,7 @@ pub struct Bar { pub high: f64, pub low: f64, pub close: f64, - pub volume: i64, + pub volume: f64, pub trades: i64, pub vwap: f64, } - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)] -pub struct BarValidity { - pub symbol: String, - #[serde(with = "clickhouse::serde::time::datetime")] - pub time_last: OffsetDateTime, -} - -impl BarValidity { - pub const fn none(symbol: String) -> Self { - Self { - symbol, - time_last: OffsetDateTime::UNIX_EPOCH, - } - } -} - -impl From for BarValidity { - fn from(bar: Bar) -> Self { - Self { - symbol: bar.symbol, - time_last: bar.time, - } - } -} diff --git a/src/types/mod.rs b/src/types/mod.rs index 4fd2b8e..266d69f 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,11 +1,8 @@ pub mod alpaca; pub mod asset; +pub mod backfill; pub mod bar; pub use asset::{Asset, Class, Exchange}; -pub use bar::{Bar, BarValidity}; - -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum BroadcastMessage { - Asset((asset::BroadcastMessage, Vec)), -} +pub use backfill::Backfill; +pub use bar::Bar; diff --git a/src/utils/cleanup.rs b/src/utils/cleanup.rs new file mode 100644 index 0000000..f30b040 --- /dev/null +++ b/src/utils/cleanup.rs @@ -0,0 +1,14 @@ +use crate::database; +use clickhouse::Client; + +pub async fn cleanup(clickhouse_client: &Client) { + let assets = database::assets::select(clickhouse_client).await; + + let symbols = assets + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>(); + + database::bars::delete_where_not_symbols(clickhouse_client, &symbols).await; + database::backfills::delete_where_not_symbols(clickhouse_client, &symbols).await; +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index bd9b56b..df65ac1 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,5 @@ +pub mod cleanup; pub mod time; +pub use cleanup::cleanup; pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}; diff --git a/support/ci/Dockerfile b/support/ci/Dockerfile index b34d71b..e860203 100644 --- a/support/ci/Dockerfile +++ b/support/ci/Dockerfile @@ -1,5 +1,5 @@ FROM rust RUN rustup install nightly -RUN rustup component add clippy +RUN rustup component add rustfmt clippy RUN cargo install cargo-udeps cargo-outdated diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index 818617f..a8522d7 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS qrust.bars ( high Float64, low Float64, close Float64, - volume Int64, + volume Float64, trades Int64, vwap Float64 ) @@ -31,9 +31,9 @@ ENGINE = ReplacingMergeTree() PRIMARY KEY (symbol, time) PARTITION BY toYYYYMM(time); -CREATE TABLE IF NOT EXISTS qrust.bars_validity ( +CREATE TABLE IF NOT EXISTS qrust.backfills ( symbol String, - time_last DateTime + time DateTime ) ENGINE = ReplacingMergeTree() PRIMARY KEY symbol;