use crate::{ config::AppConfig, database::{assets::get_assets_with_class, bars::add_bar}, types::{ websocket::{ incoming::{IncomingMessage, SuccessMessage, SuccessMessageType}, outgoing::{AuthMessage, OutgoingMessage, SubscribeMessage}, }, AssetBroadcastMessage, Bar, Class, }, }; use core::panic; use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; use log::{debug, error, info, warn}; use serde_json::{from_str, to_string}; use std::{error::Error, sync::Arc, time::Duration}; use tokio::{ net::TcpStream, spawn, sync::{broadcast::Receiver, RwLock}, time::timeout, }; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2/iex"; const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; const TIMEOUT_DURATION: Duration = Duration::from_millis(100); pub async fn run_data_live( class: Class, app_config: Arc, asset_broadcast_receiver: Receiver, ) -> Result<(), Box> { let websocket_url = match class { Class::UsEquity => ALPACA_STOCK_WEBSOCKET_URL, Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL, }; let (stream, _) = connect_async(websocket_url).await?; let (mut sink, mut stream) = stream.split(); match stream.next().await { Some(Ok(Message::Text(data))) if from_str::>(&data)?.get(0) == Some(&IncomingMessage::Success(SuccessMessage { msg: SuccessMessageType::Connected, })) => {} _ => panic!(), } sink.send(Message::Text(to_string(&OutgoingMessage::Auth( AuthMessage::new( app_config.alpaca_api_key.clone(), app_config.alpaca_api_secret.clone(), ), ))?)) .await?; match stream.next().await { Some(Ok(Message::Text(data))) if from_str::>(&data)?.get(0) == Some(&IncomingMessage::Success(SuccessMessage { msg: SuccessMessageType::Authenticated, })) => {} _ => panic!(), } let symbols = get_assets_with_class(&app_config.postgres_pool, &class) .await? .into_iter() .map(|asset| asset.symbol) .collect::>(); if !symbols.is_empty() { sink.send(Message::Text(to_string(&OutgoingMessage::Subscribe( SubscribeMessage::from_vec(symbols), ))?)) .await?; } let sink = Arc::new(RwLock::new(sink)); let stream = Arc::new(RwLock::new(stream)); info!("Running live data thread for {:?}.", class); spawn(broadcast_handler( class, sink.clone(), asset_broadcast_receiver, )); websocket_handler(app_config, class, sink, stream).await?; unreachable!() } pub async fn websocket_handler( app_config: Arc, class: Class, sink: Arc>, Message>>>, stream: Arc>>>>, ) -> Result<(), Box> { loop { let mut stream = stream.write().await; match timeout(TIMEOUT_DURATION, stream.next()).await { Ok(Some(Ok(Message::Text(data)))) => match from_str::>(&data) { Ok(parsed_data) => { for message in parsed_data { match message { IncomingMessage::Subscription(subscription_message) => { info!( "Current {:?} subscriptions: {:?}", class, subscription_message.bars ); } IncomingMessage::Bars(bar_message) | IncomingMessage::UpdatedBars(bar_message) => { debug!("Incoming bar: {:?}", bar_message); add_bar(&app_config.postgres_pool, &Bar::from(bar_message)).await?; } message => { warn!("Unhandled incoming message: {:?}", message); } } } } Err(e) => { warn!("Unparsed incoming message: {:?}: {}", data, e); } }, Ok(Some(Ok(Message::Ping(_)))) => { sink.write().await.send(Message::Pong(vec![])).await? } Ok(unknown) => { error!("Unknown incoming message: {:?}", unknown); } Err(_) => {} } } } pub async fn broadcast_handler( class: Class, sink: Arc>, Message>>>, mut asset_broadcast_receiver: Receiver, ) -> Result<(), Box> { loop { match asset_broadcast_receiver.recv().await? { AssetBroadcastMessage::Added(asset) if asset.class == class => { sink.write() .await .send(Message::Text(serde_json::to_string( &OutgoingMessage::Subscribe(SubscribeMessage::new(asset.symbol)), )?)) .await?; } AssetBroadcastMessage::Deleted(asset) if asset.class == class => { sink.write() .await .send(Message::Text(serde_json::to_string( &OutgoingMessage::Unsubscribe(SubscribeMessage::new(asset.symbol)), )?)) .await?; } _ => {} } } }