mod backfill; mod websocket; use super::clock; use crate::{ config::{ Config, ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL, ALPACA_SOURCE, ALPACA_STOCK_DATA_WEBSOCKET_URL, }, create_send_await, database, types::{alpaca, Asset, Class}, utils::backoff, }; use futures_util::{future::join_all, StreamExt}; use itertools::{Either, Itertools}; use std::sync::Arc; use tokio::{ join, select, spawn, sync::{mpsc, oneshot}, }; use tokio_tungstenite::connect_async; #[derive(Clone, Copy)] #[allow(dead_code)] pub enum Action { Add, Enable, Remove, Disable, } pub struct Message { pub action: Action, pub assets: Vec<(String, Class)>, pub response: oneshot::Sender<()>, } impl Message { pub fn new(action: Action, assets: Vec<(String, Class)>) -> (Self, oneshot::Receiver<()>) { let (sender, receiver) = oneshot::channel(); ( Self { action, assets, response: sender, }, receiver, ) } } #[derive(Clone, Copy)] pub enum ThreadType { Bars(Class), News, } pub async fn run( config: Arc, mut receiver: mpsc::Receiver, mut clock_receiver: mpsc::Receiver, ) { let (bars_us_equity_websocket_sender, bars_us_equity_backfill_sender) = init_thread(config.clone(), ThreadType::Bars(Class::UsEquity)).await; let (bars_crypto_websocket_sender, bars_crypto_backfill_sender) = init_thread(config.clone(), ThreadType::Bars(Class::Crypto)).await; let (news_websocket_sender, news_backfill_sender) = init_thread(config.clone(), ThreadType::News).await; loop { select! { Some(message) = receiver.recv() => { spawn(handle_message( config.clone(), bars_us_equity_websocket_sender.clone(), bars_us_equity_backfill_sender.clone(), bars_crypto_websocket_sender.clone(), bars_crypto_backfill_sender.clone(), news_websocket_sender.clone(), news_backfill_sender.clone(), message, )); } Some(_) = clock_receiver.recv() => { spawn(handle_clock_message( config.clone(), bars_us_equity_backfill_sender.clone(), bars_crypto_backfill_sender.clone(), news_backfill_sender.clone(), )); } else => panic!("Communication channel unexpectedly closed.") } } } async fn init_thread( config: Arc, thread_type: ThreadType, ) -> ( mpsc::Sender, mpsc::Sender, ) { let websocket_url = match thread_type { ThreadType::Bars(Class::UsEquity) => { format!("{}/{}", ALPACA_STOCK_DATA_WEBSOCKET_URL, *ALPACA_SOURCE) } ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_WEBSOCKET_URL.into(), ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(), }; let (websocket, _) = connect_async(websocket_url).await.unwrap(); let (mut websocket_sink, mut websocket_stream) = websocket.split(); alpaca::websocket::data::authenticate(&mut websocket_sink, &mut websocket_stream).await; let (backfill_sender, backfill_receiver) = mpsc::channel(100); spawn(backfill::run( Arc::new(backfill::create_handler(thread_type, config.clone())), backfill_receiver, )); let (websocket_sender, websocket_receiver) = mpsc::channel(100); spawn(websocket::run( Arc::new(websocket::create_handler(thread_type, config.clone())), websocket_receiver, websocket_stream, websocket_sink, )); (websocket_sender, backfill_sender) } #[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_lines)] async fn handle_message( config: Arc, bars_us_equity_websocket_sender: mpsc::Sender, bars_us_equity_backfill_sender: mpsc::Sender, bars_crypto_websocket_sender: mpsc::Sender, bars_crypto_backfill_sender: mpsc::Sender, news_websocket_sender: mpsc::Sender, news_backfill_sender: mpsc::Sender, message: Message, ) { let (us_equity_symbols, crypto_symbols): (Vec<_>, Vec<_>) = message .assets .clone() .into_iter() .partition_map(|asset| match asset.1 { Class::UsEquity => Either::Left(asset.0), Class::Crypto => Either::Right(asset.0), }); let symbols = message .assets .into_iter() .map(|(symbol, _)| symbol) .collect::>(); let bars_us_equity_future = async { if us_equity_symbols.is_empty() { return; } create_send_await!( bars_us_equity_websocket_sender, websocket::Message::new, message.action.into(), us_equity_symbols.clone() ); create_send_await!( bars_us_equity_backfill_sender, backfill::Message::new, message.action.into(), us_equity_symbols ); }; let bars_crypto_future = async { if crypto_symbols.is_empty() { return; } create_send_await!( bars_crypto_websocket_sender, websocket::Message::new, message.action.into(), crypto_symbols.clone() ); create_send_await!( bars_crypto_backfill_sender, backfill::Message::new, message.action.into(), crypto_symbols ); }; let news_future = async { create_send_await!( news_websocket_sender, websocket::Message::new, message.action.into(), symbols.clone() ); create_send_await!( news_backfill_sender, backfill::Message::new, message.action.into(), symbols.clone() ); }; join!(bars_us_equity_future, bars_crypto_future, news_future); match message.action { Action::Add => { let assets = join_all(symbols.into_iter().map(|symbol| { let config = config.clone(); async move { let asset_future = async { alpaca::api::incoming::asset::get_by_symbol( &config.alpaca_client, &config.alpaca_rate_limiter, &symbol, Some(backoff::infinite()), ) .await .unwrap() }; let position_future = async { alpaca::api::incoming::position::get_by_symbol( &config.alpaca_rate_limiter, &config.alpaca_client, &symbol, Some(backoff::infinite()), ) .await .unwrap() }; let (asset, position) = join!(asset_future, position_future); Asset::from((asset, position)) } })) .await; database::assets::upsert_batch(&config.clickhouse_client, &assets) .await .unwrap(); } Action::Remove => { database::assets::delete_where_symbols(&config.clickhouse_client, &symbols) .await .unwrap(); } _ => {} } message.response.send(()).unwrap(); } async fn handle_clock_message( config: Arc, bars_us_equity_backfill_sender: mpsc::Sender, bars_crypto_backfill_sender: mpsc::Sender, news_backfill_sender: mpsc::Sender, ) { database::cleanup_all(&config.clickhouse_client) .await .unwrap(); let assets = database::assets::select(&config.clickhouse_client) .await .unwrap(); let (us_equity_symbols, crypto_symbols): (Vec<_>, Vec<_>) = assets .clone() .into_iter() .partition_map(|asset| match asset.class { Class::UsEquity => Either::Left(asset.symbol), Class::Crypto => Either::Right(asset.symbol), }); let symbols = assets .into_iter() .map(|asset| asset.symbol) .collect::>(); let bars_us_equity_future = async { create_send_await!( bars_us_equity_backfill_sender, backfill::Message::new, Some(backfill::Action::Backfill), us_equity_symbols.clone() ); }; let bars_crypto_future = async { create_send_await!( bars_crypto_backfill_sender, backfill::Message::new, Some(backfill::Action::Backfill), crypto_symbols.clone() ); }; let news_future = async { create_send_await!( news_backfill_sender, backfill::Message::new, Some(backfill::Action::Backfill), symbols ); }; join!(bars_us_equity_future, bars_crypto_future, news_future); }