diff --git a/Cargo.lock b/Cargo.lock index 0088ac0..4a1cf01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,16 +216,17 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -316,6 +317,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.48", +] + +[[package]] +name = "darling_macro" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.48", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -644,6 +680,12 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.11" @@ -815,6 +857,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -833,6 +881,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -843,6 +892,7 @@ checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.3", + "serde", ] [[package]] @@ -1215,9 +1265,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -1239,6 +1289,7 @@ dependencies = [ "serde", "serde_json", "serde_repr", + "serde_with", "time", "tokio", "tokio-tungstenite", @@ -1522,6 +1573,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5c9fdb6b00a489875b22efd4b78fe2b363b72265cc5f6eb2e2b9ee270e6140c" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.1.0", + "serde", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbff351eb4b33600a2e138dfa0b10b65a238ea8ff8fb2387c422c5022a3e8298" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -1587,6 +1667,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "syn" version = "1.0.109" diff --git a/Cargo.toml b/Cargo.toml index 7987120..1394c43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ log4rs = "1.2.0" serde = "1.0.188" serde_json = "1.0.105" serde_repr = "0.1.18" +serde_with = "3.5.1" futures-util = "0.3.28" reqwest = { version = "0.11.20", features = [ "json", @@ -46,4 +47,6 @@ time = { version = "0.3.31", features = [ "macros", "serde-well-known", ] } -backoff = { version = "0.4.0", features = ["tokio"] } +backoff = { version = "0.4.0", features = [ + "tokio", +] } diff --git a/log4rs.yaml b/log4rs.yaml index d13de5e..b191c37 100644 --- a/log4rs.yaml +++ b/log4rs.yaml @@ -2,7 +2,7 @@ appenders: stdout: kind: console encoder: - pattern: "{({d} {h({l})} {M}::{L}):65} - {m}{n}" + pattern: "{d} {h({l})} {M}::{L} - {m}{n}" root: level: info diff --git a/src/config.rs b/src/config.rs index e5982ae..906d762 100644 --- a/src/config.rs +++ b/src/config.rs @@ -10,8 +10,11 @@ pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets"; pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock"; pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; +pub const ALPACA_NEWS_DATA_URL: &str = "https://data.alpaca.markets/v1beta1/news"; + pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2"; pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; +pub const ALPACA_NEWS_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news"; pub struct Config { pub alpaca_api_key: String, diff --git a/src/data/market.rs b/src/data/market.rs deleted file mode 100644 index 1420d50..0000000 --- a/src/data/market.rs +++ /dev/null @@ -1,449 +0,0 @@ -use crate::{ - config::{Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL}, - data::authenticate_websocket, - database, - types::{ - alpaca::{api, websocket, Source}, - state, Asset, Backfill, Bar, BroadcastMessage, Class, - }, - utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, -}; -use backoff::{future::retry, ExponentialBackoff}; -use futures_util::{ - stream::{SplitSink, SplitStream}, - SinkExt, StreamExt, -}; -use log::{error, info, warn}; -use serde_json::{from_str, to_string}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; -use time::OffsetDateTime; -use tokio::{ - net::TcpStream, - spawn, - sync::{broadcast::Sender, Mutex, RwLock}, - task::JoinHandle, - time::sleep, -}; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; - -pub struct Guard { - symbols: HashSet, - backfill_jobs: HashMap>, - pending_subscriptions: HashMap, - pending_unsubscriptions: HashMap, -} - -pub async fn run( - app_config: Arc, - class: Class, - broadcast_bus_sender: Sender, -) { - info!("Running live threads for {:?}.", class); - - let websocket_url = match class { - Class::UsEquity => format!( - "{}/{}", - ALPACA_STOCK_WEBSOCKET_URL, app_config.alpaca_source - ), - Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL.to_string(), - }; - - let (stream, _) = connect_async(websocket_url).await.unwrap(); - let (mut sink, mut stream) = stream.split(); - authenticate_websocket(&app_config, &mut stream, &mut sink).await; - let sink = Arc::new(Mutex::new(sink)); - - let guard = Arc::new(RwLock::new(Guard { - symbols: HashSet::new(), - backfill_jobs: HashMap::new(), - pending_subscriptions: HashMap::new(), - pending_unsubscriptions: HashMap::new(), - })); - - spawn(broadcast_bus_handler( - app_config.clone(), - class, - sink.clone(), - broadcast_bus_sender.clone(), - guard.clone(), - )); - - spawn(websocket_handler( - app_config.clone(), - stream, - sink, - broadcast_bus_sender.clone(), - guard.clone(), - )); - - let assets = database::assets::select_where_class(&app_config.clickhouse_client, &class).await; - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Add, - assets, - ))) - .unwrap(); -} - -pub async fn broadcast_bus_handler( - app_config: Arc, - class: Class, - sink: Arc>, Message>>>, - broadcast_bus_sender: Sender, - guard: Arc>, -) { - let mut broadcast_bus_receiver = broadcast_bus_sender.subscribe(); - - loop { - let app_config = app_config.clone(); - let sink = sink.clone(); - let broadcast_bus_sender = broadcast_bus_sender.clone(); - let guard = guard.clone(); - let message = broadcast_bus_receiver.recv().await.unwrap(); - - spawn(broadcast_bus_handle_message( - app_config, - class, - sink, - broadcast_bus_sender, - guard, - message, - )); - } -} - -#[allow(clippy::significant_drop_tightening)] -#[allow(clippy::too_many_lines)] -async fn broadcast_bus_handle_message( - app_config: Arc, - class: Class, - sink: Arc>, Message>>>, - broadcast_bus_sender: Sender, - guard: Arc>, - message: BroadcastMessage, -) { - match message { - BroadcastMessage::Asset((action, mut assets)) => { - assets.retain(|asset| asset.class == class); - if assets.is_empty() { - return; - } - - let assets = assets - .into_iter() - .map(|asset| (asset.symbol.clone(), asset)) - .collect::>(); - - let symbols = assets.keys().cloned().collect::>(); - - match action { - state::asset::BroadcastMessage::Add => { - database::assets::upsert_batch( - &app_config.clickhouse_client, - assets.clone().into_values(), - ) - .await; - - let mut guard = guard.write().await; - guard.symbols.extend(symbols.clone()); - guard.pending_subscriptions.extend(assets); - - info!("Added {:?}.", symbols); - - sink.lock() - .await - .send(Message::Text( - to_string(&websocket::data::outgoing::Message::Subscribe( - websocket::data::outgoing::subscribe::Message::new(symbols), - )) - .unwrap(), - )) - .await - .unwrap(); - } - state::asset::BroadcastMessage::Delete => { - database::assets::delete_where_symbols(&app_config.clickhouse_client, &symbols) - .await; - - let mut guard = guard.write().await; - guard.symbols.retain(|symbol| !assets.contains_key(symbol)); - guard.pending_unsubscriptions.extend(assets); - - info!("Deleted {:?}.", symbols); - - sink.lock() - .await - .send(Message::Text( - to_string(&websocket::data::outgoing::Message::Unsubscribe( - websocket::data::outgoing::subscribe::Message::new(symbols), - )) - .unwrap(), - )) - .await - .unwrap(); - } - state::asset::BroadcastMessage::Backfill => { - let guard_clone = guard.clone(); - let mut guard = guard.write().await; - - info!("Creating backfill jobs for {:?}.", symbols); - - for (symbol, asset) in assets { - if let Some(backfill_job) = guard.backfill_jobs.remove(&symbol) { - backfill_job.abort(); - backfill_job.await.unwrap_err(); - } - - guard.backfill_jobs.insert(symbol.clone(), { - let guard = guard_clone.clone(); - let app_config = app_config.clone(); - - spawn(async move { - backfill(app_config, class, asset.clone()).await; - - let mut guard = guard.write().await; - guard.backfill_jobs.remove(&symbol); - }) - }); - } - } - state::asset::BroadcastMessage::Purge => { - let mut guard = guard.write().await; - - info!("Purging {:?}.", symbols); - - for (symbol, _) in assets { - if let Some(backfill_job) = guard.backfill_jobs.remove(&symbol) { - backfill_job.abort(); - backfill_job.await.unwrap_err(); - } - } - - database::backfills::delete_where_symbols( - &app_config.clickhouse_client, - &symbols, - ) - .await; - - database::bars::delete_where_symbols(&app_config.clickhouse_client, &symbols) - .await; - } - } - } - BroadcastMessage::Clock(_) => { - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Backfill, - database::assets::select(&app_config.clickhouse_client).await, - ))) - .unwrap(); - } - } -} - -async fn websocket_handler( - app_config: Arc, - mut stream: SplitStream>>, - sink: Arc>, Message>>>, - broadcast_bus_sender: Sender, - guard: Arc>, -) { - loop { - let app_config = app_config.clone(); - let sink = sink.clone(); - let broadcast_bus_sender = broadcast_bus_sender.clone(); - let guard = guard.clone(); - let message = stream.next().await.expect("Websocket stream closed."); - - spawn(async move { - match message { - Ok(Message::Text(data)) => { - let parsed_data = from_str::>(&data); - - if let Ok(messages) = parsed_data { - for message in messages { - websocket_handle_message( - app_config.clone(), - broadcast_bus_sender.clone(), - guard.clone(), - message, - ) - .await; - } - } else { - error!( - "Unparsed websocket message: {:?}: {}.", - data, - parsed_data.unwrap_err() - ); - } - } - Ok(Message::Ping(_)) => { - sink.lock().await.send(Message::Pong(vec![])).await.unwrap(); - } - _ => error!("Unknown websocket message: {:?}.", message), - } - }); - } -} - -#[allow(clippy::significant_drop_tightening)] -async fn websocket_handle_message( - app_config: Arc, - broadcast_bus_sender: Sender, - guard: Arc>, - message: websocket::data::incoming::Message, -) { - match message { - websocket::data::incoming::Message::Subscription(message) => { - let symbols = message.bars.into_iter().collect::>(); - - let mut guard = guard.write().await; - - let newly_subscribed_assets = guard - .pending_subscriptions - .extract_if(|symbol, _| symbols.contains(symbol)) - .collect::>(); - - if !newly_subscribed_assets.is_empty() { - info!( - "Subscribed to {:?}.", - newly_subscribed_assets.keys().collect::>() - ); - - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Backfill, - newly_subscribed_assets.into_values().collect::>(), - ))) - .unwrap(); - } - - let newly_unsubscribed_assets = guard - .pending_unsubscriptions - .extract_if(|symbol, _| !symbols.contains(symbol)) - .collect::>(); - - if !newly_unsubscribed_assets.is_empty() { - info!( - "Unsubscribed from {:?}.", - newly_unsubscribed_assets.keys().collect::>() - ); - - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Purge, - newly_unsubscribed_assets.into_values().collect::>(), - ))) - .unwrap(); - } - } - websocket::data::incoming::Message::Bars(bar_message) - | websocket::data::incoming::Message::UpdatedBars(bar_message) => { - let bar = Bar::from(bar_message); - - let guard = guard.read().await; - let symbol_status = guard.symbols.get(&bar.symbol); - - if symbol_status.is_none() { - warn!( - "Race condition: received bar for unsubscribed symbol: {:?}.", - bar.symbol - ); - return; - } - - info!("Received bar for {}: {}.", bar.symbol, bar.time); - database::bars::upsert(&app_config.clickhouse_client, &bar).await; - } - websocket::data::incoming::Message::Success(_) => {} - } -} - -pub async fn backfill(app_config: Arc, class: Class, asset: Asset) { - let latest_backfill = database::backfills::select_latest_where_symbol( - &app_config.clickhouse_client, - &asset.symbol, - ) - .await; - - let fetch_from = if let Some(backfill) = latest_backfill { - backfill.time + ONE_MINUTE - } else { - OffsetDateTime::UNIX_EPOCH - }; - - let fetch_until = last_minute(); - if fetch_from > fetch_until { - return; - } - - if app_config.alpaca_source == Source::Iex { - let task_run_delay = duration_until(fetch_until + FIFTEEN_MINUTES + ONE_MINUTE); - info!( - "Queing backfill for {} in {:?}.", - asset.symbol, task_run_delay - ); - sleep(task_run_delay).await; - } - - info!("Running backfill for {}.", asset.symbol); - - let mut bars = Vec::new(); - let mut next_page_token = None; - - loop { - let message = retry(ExponentialBackoff::default(), || async { - app_config.alpaca_rate_limit.until_ready().await; - app_config - .alpaca_client - .get(class.get_data_url()) - .query(&api::outgoing::bar::Bar::new( - vec![asset.symbol.clone()], - ONE_MINUTE, - fetch_from, - fetch_until, - 10000, - next_page_token.clone(), - )) - .send() - .await? - .error_for_status()? - .json::() - .await - .map_err(backoff::Error::Permanent) - }) - .await; - - let message = match message { - Ok(message) => message, - Err(e) => { - error!("Failed to backfill data for {}: {}.", asset.symbol, e); - return; - } - }; - - message.bars.into_iter().for_each(|(symbol, bar_vec)| { - bar_vec.unwrap_or_default().into_iter().for_each(|bar| { - bars.push(Bar::from((bar, symbol.clone()))); - }); - }); - - if message.next_page_token.is_none() { - break; - } - next_page_token = message.next_page_token; - } - - database::bars::upsert_batch(&app_config.clickhouse_client, bars).await; - database::backfills::upsert( - &app_config.clickhouse_client, - &Backfill::new(asset.symbol.clone(), fetch_until), - ) - .await; - - info!("Backfilled data for {}.", asset.symbol); -} diff --git a/src/data/mod.rs b/src/data/mod.rs deleted file mode 100644 index cacd796..0000000 --- a/src/data/mod.rs +++ /dev/null @@ -1,53 +0,0 @@ -pub mod clock; -pub mod market; - -use crate::{config::Config, types::alpaca::websocket}; -use core::panic; -use futures_util::{ - stream::{SplitSink, SplitStream}, - SinkExt, StreamExt, -}; -use serde_json::{from_str, to_string}; -use std::sync::Arc; -use tokio::net::TcpStream; -use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; - -async fn authenticate_websocket( - app_config: &Arc, - stream: &mut SplitStream>>, - sink: &mut SplitSink>, Message>, -) { - match stream.next().await { - Some(Ok(Message::Text(data))) - if from_str::>(&data) - .unwrap() - .first() - == Some(&websocket::data::incoming::Message::Success( - websocket::data::incoming::success::Message::Connected, - )) => {} - _ => panic!("Failed to connect to Alpaca websocket."), - } - - sink.send(Message::Text( - to_string(&websocket::data::outgoing::Message::Auth( - websocket::data::outgoing::auth::Message::new( - app_config.alpaca_api_key.clone(), - app_config.alpaca_api_secret.clone(), - ), - )) - .unwrap(), - )) - .await - .unwrap(); - - match stream.next().await { - Some(Ok(Message::Text(data))) - if from_str::>(&data) - .unwrap() - .first() - == Some(&websocket::data::incoming::Message::Success( - websocket::data::incoming::success::Message::Authenticated, - )) => {} - _ => panic!("Failed to authenticate with Alpaca websocket."), - }; -} diff --git a/src/database/assets.rs b/src/database/assets.rs index 19343c2..e5b908e 100644 --- a/src/database/assets.rs +++ b/src/database/assets.rs @@ -1,4 +1,4 @@ -use crate::types::{Asset, Class}; +use crate::types::Asset; use clickhouse::Client; use serde::Serialize; @@ -10,21 +10,13 @@ pub async fn select(clickhouse_client: &Client) -> Vec { .unwrap() } -pub async fn select_where_class(clickhouse_client: &Client, class: &Class) -> Vec { - clickhouse_client - .query("SELECT ?fields FROM assets FINAL WHERE class = ?") - .bind(class) - .fetch_all::() - .await - .unwrap() -} - pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &T) -> Option where T: AsRef + Serialize + Send + Sync, { clickhouse_client - .query("SELECT ?fields FROM assets FINAL WHERE symbol = ?") + .query("SELECT ?fields FROM assets FINAL WHERE symbol = ? OR abbreviation = ?") + .bind(symbol) .bind(symbol) .fetch_optional::() .await diff --git a/src/database/backfills.rs b/src/database/backfills.rs index e1751e9..2ef38fe 100644 --- a/src/database/backfills.rs +++ b/src/database/backfills.rs @@ -1,48 +1,93 @@ -use crate::types::Backfill; +use crate::{database::assets, threads::data::ThreadType, types::Backfill}; use clickhouse::Client; use serde::Serialize; +use tokio::join; pub async fn select_latest_where_symbol( clickhouse_client: &Client, + thread_type: &ThreadType, symbol: &T, ) -> Option where T: AsRef + Serialize + Send + Sync, { clickhouse_client - .query("SELECT ?fields FROM backfills FINAL WHERE symbol = ? ORDER BY time DESC LIMIT 1") + .query(&format!( + "SELECT ?fields FROM {} FINAL WHERE symbol = ? ORDER BY time DESC LIMIT 1", + match thread_type { + ThreadType::Bars(_) => "backfills_bars", + ThreadType::News => "backfills_news", + } + )) .bind(symbol) .fetch_optional::() .await .unwrap() } -pub async fn upsert(clickhouse_client: &Client, backfill: &Backfill) { - let mut insert = clickhouse_client.insert("backfills").unwrap(); +pub async fn upsert(clickhouse_client: &Client, thread_type: &ThreadType, backfill: &Backfill) { + let mut insert = clickhouse_client + .insert(match thread_type { + ThreadType::Bars(_) => "backfills_bars", + ThreadType::News => "backfills_news", + }) + .unwrap(); insert.write(backfill).await.unwrap(); insert.end().await.unwrap(); } -pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &[T]) -where +pub async fn delete_where_symbols( + clickhouse_client: &Client, + thread_type: &ThreadType, + symbols: &[T], +) where T: AsRef + Serialize + Send + Sync, { clickhouse_client - .query("DELETE FROM backfills WHERE symbol IN ?") + .query(&format!( + "DELETE FROM {} WHERE symbol IN ?", + match thread_type { + ThreadType::Bars(_) => "backfills_bars", + ThreadType::News => "backfills_news", + } + )) .bind(symbols) .execute() .await .unwrap(); } -pub async fn delete_where_not_symbols(clickhouse_client: &Client, symbols: &[T]) -where - T: AsRef + Serialize + Send + Sync, -{ - clickhouse_client - .query("DELETE FROM backfills WHERE symbol NOT IN ?") - .bind(symbols) - .execute() - .await - .unwrap(); +pub async fn cleanup(clickhouse_client: &Client) { + let assets = assets::select(clickhouse_client).await; + + let bars_symbols = assets + .clone() + .into_iter() + .map(|asset| asset.symbol) + .collect::>(); + + let news_symbols = assets + .into_iter() + .map(|asset| asset.abbreviation) + .collect::>(); + + let delete_bars_future = async { + clickhouse_client + .query("DELETE FROM backfills_bars WHERE symbol NOT IN ?") + .bind(bars_symbols) + .execute() + .await + .unwrap(); + }; + + let delete_news_future = async { + clickhouse_client + .query("DELETE FROM backfills_news WHERE symbol NOT IN ?") + .bind(news_symbols) + .execute() + .await + .unwrap(); + }; + + join!(delete_bars_future, delete_news_future); } diff --git a/src/database/bars.rs b/src/database/bars.rs index c67185b..7f739da 100644 --- a/src/database/bars.rs +++ b/src/database/bars.rs @@ -1,3 +1,4 @@ +use super::assets; use crate::types::Bar; use clickhouse::Client; use serde::Serialize; @@ -32,10 +33,14 @@ where .unwrap(); } -pub async fn delete_where_not_symbols(clickhouse_client: &Client, symbols: &[T]) -where - T: AsRef + Serialize + Send + Sync, -{ +pub async fn cleanup(clickhouse_client: &Client) { + let assets = assets::select(clickhouse_client).await; + + let symbols = assets + .into_iter() + .map(|asset| asset.symbol) + .collect::>(); + clickhouse_client .query("DELETE FROM bars WHERE symbol NOT IN ?") .bind(symbols) diff --git a/src/database/mod.rs b/src/database/mod.rs index 4768255..792108d 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,3 +1,4 @@ pub mod assets; pub mod backfills; pub mod bars; +pub mod news; diff --git a/src/database/news.rs b/src/database/news.rs new file mode 100644 index 0000000..52fc1e9 --- /dev/null +++ b/src/database/news.rs @@ -0,0 +1,50 @@ +use super::assets; +use crate::types::News; +use clickhouse::Client; +use serde::Serialize; + +pub async fn upsert(clickhouse_client: &Client, news: &News) { + let mut insert = clickhouse_client.insert("news").unwrap(); + insert.write(news).await.unwrap(); + insert.end().await.unwrap(); +} + +pub async fn upsert_batch(clickhouse_client: &Client, news: T) +where + T: IntoIterator + Send + Sync, + T::IntoIter: Send, +{ + let mut insert = clickhouse_client.insert("news").unwrap(); + for news in news { + insert.write(&news).await.unwrap(); + } + insert.end().await.unwrap(); +} + +pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &[T]) +where + T: AsRef + Serialize + Send + Sync, +{ + clickhouse_client + .query("DELETE FROM news WHERE hasAny(symbols, ?)") + .bind(symbols) + .execute() + .await + .unwrap(); +} + +pub async fn cleanup(clickhouse_client: &Client) { + let assets = assets::select(clickhouse_client).await; + + let symbols = assets + .into_iter() + .map(|asset| asset.abbreviation) + .collect::>(); + + clickhouse_client + .query("DELETE FROM news WHERE NOT hasAny(symbols, ?)") + .bind(symbols) + .execute() + .await + .unwrap(); +} diff --git a/src/main.rs b/src/main.rs index 499d507..25667cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,9 @@ #![feature(hash_extract_if)] mod config; -mod data; mod database; mod routes; +mod threads; mod types; mod utils; @@ -13,8 +13,7 @@ use crate::utils::cleanup; use config::Config; use dotenv::dotenv; use log4rs::config::Deserializers; -use tokio::{spawn, sync::broadcast}; -use types::{BroadcastMessage, Class}; +use tokio::{spawn, sync::mpsc}; #[tokio::main] async fn main() { @@ -24,21 +23,27 @@ async fn main() { cleanup(&app_config.clickhouse_client).await; - let (broadcast_bus, _) = broadcast::channel::(100); + let (asset_status_sender, asset_status_receiver) = + mpsc::channel::(100); + let (clock_sender, clock_receiver) = mpsc::channel::(1); - spawn(data::market::run( + spawn(threads::data::run( app_config.clone(), - Class::UsEquity, - broadcast_bus.clone(), + asset_status_receiver, + clock_receiver, )); - spawn(data::market::run( - app_config.clone(), - Class::Crypto, - broadcast_bus.clone(), - )); + spawn(threads::clock::run(app_config.clone(), clock_sender)); - spawn(data::clock::run(app_config.clone(), broadcast_bus.clone())); + let assets = database::assets::select(&app_config.clickhouse_client).await; - routes::run(app_config, broadcast_bus).await; + let (asset_status_message, asset_status_receiver) = + threads::data::asset_status::Message::new(threads::data::asset_status::Action::Add, assets); + asset_status_sender + .send(asset_status_message) + .await + .unwrap(); + asset_status_receiver.await.unwrap(); + + routes::run(app_config, asset_status_sender).await; } diff --git a/src/routes/assets.rs b/src/routes/assets.rs index 7f79198..681311a 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -1,9 +1,8 @@ use crate::{ config::{Config, ALPACA_ASSET_API_URL}, - database, + database, threads, types::{ alpaca::api::incoming::{self, asset::Status}, - state::{self, BroadcastMessage}, Asset, }, }; @@ -13,7 +12,7 @@ use core::panic; use http::StatusCode; use serde::Deserialize; use std::sync::Arc; -use tokio::sync::broadcast::Sender; +use tokio::sync::mpsc; pub async fn get( Extension(app_config): Extension>, @@ -39,7 +38,7 @@ pub struct AddAssetRequest { pub async fn add( Extension(app_config): Extension>, - Extension(broadcast_bus_sender): Extension>, + Extension(asset_status_sender): Extension>, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { if database::assets::select_where_symbol(&app_config.clickhouse_client, &request.symbol) @@ -77,31 +76,39 @@ pub async fn add( let asset = Asset::from(asset); - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Add, - vec![asset.clone()], - ))) + let (asset_status_message, asset_status_response) = threads::data::asset_status::Message::new( + threads::data::asset_status::Action::Add, + vec![asset.clone()], + ); + + asset_status_sender + .send(asset_status_message) + .await .unwrap(); + asset_status_response.await.unwrap(); Ok((StatusCode::CREATED, Json(asset))) } pub async fn delete( Extension(app_config): Extension>, - Extension(broadcast_bus_sender): Extension>, + Extension(asset_status_sender): Extension>, Path(symbol): Path, ) -> Result { let asset = database::assets::select_where_symbol(&app_config.clickhouse_client, &symbol) .await .ok_or(StatusCode::NOT_FOUND)?; - broadcast_bus_sender - .send(BroadcastMessage::Asset(( - state::asset::BroadcastMessage::Delete, - vec![asset], - ))) + let (asset_status_message, asset_status_response) = threads::data::asset_status::Message::new( + threads::data::asset_status::Action::Remove, + vec![asset], + ); + + asset_status_sender + .send(asset_status_message) + .await .unwrap(); + asset_status_response.await.unwrap(); Ok(StatusCode::NO_CONTENT) } diff --git a/src/routes/mod.rs b/src/routes/mod.rs index d8e4593..47ec066 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,22 +1,25 @@ -use crate::{config::Config, types::BroadcastMessage}; +pub mod assets; + +use crate::{config::Config, threads}; use axum::{ routing::{delete, get, post}, serve, Extension, Router, }; use log::info; use std::{net::SocketAddr, sync::Arc}; -use tokio::{net::TcpListener, sync::broadcast::Sender}; +use tokio::{net::TcpListener, sync::mpsc}; -pub mod assets; - -pub async fn run(app_config: Arc, broadcast_sender: Sender) { +pub async fn run( + app_config: Arc, + asset_status_sender: mpsc::Sender, +) { let app = Router::new() .route("/assets", get(assets::get)) .route("/assets/:symbol", get(assets::get_where_symbol)) .route("/assets", post(assets::add)) .route("/assets/:symbol", delete(assets::delete)) .layer(Extension(app_config)) - .layer(Extension(broadcast_sender)); + .layer(Extension(asset_status_sender)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); let listener = TcpListener::bind(addr).await.unwrap(); diff --git a/src/data/clock.rs b/src/threads/clock.rs similarity index 56% rename from src/data/clock.rs rename to src/threads/clock.rs index ac504e1..34b7853 100644 --- a/src/data/clock.rs +++ b/src/threads/clock.rs @@ -1,17 +1,41 @@ use crate::{ config::{Config, ALPACA_CLOCK_API_URL}, - types::{ - alpaca, - state::{self, BroadcastMessage}, - }, + types::alpaca, utils::duration_until, }; use backoff::{future::retry, ExponentialBackoff}; use log::info; use std::sync::Arc; -use tokio::{sync::broadcast::Sender, time::sleep}; +use time::OffsetDateTime; +use tokio::{sync::mpsc, time::sleep}; -pub async fn run(app_config: Arc, broadcast_bus_sender: Sender) { +pub enum Status { + Open, + Closed, +} + +pub struct Message { + pub status: Status, + pub next_switch: OffsetDateTime, +} + +impl From for Message { + fn from(clock: alpaca::api::incoming::clock::Clock) -> Self { + if clock.is_open { + Self { + status: Status::Open, + next_switch: clock.next_close, + } + } else { + Self { + status: Status::Closed, + next_switch: clock.next_open, + } + } + } +} + +pub async fn run(app_config: Arc, clock_sender: mpsc::Sender) { loop { let clock = retry(ExponentialBackoff::default(), || async { app_config.alpaca_rate_limit.until_ready().await; @@ -37,13 +61,6 @@ pub async fn run(app_config: Arc, broadcast_bus_sender: Sender, + pub response: oneshot::Sender<()>, +} + +impl Message { + pub fn new(action: Action, assets: Vec) -> (Self, oneshot::Receiver<()>) { + let (sender, receiver) = oneshot::channel::<()>(); + ( + Self { + action, + assets, + response: sender, + }, + receiver, + ) + } +} + +pub async fn run( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + mut asset_status_receiver: mpsc::Receiver, + websocket_sender: Arc< + Mutex>, tungstenite::Message>>, + >, +) { + loop { + let app_config = app_config.clone(); + let guard = guard.clone(); + let websocket_sender = websocket_sender.clone(); + + let message = asset_status_receiver.recv().await.unwrap(); + + spawn(handle_asset_status_message( + app_config, + thread_type, + guard, + websocket_sender, + message, + )); + } +} + +#[allow(clippy::significant_drop_tightening)] +async fn handle_asset_status_message( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + websocket_sender: Arc< + Mutex>, tungstenite::Message>>, + >, + message: Message, +) { + let symbols = message + .assets + .clone() + .into_iter() + .map(|asset| match thread_type { + ThreadType::Bars(_) => asset.symbol, + ThreadType::News => asset.abbreviation, + }) + .collect::>(); + + match message.action { + Action::Add => { + let mut guard = guard.write().await; + + guard.symbols.extend(symbols.clone()); + guard + .pending_subscriptions + .extend(symbols.clone().into_iter().zip(message.assets.clone())); + + info!("{:?} - Added {:?}.", thread_type, symbols); + + let database_future = async { + if matches!(thread_type, ThreadType::Bars(_)) { + database::assets::upsert_batch(&app_config.clickhouse_client, message.assets) + .await; + } + }; + + let websocket_future = async move { + websocket_sender + .lock() + .await + .send(tungstenite::Message::Text( + to_string(&websocket::outgoing::Message::Subscribe( + websocket_market_message_factory(thread_type, symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + }; + + join!(database_future, websocket_future); + } + Action::Remove => { + let mut guard = guard.write().await; + + guard.symbols.retain(|symbol| !symbols.contains(symbol)); + guard + .pending_unsubscriptions + .extend(symbols.clone().into_iter().zip(message.assets.clone())); + + info!("{:?} - Removed {:?}.", thread_type, symbols); + + let sybols_clone = symbols.clone(); + let database_future = database::assets::delete_where_symbols( + &app_config.clickhouse_client, + &sybols_clone, + ); + + let websocket_future = async move { + websocket_sender + .lock() + .await + .send(tungstenite::Message::Text( + to_string(&websocket::outgoing::Message::Unsubscribe( + websocket_market_message_factory(thread_type, symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + }; + + join!(database_future, websocket_future); + } + } + + message.response.send(()).unwrap(); +} + +fn websocket_market_message_factory( + thread_type: ThreadType, + symbols: Vec, +) -> websocket::outgoing::subscribe::Message { + match thread_type { + ThreadType::Bars(_) => websocket::outgoing::subscribe::Message::Market( + websocket::outgoing::subscribe::MarketMessage::new(symbols), + ), + ThreadType::News => websocket::outgoing::subscribe::Message::News( + websocket::outgoing::subscribe::NewsMessage::new(symbols), + ), + } +} diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs new file mode 100644 index 0000000..ff7fae0 --- /dev/null +++ b/src/threads/data/backfill.rs @@ -0,0 +1,374 @@ +use super::{Guard, ThreadType}; +use crate::{ + config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_NEWS_DATA_URL, ALPACA_STOCK_DATA_URL}, + database, + types::{ + alpaca::{api, Source}, + Asset, Bar, Class, News, Subset, + }, + utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, +}; +use backoff::{future::retry, ExponentialBackoff}; +use log::{error, info}; +use std::{collections::HashMap, sync::Arc}; +use time::OffsetDateTime; +use tokio::{ + join, spawn, + sync::{mpsc, oneshot, Mutex, RwLock}, + task::JoinHandle, + time::sleep, +}; + +pub enum Action { + Backfill, + Purge, +} + +pub struct Message { + pub action: Action, + pub assets: Subset, + pub response: oneshot::Sender<()>, +} + +impl Message { + pub fn new(action: Action, assets: Subset) -> (Self, oneshot::Receiver<()>) { + let (sender, receiver) = oneshot::channel::<()>(); + ( + Self { + action, + assets, + response: sender, + }, + receiver, + ) + } +} + +pub async fn run( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + mut backfill_receiver: mpsc::Receiver, +) { + let backfill_jobs = Arc::new(Mutex::new(HashMap::new())); + + let data_url = match thread_type { + ThreadType::Bars(Class::UsEquity) => ALPACA_STOCK_DATA_URL.to_string(), + ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_URL.to_string(), + ThreadType::News => ALPACA_NEWS_DATA_URL.to_string(), + }; + + loop { + let app_config = app_config.clone(); + let guard = guard.clone(); + let backfill_jobs = backfill_jobs.clone(); + let data_url = data_url.clone(); + + let message = backfill_receiver.recv().await.unwrap(); + + spawn(handle_backfill_message( + app_config, + thread_type, + guard, + data_url, + backfill_jobs, + message, + )); + } +} + +#[allow(clippy::significant_drop_tightening)] +#[allow(clippy::too_many_lines)] +async fn handle_backfill_message( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + data_url: String, + backfill_jobs: Arc>>>, + message: Message, +) { + let guard = guard.read().await; + let mut backfill_jobs = backfill_jobs.lock().await; + + let symbols = match message.assets { + Subset::All => guard.symbols.clone().into_iter().collect::>(), + Subset::Some(assets) => assets + .into_iter() + .map(|asset| match thread_type { + ThreadType::Bars(_) => asset.symbol, + ThreadType::News => asset.abbreviation, + }) + .filter(|symbol| match message.action { + Action::Backfill => guard.symbols.contains(symbol), + Action::Purge => !guard.symbols.contains(symbol), + }) + .collect::>(), + }; + + match message.action { + Action::Backfill => { + for symbol in symbols { + if let Some(job) = backfill_jobs.remove(&symbol) { + if !job.is_finished() { + job.abort(); + } + job.await.unwrap_err(); + } + + let app_config = app_config.clone(); + let data_url = data_url.clone(); + + backfill_jobs.insert( + symbol.clone(), + spawn(async move { + let (fetch_from, fetch_to) = + queue_backfill(&app_config, thread_type, &symbol).await; + + match thread_type { + ThreadType::Bars(_) => { + execute_backfill_bars( + app_config, + thread_type, + data_url, + symbol, + fetch_from, + fetch_to, + ) + .await; + } + ThreadType::News => { + execute_backfill_news( + app_config, + thread_type, + data_url, + symbol, + fetch_from, + fetch_to, + ) + .await; + } + } + }), + ); + } + } + Action::Purge => { + for symbol in &symbols { + if let Some(job) = backfill_jobs.remove(symbol) { + if !job.is_finished() { + job.abort(); + } + job.await.unwrap_err(); + } + } + + let backfills_future = database::backfills::delete_where_symbols( + &app_config.clickhouse_client, + &thread_type, + &symbols, + ); + + let data_future = async { + match thread_type { + ThreadType::Bars(_) => { + database::bars::delete_where_symbols( + &app_config.clickhouse_client, + &symbols, + ) + .await; + } + ThreadType::News => { + database::news::delete_where_symbols( + &app_config.clickhouse_client, + &symbols, + ) + .await; + } + } + }; + + join!(backfills_future, data_future); + } + } + + message.response.send(()).unwrap(); +} + +async fn queue_backfill( + app_config: &Arc, + thread_type: ThreadType, + symbol: &String, +) -> (OffsetDateTime, OffsetDateTime) { + let latest_backfill = database::backfills::select_latest_where_symbol( + &app_config.clickhouse_client, + &thread_type, + &symbol, + ) + .await; + + let fetch_from = latest_backfill + .as_ref() + .map_or(OffsetDateTime::UNIX_EPOCH, |backfill| { + backfill.time + ONE_MINUTE + }); + + let fetch_to = last_minute(); + + if app_config.alpaca_source == Source::Iex { + let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); + info!( + "{:?} - Queing backfill for {} in {:?}.", + thread_type, symbol, run_delay + ); + sleep(run_delay).await; + } + + (fetch_from, fetch_to) +} + +async fn execute_backfill_bars( + app_config: Arc, + thread_type: ThreadType, + data_url: String, + symbol: String, + fetch_from: OffsetDateTime, + fetch_to: OffsetDateTime, +) { + if fetch_from > fetch_to { + return; + } + + info!("{:?} - Backfilling data for {}.", thread_type, symbol); + + let mut bars = Vec::new(); + let mut next_page_token = None; + + loop { + let message = retry(ExponentialBackoff::default(), || async { + app_config.alpaca_rate_limit.until_ready().await; + app_config + .alpaca_client + .get(&data_url) + .query(&api::outgoing::bar::Bar::new( + vec![symbol.clone()], + ONE_MINUTE, + fetch_from, + fetch_to, + 10000, + next_page_token.clone(), + )) + .send() + .await? + .error_for_status()? + .json::() + .await + .map_err(backoff::Error::Permanent) + }) + .await; + + let message = match message { + Ok(message) => message, + Err(e) => { + error!( + "{:?} - Failed to backfill data for {}: {}.", + thread_type, symbol, e + ); + return; + } + }; + + message.bars.into_iter().for_each(|(symbol, bar_vec)| { + for bar in bar_vec { + bars.push(Bar::from((bar, symbol.clone()))); + } + }); + + if message.next_page_token.is_none() { + break; + } + next_page_token = message.next_page_token; + } + + if bars.is_empty() { + return; + } + + let backfill = bars.last().unwrap().clone().into(); + database::bars::upsert_batch(&app_config.clickhouse_client, bars).await; + database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await; + + info!("{:?} - Backfilled data for {}.", thread_type, symbol); +} + +async fn execute_backfill_news( + app_config: Arc, + thread_type: ThreadType, + data_url: String, + symbol: String, + fetch_from: OffsetDateTime, + fetch_to: OffsetDateTime, +) { + if fetch_from > fetch_to { + return; + } + + info!("{:?} - Backfilling data for {}.", thread_type, symbol); + + let mut news = Vec::new(); + let mut next_page_token = None; + + loop { + let message = retry(ExponentialBackoff::default(), || async { + app_config.alpaca_rate_limit.until_ready().await; + app_config + .alpaca_client + .get(&data_url) + .query(&api::outgoing::news::News::new( + vec![symbol.clone()], + fetch_from, + fetch_to, + 50, + true, + false, + next_page_token.clone(), + )) + .send() + .await? + .error_for_status()? + .json::() + .await + .map_err(backoff::Error::Permanent) + }) + .await; + + let message = match message { + Ok(message) => message, + Err(e) => { + error!( + "{:?} - Failed to backfill data for {}: {}.", + thread_type, symbol, e + ); + return; + } + }; + + message.news.into_iter().for_each(|news_item| { + news.push(News::from(news_item)); + }); + + if message.next_page_token.is_none() { + break; + } + next_page_token = message.next_page_token; + } + + if news.is_empty() { + return; + } + + let backfill = (news.last().unwrap().clone(), symbol.clone()).into(); + database::news::upsert_batch(&app_config.clickhouse_client, news).await; + database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await; + + info!("{:?} - Backfilled data for {}.", thread_type, symbol); +} diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs new file mode 100644 index 0000000..d54a56a --- /dev/null +++ b/src/threads/data/mod.rs @@ -0,0 +1,233 @@ +pub mod asset_status; +pub mod backfill; +pub mod websocket; + +use super::clock; +use crate::{ + config::{ + Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_NEWS_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL, + }, + types::{Asset, Class, Subset}, + utils::authenticate, +}; +use futures_util::StreamExt; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use tokio::{ + join, select, spawn, + sync::{mpsc, Mutex, RwLock}, +}; +use tokio_tungstenite::connect_async; + +pub struct Guard { + pub symbols: HashSet, + pub pending_subscriptions: HashMap, + pub pending_unsubscriptions: HashMap, +} + +impl Guard { + pub fn new() -> Self { + Self { + symbols: HashSet::new(), + pending_subscriptions: HashMap::new(), + pending_unsubscriptions: HashMap::new(), + } + } +} + +#[derive(Clone, Copy, Debug)] +pub enum ThreadType { + Bars(Class), + News, +} + +pub async fn run( + app_config: Arc, + mut asset_receiver: mpsc::Receiver, + mut clock_receiver: mpsc::Receiver, +) { + let (bars_us_equity_asset_status_sender, bars_us_equity_backfill_sender) = + init_thread(app_config.clone(), ThreadType::Bars(Class::UsEquity)).await; + let (bars_crypto_asset_status_sender, bars_crypto_backfill_sender) = + init_thread(app_config.clone(), ThreadType::Bars(Class::Crypto)).await; + let (news_asset_status_sender, news_backfill_sender) = + init_thread(app_config.clone(), ThreadType::News).await; + + loop { + select! { + Some(asset_message) = asset_receiver.recv() => { + let bars_us_equity_asset_status_sender = bars_us_equity_asset_status_sender.clone(); + let bars_crypto_asset_status_sender = bars_crypto_asset_status_sender.clone(); + let news_asset_status_sender = news_asset_status_sender.clone(); + + spawn(handle_asset_message( + bars_us_equity_asset_status_sender, + bars_crypto_asset_status_sender, + news_asset_status_sender, + asset_message, + )); + } + Some(_) = clock_receiver.recv() => { + let bars_us_equity_backfill_sender = bars_us_equity_backfill_sender.clone(); + let bars_crypto_backfill_sender = bars_crypto_backfill_sender.clone(); + let news_backfill_sender = news_backfill_sender.clone(); + + spawn(handle_clock_message( + bars_us_equity_backfill_sender, + bars_crypto_backfill_sender, + news_backfill_sender, + )); + } + else => { + panic!("Communication channel unexpectedly closed.") + } + } + } +} + +async fn init_thread( + app_config: Arc, + thread_type: ThreadType, +) -> ( + mpsc::Sender, + mpsc::Sender, +) { + let guard = Arc::new(RwLock::new(Guard::new())); + + let websocket_url = match thread_type { + ThreadType::Bars(Class::UsEquity) => format!( + "{}/{}", + ALPACA_STOCK_WEBSOCKET_URL, &app_config.alpaca_source + ), + ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_WEBSOCKET_URL.into(), + ThreadType::News => ALPACA_NEWS_WEBSOCKET_URL.into(), + }; + + let (websocket, _) = connect_async(websocket_url).await.unwrap(); + let (mut websocket_sender, mut websocket_receiver) = websocket.split(); + authenticate(&app_config, &mut websocket_sender, &mut websocket_receiver).await; + let websocket_sender = Arc::new(Mutex::new(websocket_sender)); + + let (asset_status_sender, asset_status_receiver) = mpsc::channel(100); + spawn(asset_status::run( + app_config.clone(), + thread_type, + guard.clone(), + asset_status_receiver, + websocket_sender.clone(), + )); + + let (backfill_sender, backfill_receiver) = mpsc::channel(100); + spawn(backfill::run( + app_config.clone(), + thread_type, + guard.clone(), + backfill_receiver, + )); + + spawn(websocket::run( + app_config.clone(), + thread_type, + guard.clone(), + websocket_sender, + websocket_receiver, + backfill_sender.clone(), + )); + + (asset_status_sender, backfill_sender) +} + +async fn handle_asset_message( + bars_us_equity_asset_status_sender: mpsc::Sender, + bars_crypto_asset_status_sender: mpsc::Sender, + news_asset_status_sender: mpsc::Sender, + asset_status_message: asset_status::Message, +) { + let (us_equity_assets, crypto_assets): (Vec<_>, Vec<_>) = asset_status_message + .assets + .clone() + .into_iter() + .partition(|asset| asset.class == Class::UsEquity); + + let bars_us_equity_future = async { + if !us_equity_assets.is_empty() { + let (bars_us_equity_asset_status_message, bars_us_equity_asset_status_receiver) = + asset_status::Message::new(asset_status_message.action.clone(), us_equity_assets); + bars_us_equity_asset_status_sender + .send(bars_us_equity_asset_status_message) + .await + .unwrap(); + bars_us_equity_asset_status_receiver.await.unwrap(); + } + }; + + let bars_crypto_future = async { + if !crypto_assets.is_empty() { + let (crypto_asset_status_message, crypto_asset_status_receiver) = + asset_status::Message::new(asset_status_message.action.clone(), crypto_assets); + bars_crypto_asset_status_sender + .send(crypto_asset_status_message) + .await + .unwrap(); + crypto_asset_status_receiver.await.unwrap(); + } + }; + + let news_future = async { + if !asset_status_message.assets.is_empty() { + let (news_asset_status_message, news_asset_status_receiver) = + asset_status::Message::new( + asset_status_message.action.clone(), + asset_status_message.assets, + ); + news_asset_status_sender + .send(news_asset_status_message) + .await + .unwrap(); + news_asset_status_receiver.await.unwrap(); + } + }; + + join!(bars_us_equity_future, bars_crypto_future, news_future); + asset_status_message.response.send(()).unwrap(); +} + +async fn handle_clock_message( + bars_us_equity_backfill_sender: mpsc::Sender, + bars_crypto_backfill_sender: mpsc::Sender, + news_backfill_sender: mpsc::Sender, +) { + let bars_us_equity_future = async { + let (bars_us_equity_backfill_message, bars_us_equity_backfill_receiver) = + backfill::Message::new(backfill::Action::Backfill, Subset::All); + bars_us_equity_backfill_sender + .send(bars_us_equity_backfill_message) + .await + .unwrap(); + bars_us_equity_backfill_receiver.await.unwrap(); + }; + + let bars_crypto_future = async { + let (bars_crypto_backfill_message, bars_crypto_backfill_receiver) = + backfill::Message::new(backfill::Action::Backfill, Subset::All); + bars_crypto_backfill_sender + .send(bars_crypto_backfill_message) + .await + .unwrap(); + bars_crypto_backfill_receiver.await.unwrap(); + }; + + let news_future = async { + let (news_backfill_message, news_backfill_receiver) = + backfill::Message::new(backfill::Action::Backfill, Subset::All); + news_backfill_sender + .send(news_backfill_message) + .await + .unwrap(); + news_backfill_receiver.await.unwrap(); + }; + + join!(bars_us_equity_future, bars_crypto_future, news_future); +} diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs new file mode 100644 index 0000000..bae568c --- /dev/null +++ b/src/threads/data/websocket.rs @@ -0,0 +1,217 @@ +use super::{backfill, Guard, ThreadType}; +use crate::{ + config::Config, + database, + types::{alpaca::websocket, Bar, News, Subset}, +}; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use log::{error, info, warn}; +use serde_json::from_str; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use tokio::{ + join, + net::TcpStream, + spawn, + sync::{mpsc, Mutex, RwLock}, +}; +use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; + +pub async fn run( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + websocket_sender: Arc< + Mutex>, tungstenite::Message>>, + >, + mut websocket_receiver: SplitStream>>, + backfill_sender: mpsc::Sender, +) { + loop { + let app_config = app_config.clone(); + let guard = guard.clone(); + let websocket_sender = websocket_sender.clone(); + let backfill_sender = backfill_sender.clone(); + + let message = websocket_receiver.next().await.unwrap().unwrap(); + + spawn(handle_websocket_message( + app_config, + thread_type, + guard, + websocket_sender, + backfill_sender, + message, + )); + } +} + +async fn handle_websocket_message( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + websocket_sender: Arc< + Mutex>, tungstenite::Message>>, + >, + backfill_sender: mpsc::Sender, + message: tungstenite::Message, +) { + match message { + tungstenite::Message::Text(message) => { + let message = from_str::>(&message); + + if let Ok(message) = message { + for message in message { + let app_config = app_config.clone(); + let guard = guard.clone(); + let backfill_sender = backfill_sender.clone(); + + spawn(handle_parsed_websocket_message( + app_config, + thread_type, + guard, + backfill_sender, + message, + )); + } + } else { + error!( + "{:?} - Failed to deserialize websocket message: {:?}", + thread_type, message + ); + } + } + tungstenite::Message::Ping(_) => { + websocket_sender + .lock() + .await + .send(tungstenite::Message::Pong(vec![])) + .await + .unwrap(); + } + _ => error!( + "{:?} - Unexpected websocket message: {:?}", + thread_type, message + ), + } +} + +#[allow(clippy::significant_drop_tightening)] +async fn handle_parsed_websocket_message( + app_config: Arc, + thread_type: ThreadType, + guard: Arc>, + backfill_sender: mpsc::Sender, + message: websocket::incoming::Message, +) { + match message { + websocket::incoming::Message::Subscription(message) => { + let symbols = match message { + websocket::incoming::subscription::Message::Market(message) => message.bars, + websocket::incoming::subscription::Message::News(message) => message.news, + }; + + let mut guard = guard.write().await; + + let newly_subscribed = guard + .pending_subscriptions + .extract_if(|symbol, _| symbols.contains(symbol)) + .collect::>(); + + let newly_unsubscribed = guard + .pending_unsubscriptions + .extract_if(|symbol, _| !symbols.contains(symbol)) + .collect::>(); + + drop(guard); + + let newly_subscribed_future = async { + if !newly_subscribed.is_empty() { + info!( + "{:?} - Subscribed to {:?}.", + thread_type, + newly_subscribed.keys().collect::>() + ); + + let (backfill_message, backfill_receiver) = backfill::Message::new( + backfill::Action::Backfill, + Subset::Some(newly_subscribed.into_values().collect::>()), + ); + + backfill_sender.send(backfill_message).await.unwrap(); + backfill_receiver.await.unwrap(); + } + }; + + let newly_unsubscribed_future = async { + if !newly_unsubscribed.is_empty() { + info!( + "{:?} - Unsubscribed from {:?}.", + thread_type, + newly_unsubscribed.keys().collect::>() + ); + + let (purge_message, purge_receiver) = backfill::Message::new( + backfill::Action::Purge, + Subset::Some(newly_unsubscribed.into_values().collect::>()), + ); + + backfill_sender.send(purge_message).await.unwrap(); + purge_receiver.await.unwrap(); + } + }; + + join!(newly_subscribed_future, newly_unsubscribed_future); + } + websocket::incoming::Message::Bar(message) + | websocket::incoming::Message::UpdatedBar(message) => { + let bar = Bar::from(message); + + let guard = guard.read().await; + if guard.symbols.get(&bar.symbol).is_none() { + warn!( + "{:?} - Race condition: received bar for unsubscribed symbol: {:?}.", + thread_type, bar.symbol + ); + return; + } + + info!( + "{:?} - Received bar for {}: {}.", + thread_type, bar.symbol, bar.time + ); + database::bars::upsert(&app_config.clickhouse_client, &bar).await; + } + websocket::incoming::Message::News(message) => { + let news = News::from(message); + let symbols = news.symbols.clone().into_iter().collect::>(); + + let guard = guard.read().await; + if !guard.symbols.iter().any(|symbol| symbols.contains(symbol)) { + warn!( + "{:?} - Race condition: received news for unsubscribed symbols: {:?}.", + thread_type, news.symbols + ); + return; + } + + info!( + "{:?} - Received news for {:?}: {}.", + thread_type, news.symbols, news.time_created + ); + database::news::upsert(&app_config.clickhouse_client, &news).await; + } + websocket::incoming::Message::Success(_) => {} + websocket::incoming::Message::Error(message) => { + error!( + "{:?} - Received error message: {}.", + thread_type, message.message + ); + } + } +} diff --git a/src/threads/mod.rs b/src/threads/mod.rs new file mode 100644 index 0000000..5f09b94 --- /dev/null +++ b/src/threads/mod.rs @@ -0,0 +1,2 @@ +pub mod clock; +pub mod data; diff --git a/src/types/algebraic/mod.rs b/src/types/algebraic/mod.rs new file mode 100644 index 0000000..192af7d --- /dev/null +++ b/src/types/algebraic/mod.rs @@ -0,0 +1,3 @@ +pub mod subset; + +pub use subset::Subset; diff --git a/src/types/algebraic/subset.rs b/src/types/algebraic/subset.rs new file mode 100644 index 0000000..5a902f3 --- /dev/null +++ b/src/types/algebraic/subset.rs @@ -0,0 +1,5 @@ +#[derive(Clone, Debug)] +pub enum Subset { + Some(Vec), + All, +} diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index 03cfe04..8f59ca8 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -1,4 +1,4 @@ -use crate::types::alpaca::api::impl_from_enum; +use crate::types::{self, alpaca::api::impl_from_enum}; use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -8,7 +8,7 @@ pub enum Class { Crypto, } -impl_from_enum!(crate::types::Class, Class, UsEquity, Crypto); +impl_from_enum!(types::Class, Class, UsEquity, Crypto); #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] @@ -24,7 +24,7 @@ pub enum Exchange { } impl_from_enum!( - crate::types::Exchange, + types::Exchange, Exchange, Amex, Arca, @@ -61,10 +61,11 @@ pub struct Asset { pub attributes: Option>, } -impl From for crate::types::Asset { +impl From for types::Asset { fn from(item: Asset) -> Self { Self { - symbol: item.symbol, + symbol: item.symbol.clone(), + abbreviation: item.symbol.replace('/', ""), class: item.class.into(), exchange: item.exchange.into(), time_added: time::OffsetDateTime::now_utc(), diff --git a/src/types/alpaca/api/incoming/bar.rs b/src/types/alpaca/api/incoming/bar.rs index ae66841..e078d38 100644 --- a/src/types/alpaca/api/incoming/bar.rs +++ b/src/types/alpaca/api/incoming/bar.rs @@ -1,3 +1,4 @@ +use crate::types; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use time::OffsetDateTime; @@ -23,7 +24,7 @@ pub struct Bar { pub vwap: f64, } -impl From<(Bar, String)> for crate::types::Bar { +impl From<(Bar, String)> for types::Bar { fn from((bar, symbol): (Bar, String)) -> Self { Self { time: bar.time, @@ -41,6 +42,6 @@ impl From<(Bar, String)> for crate::types::Bar { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Message { - pub bars: HashMap>>, + pub bars: HashMap>, pub next_page_token: Option, } diff --git a/src/types/alpaca/api/incoming/mod.rs b/src/types/alpaca/api/incoming/mod.rs index adac42e..30dd91b 100644 --- a/src/types/alpaca/api/incoming/mod.rs +++ b/src/types/alpaca/api/incoming/mod.rs @@ -1,3 +1,4 @@ pub mod asset; pub mod bar; pub mod clock; +pub mod news; diff --git a/src/types/alpaca/api/incoming/news.rs b/src/types/alpaca/api/incoming/news.rs new file mode 100644 index 0000000..ef45f70 --- /dev/null +++ b/src/types/alpaca/api/incoming/news.rs @@ -0,0 +1,64 @@ +use crate::types; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ImageSize { + Thumb, + Small, + Large, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Image { + pub size: ImageSize, + pub url: String, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde_as] +pub struct News { + pub id: i64, + #[serde(with = "time::serde::rfc3339")] + #[serde(rename = "created_at")] + pub time_created: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + #[serde(rename = "updated_at")] + pub time_updated: OffsetDateTime, + pub symbols: Vec, + pub headline: String, + pub author: String, + #[serde_as(as = "NoneAsEmptyString")] + pub source: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub summary: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub content: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub url: Option, + pub images: Vec, +} + +impl From for types::News { + fn from(news: News) -> Self { + Self { + id: news.id, + time_created: news.time_created, + time_updated: news.time_updated, + symbols: news.symbols, + headline: news.headline, + author: news.author, + source: news.source, + summary: news.summary, + url: news.url, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Message { + pub news: Vec, + pub next_page_token: Option, +} diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/types/alpaca/api/outgoing/bar.rs index 124ef57..48b758f 100644 --- a/src/types/alpaca/api/outgoing/bar.rs +++ b/src/types/alpaca/api/outgoing/bar.rs @@ -1,15 +1,8 @@ -use serde::{Serialize, Serializer}; +use super::serialize_symbols; +use serde::Serialize; use std::time::Duration; use time::OffsetDateTime; -fn serialize_symbols(symbols: &[String], serializer: S) -> Result -where - S: Serializer, -{ - let string = symbols.join(","); - serializer.serialize_str(&string) -} - fn serialize_timeframe(timeframe: &Duration, serializer: S) -> Result where S: serde::Serializer, diff --git a/src/types/alpaca/api/outgoing/mod.rs b/src/types/alpaca/api/outgoing/mod.rs index 46f285c..6293fb8 100644 --- a/src/types/alpaca/api/outgoing/mod.rs +++ b/src/types/alpaca/api/outgoing/mod.rs @@ -1 +1,12 @@ pub mod bar; +pub mod news; + +use serde::Serializer; + +fn serialize_symbols(symbols: &[String], serializer: S) -> Result +where + S: Serializer, +{ + let string = symbols.join(","); + serializer.serialize_str(&string) +} diff --git a/src/types/alpaca/api/outgoing/news.rs b/src/types/alpaca/api/outgoing/news.rs new file mode 100644 index 0000000..8bc64c2 --- /dev/null +++ b/src/types/alpaca/api/outgoing/news.rs @@ -0,0 +1,40 @@ +use super::serialize_symbols; +use serde::Serialize; +use time::OffsetDateTime; + +#[derive(Serialize)] +pub struct News { + #[serde(serialize_with = "serialize_symbols")] + pub symbols: Vec, + #[serde(with = "time::serde::rfc3339")] + pub start: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub end: OffsetDateTime, + pub limit: i64, + pub include_content: bool, + pub exclude_contentless: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub page_token: Option, +} + +impl News { + pub const fn new( + symbols: Vec, + start: OffsetDateTime, + end: OffsetDateTime, + limit: i64, + include_content: bool, + exclude_contentless: bool, + page_token: Option, + ) -> Self { + Self { + symbols, + start, + end, + limit, + include_content, + exclude_contentless, + page_token, + } + } +} diff --git a/src/types/alpaca/websocket/data/mod.rs b/src/types/alpaca/websocket/data/mod.rs deleted file mode 100644 index 9aac270..0000000 --- a/src/types/alpaca/websocket/data/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod incoming; -pub mod outgoing; diff --git a/src/types/alpaca/websocket/data/outgoing/subscribe.rs b/src/types/alpaca/websocket/data/outgoing/subscribe.rs deleted file mode 100644 index afea0af..0000000 --- a/src/types/alpaca/websocket/data/outgoing/subscribe.rs +++ /dev/null @@ -1,17 +0,0 @@ -use serde::Serialize; - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct Message { - bars: Vec, - updated_bars: Vec, -} - -impl Message { - pub fn new(symbols: Vec) -> Self { - Self { - bars: symbols.clone(), - updated_bars: symbols, - } - } -} diff --git a/src/types/alpaca/websocket/data/incoming/bar.rs b/src/types/alpaca/websocket/incoming/bar.rs similarity index 94% rename from src/types/alpaca/websocket/data/incoming/bar.rs rename to src/types/alpaca/websocket/incoming/bar.rs index 335dc59..7a4d986 100644 --- a/src/types/alpaca/websocket/data/incoming/bar.rs +++ b/src/types/alpaca/websocket/incoming/bar.rs @@ -1,3 +1,4 @@ +use crate::types; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -24,7 +25,7 @@ pub struct Message { pub vwap: f64, } -impl From for crate::types::Bar { +impl From for types::Bar { fn from(bar: Message) -> Self { Self { time: bar.time, diff --git a/src/types/alpaca/websocket/incoming/error.rs b/src/types/alpaca/websocket/incoming/error.rs new file mode 100644 index 0000000..714ba8e --- /dev/null +++ b/src/types/alpaca/websocket/incoming/error.rs @@ -0,0 +1,9 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Message { + pub code: u16, + #[serde(rename = "msg")] + pub message: String, +} diff --git a/src/types/alpaca/websocket/data/incoming/mod.rs b/src/types/alpaca/websocket/incoming/mod.rs similarity index 66% rename from src/types/alpaca/websocket/data/incoming/mod.rs rename to src/types/alpaca/websocket/incoming/mod.rs index 1a2f413..7309e2c 100644 --- a/src/types/alpaca/websocket/data/incoming/mod.rs +++ b/src/types/alpaca/websocket/incoming/mod.rs @@ -1,4 +1,6 @@ pub mod bar; +pub mod error; +pub mod news; pub mod subscription; pub mod success; @@ -12,7 +14,11 @@ pub enum Message { #[serde(rename = "subscription")] Subscription(subscription::Message), #[serde(rename = "b")] - Bars(bar::Message), + Bar(bar::Message), #[serde(rename = "u")] - UpdatedBars(bar::Message), + UpdatedBar(bar::Message), + #[serde(rename = "n")] + News(news::Message), + #[serde(rename = "error")] + Error(error::Message), } diff --git a/src/types/alpaca/websocket/incoming/news.rs b/src/types/alpaca/websocket/incoming/news.rs new file mode 100644 index 0000000..f9565fd --- /dev/null +++ b/src/types/alpaca/websocket/incoming/news.rs @@ -0,0 +1,43 @@ +use crate::types; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde_as] +pub struct Message { + pub id: i64, + #[serde(with = "time::serde::rfc3339")] + #[serde(rename = "created_at")] + pub time_created: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + #[serde(rename = "updated_at")] + pub time_updated: OffsetDateTime, + pub symbols: Vec, + pub headline: String, + pub author: String, + #[serde_as(as = "NoneAsEmptyString")] + pub source: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub summary: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub content: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub url: Option, +} + +impl From for types::News { + fn from(news: Message) -> Self { + Self { + id: news.id, + time_created: news.time_created, + time_updated: news.time_updated, + symbols: news.symbols, + headline: news.headline, + author: news.author, + source: news.source, + summary: news.summary, + url: news.url, + } + } +} diff --git a/src/types/alpaca/websocket/data/incoming/subscription.rs b/src/types/alpaca/websocket/incoming/subscription.rs similarity index 57% rename from src/types/alpaca/websocket/data/incoming/subscription.rs rename to src/types/alpaca/websocket/incoming/subscription.rs index 8531f88..92b5d91 100644 --- a/src/types/alpaca/websocket/data/incoming/subscription.rs +++ b/src/types/alpaca/websocket/incoming/subscription.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct Message { +pub struct MarketMessage { pub trades: Vec, pub quotes: Vec, pub bars: Vec, @@ -13,3 +13,16 @@ pub struct Message { pub lulds: Option>, pub cancel_errors: Option>, } + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NewsMessage { + pub news: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum Message { + Market(MarketMessage), + News(NewsMessage), +} diff --git a/src/types/alpaca/websocket/data/incoming/success.rs b/src/types/alpaca/websocket/incoming/success.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/success.rs rename to src/types/alpaca/websocket/incoming/success.rs diff --git a/src/types/alpaca/websocket/mod.rs b/src/types/alpaca/websocket/mod.rs index 7a345e4..9aac270 100644 --- a/src/types/alpaca/websocket/mod.rs +++ b/src/types/alpaca/websocket/mod.rs @@ -1 +1,2 @@ -pub mod data; +pub mod incoming; +pub mod outgoing; diff --git a/src/types/alpaca/websocket/data/outgoing/auth.rs b/src/types/alpaca/websocket/outgoing/auth.rs similarity index 100% rename from src/types/alpaca/websocket/data/outgoing/auth.rs rename to src/types/alpaca/websocket/outgoing/auth.rs diff --git a/src/types/alpaca/websocket/data/outgoing/mod.rs b/src/types/alpaca/websocket/outgoing/mod.rs similarity index 100% rename from src/types/alpaca/websocket/data/outgoing/mod.rs rename to src/types/alpaca/websocket/outgoing/mod.rs diff --git a/src/types/alpaca/websocket/outgoing/subscribe.rs b/src/types/alpaca/websocket/outgoing/subscribe.rs new file mode 100644 index 0000000..311d888 --- /dev/null +++ b/src/types/alpaca/websocket/outgoing/subscribe.rs @@ -0,0 +1,36 @@ +use serde::Serialize; + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct MarketMessage { + bars: Vec, + updated_bars: Vec, +} + +impl MarketMessage { + pub fn new(symbols: Vec) -> Self { + Self { + bars: symbols.clone(), + updated_bars: symbols, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct NewsMessage { + news: Vec, +} + +impl NewsMessage { + pub fn new(symbols: Vec) -> Self { + Self { news: symbols } + } +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum Message { + Market(MarketMessage), + News(NewsMessage), +} diff --git a/src/types/asset.rs b/src/types/asset.rs index 50f95f9..13809b3 100644 --- a/src/types/asset.rs +++ b/src/types/asset.rs @@ -1,4 +1,3 @@ -use crate::config::{ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL}; use clickhouse::Row; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; @@ -11,15 +10,6 @@ pub enum Class { Crypto = 2, } -impl Class { - pub const fn get_data_url(self) -> &'static str { - match self { - Self::UsEquity => ALPACA_STOCK_DATA_URL, - Self::Crypto => ALPACA_CRYPTO_DATA_URL, - } - } -} - #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] #[repr(u8)] pub enum Exchange { @@ -36,6 +26,7 @@ pub enum Exchange { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)] pub struct Asset { pub symbol: String, + pub abbreviation: String, pub class: Class, pub exchange: Exchange, #[serde(with = "clickhouse::serde::time::datetime")] diff --git a/src/types/backfill.rs b/src/types/backfill.rs index 89e233a..e940d76 100644 --- a/src/types/backfill.rs +++ b/src/types/backfill.rs @@ -1,4 +1,4 @@ -use super::Bar; +use super::{Bar, News}; use clickhouse::Row; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; @@ -21,3 +21,9 @@ impl From for Backfill { Self::new(bar.symbol, bar.time) } } + +impl From<(News, String)> for Backfill { + fn from((news, symbol): (News, String)) -> Self { + Self::new(symbol, news.time_created) + } +} diff --git a/src/types/mod.rs b/src/types/mod.rs index 45494c5..b19bfab 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,10 +1,12 @@ +pub mod algebraic; pub mod alpaca; pub mod asset; pub mod backfill; pub mod bar; -pub mod state; +pub mod news; +pub use algebraic::Subset; pub use asset::{Asset, Class, Exchange}; pub use backfill::Backfill; pub use bar::Bar; -pub use state::BroadcastMessage; +pub use news::News; diff --git a/src/types/news.rs b/src/types/news.rs new file mode 100644 index 0000000..9e97cd1 --- /dev/null +++ b/src/types/news.rs @@ -0,0 +1,23 @@ +use clickhouse::Row; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)] +#[serde_as] +pub struct News { + pub id: i64, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_created: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_updated: OffsetDateTime, + pub symbols: Vec, + pub headline: String, + pub author: String, + #[serde_as(as = "NoneAsEmptyString")] + pub source: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub summary: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub url: Option, +} diff --git a/src/types/state/asset.rs b/src/types/state/asset.rs deleted file mode 100644 index 5bf5370..0000000 --- a/src/types/state/asset.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum BroadcastMessage { - Add, - Backfill, - Delete, - Purge, -} diff --git a/src/types/state/clock.rs b/src/types/state/clock.rs deleted file mode 100644 index 5433a8b..0000000 --- a/src/types/state/clock.rs +++ /dev/null @@ -1,5 +0,0 @@ -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum BroadcastMessage { - Open, - Close, -} diff --git a/src/types/state/mod.rs b/src/types/state/mod.rs deleted file mode 100644 index c1f6023..0000000 --- a/src/types/state/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::types::Asset; - -pub mod asset; -pub mod clock; - -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum BroadcastMessage { - Asset((asset::BroadcastMessage, Vec)), - Clock(clock::BroadcastMessage), -} diff --git a/src/utils/cleanup.rs b/src/utils/cleanup.rs index 3e75110..05bb69f 100644 --- a/src/utils/cleanup.rs +++ b/src/utils/cleanup.rs @@ -1,13 +1,11 @@ use crate::database; use clickhouse::Client; +use tokio::join; pub async fn cleanup(clickhouse_client: &Client) { - let assets = database::assets::select(clickhouse_client).await; - let symbols = assets - .iter() - .map(|asset| asset.symbol.clone()) - .collect::>(); + let bars_future = database::bars::cleanup(clickhouse_client); + let news_future = database::news::cleanup(clickhouse_client); + let backfills_future = database::backfills::cleanup(clickhouse_client); - database::bars::delete_where_not_symbols(clickhouse_client, &symbols).await; - database::backfills::delete_where_not_symbols(clickhouse_client, &symbols).await; + join!(bars_future, news_future, backfills_future); } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index df65ac1..7918a26 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,7 @@ pub mod cleanup; pub mod time; +pub mod websocket; pub use cleanup::cleanup; pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}; +pub use websocket::authenticate; diff --git a/src/utils/websocket.rs b/src/utils/websocket.rs new file mode 100644 index 0000000..5aef8ee --- /dev/null +++ b/src/utils/websocket.rs @@ -0,0 +1,51 @@ +use crate::{config::Config, types::alpaca::websocket}; +use core::panic; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use serde_json::{from_str, to_string}; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +pub async fn authenticate( + app_config: &Arc, + sender: &mut SplitSink>, Message>, + receiver: &mut SplitStream>>, +) { + match receiver.next().await.unwrap().unwrap() { + Message::Text(data) + if from_str::>(&data) + .unwrap() + .first() + == Some(&websocket::incoming::Message::Success( + websocket::incoming::success::Message::Connected, + )) => {} + _ => panic!("Failed to connect to Alpaca websocket."), + } + + sender + .send(Message::Text( + to_string(&websocket::outgoing::Message::Auth( + websocket::outgoing::auth::Message::new( + app_config.alpaca_api_key.clone(), + app_config.alpaca_api_secret.clone(), + ), + )) + .unwrap(), + )) + .await + .unwrap(); + + match receiver.next().await.unwrap().unwrap() { + Message::Text(data) + if from_str::>(&data) + .unwrap() + .first() + == Some(&websocket::incoming::Message::Success( + websocket::incoming::success::Message::Authenticated, + )) => {} + _ => panic!("Failed to authenticate with Alpaca websocket."), + }; +} diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index a8522d7..f051581 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -1,5 +1,6 @@ CREATE TABLE IF NOT EXISTS qrust.assets ( - symbol String, + symbol LowCardinality(String), + abbreviation LowCardinality(String), class Enum('us_equity' = 1, 'crypto' = 2), exchange Enum( 'AMEX' = 1, @@ -11,13 +12,14 @@ CREATE TABLE IF NOT EXISTS qrust.assets ( 'OTC' = 7, 'CRYPTO' = 8 ), - time_added DateTime DEFAULT now() + time_added DateTime DEFAULT now(), + CONSTRAINT abbreviation ASSUME replace(symbol, '/', '') = abbreviation ) ENGINE = ReplacingMergeTree() PRIMARY KEY symbol; CREATE TABLE IF NOT EXISTS qrust.bars ( - symbol String, + symbol LowCardinality(String), time DateTime, open Float64, high Float64, @@ -31,8 +33,30 @@ ENGINE = ReplacingMergeTree() PRIMARY KEY (symbol, time) PARTITION BY toYYYYMM(time); -CREATE TABLE IF NOT EXISTS qrust.backfills ( - symbol String, +CREATE TABLE IF NOT EXISTS qrust.backfills_bars ( + symbol LowCardinality(String), + time DateTime +) +ENGINE = ReplacingMergeTree() +PRIMARY KEY symbol; + +CREATE TABLE IF NOT EXISTS qrust.news ( + id Int64, + time_created DateTime, + time_updated DateTime, + symbols Array(LowCardinality(String)), + headline String, + author String, + source Nullable(String), + summary Nullable(String), + url Nullable(String), +) +ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(time_created) +PRIMARY KEY id; + +CREATE TABLE IF NOT EXISTS qrust.backfills_news ( + symbol LowCardinality(String), time DateTime ) ENGINE = ReplacingMergeTree() diff --git a/support/clickhouse/users.d/max_partitions_per_insert_block.xml b/support/clickhouse/users.d/max_partitions_per_insert_block.xml new file mode 100644 index 0000000..776b5f8 --- /dev/null +++ b/support/clickhouse/users.d/max_partitions_per_insert_block.xml @@ -0,0 +1,7 @@ + + + + 1000 + + +