pub mod asset_status; pub mod backfill; pub mod websocket; use super::{clock, guard::Guard}; use crate::{ config::{ Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_NEWS_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL, }, types::{Class, Subset}, utils::authenticate, }; use futures_util::StreamExt; use std::sync::Arc; use tokio::{ join, select, spawn, sync::{mpsc, Mutex, RwLock}, }; use tokio_tungstenite::connect_async; #[derive(Clone, Copy, Debug)] pub enum ThreadType { Bars(Class), News, } pub async fn run( app_config: Arc, mut asset_receiver: mpsc::Receiver, mut clock_receiver: mpsc::Receiver, ) { let (bars_us_equity_asset_status_sender, bars_us_equity_backfill_sender) = init_thread(app_config.clone(), ThreadType::Bars(Class::UsEquity)).await; let (bars_crypto_asset_status_sender, bars_crypto_backfill_sender) = init_thread(app_config.clone(), ThreadType::Bars(Class::Crypto)).await; let (news_asset_status_sender, news_backfill_sender) = init_thread(app_config.clone(), ThreadType::News).await; loop { select! { Some(asset_message) = asset_receiver.recv() => { spawn(handle_asset_message( bars_us_equity_asset_status_sender.clone(), bars_crypto_asset_status_sender.clone(), news_asset_status_sender.clone(), asset_message, )); } Some(_) = clock_receiver.recv() => { spawn(handle_clock_message( bars_us_equity_backfill_sender.clone(), bars_crypto_backfill_sender.clone(), news_backfill_sender.clone(), )); } else => { panic!("Communication channel unexpectedly closed.") } } } } async fn init_thread( app_config: Arc, thread_type: ThreadType, ) -> ( mpsc::Sender, mpsc::Sender, ) { let guard = Arc::new(RwLock::new(Guard::new())); let websocket_url = match thread_type { ThreadType::Bars(Class::UsEquity) => format!( "{}/{}", ALPACA_STOCK_WEBSOCKET_URL, &app_config.alpaca_source ), ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_WEBSOCKET_URL.into(), ThreadType::News => ALPACA_NEWS_WEBSOCKET_URL.into(), }; let (websocket, _) = connect_async(websocket_url).await.unwrap(); let (mut websocket_sender, mut websocket_receiver) = websocket.split(); authenticate(&app_config, &mut websocket_sender, &mut websocket_receiver).await; let websocket_sender = Arc::new(Mutex::new(websocket_sender)); let (asset_status_sender, asset_status_receiver) = mpsc::channel(100); spawn(asset_status::run( app_config.clone(), thread_type, guard.clone(), asset_status_receiver, websocket_sender.clone(), )); let (backfill_sender, backfill_receiver) = mpsc::channel(100); spawn(backfill::run( app_config.clone(), thread_type, guard.clone(), backfill_receiver, )); spawn(websocket::run( app_config.clone(), thread_type, guard.clone(), websocket_sender, websocket_receiver, backfill_sender.clone(), )); (asset_status_sender, backfill_sender) } async fn handle_asset_message( bars_us_equity_asset_status_sender: mpsc::Sender, bars_crypto_asset_status_sender: mpsc::Sender, news_asset_status_sender: mpsc::Sender, asset_status_message: asset_status::Message, ) { let (us_equity_assets, crypto_assets): (Vec<_>, Vec<_>) = asset_status_message .assets .clone() .into_iter() .partition(|asset| asset.class == Class::UsEquity); let bars_us_equity_future = async { if !us_equity_assets.is_empty() { let (bars_us_equity_asset_status_message, bars_us_equity_asset_status_receiver) = asset_status::Message::new(asset_status_message.action.clone(), us_equity_assets); bars_us_equity_asset_status_sender .send(bars_us_equity_asset_status_message) .await .unwrap(); bars_us_equity_asset_status_receiver.await.unwrap(); } }; let bars_crypto_future = async { if !crypto_assets.is_empty() { let (crypto_asset_status_message, crypto_asset_status_receiver) = asset_status::Message::new(asset_status_message.action.clone(), crypto_assets); bars_crypto_asset_status_sender .send(crypto_asset_status_message) .await .unwrap(); crypto_asset_status_receiver.await.unwrap(); } }; let news_future = async { if !asset_status_message.assets.is_empty() { let (news_asset_status_message, news_asset_status_receiver) = asset_status::Message::new( asset_status_message.action.clone(), asset_status_message.assets, ); news_asset_status_sender .send(news_asset_status_message) .await .unwrap(); news_asset_status_receiver.await.unwrap(); } }; join!(bars_us_equity_future, bars_crypto_future, news_future); asset_status_message.response.send(()).unwrap(); } async fn handle_clock_message( bars_us_equity_backfill_sender: mpsc::Sender, bars_crypto_backfill_sender: mpsc::Sender, news_backfill_sender: mpsc::Sender, ) { let bars_us_equity_future = async { let (bars_us_equity_backfill_message, bars_us_equity_backfill_receiver) = backfill::Message::new(backfill::Action::Backfill, Subset::All); bars_us_equity_backfill_sender .send(bars_us_equity_backfill_message) .await .unwrap(); bars_us_equity_backfill_receiver.await.unwrap(); }; let bars_crypto_future = async { let (bars_crypto_backfill_message, bars_crypto_backfill_receiver) = backfill::Message::new(backfill::Action::Backfill, Subset::All); bars_crypto_backfill_sender .send(bars_crypto_backfill_message) .await .unwrap(); bars_crypto_backfill_receiver.await.unwrap(); }; let news_future = async { let (news_backfill_message, news_backfill_receiver) = backfill::Message::new(backfill::Action::Backfill, Subset::All); news_backfill_sender .send(news_backfill_message) .await .unwrap(); news_backfill_receiver.await.unwrap(); }; join!(bars_us_equity_future, bars_crypto_future, news_future); }