diff --git a/backend/src/data/live.rs b/backend/src/data/live.rs index 429a8ac..313b8f1 100644 --- a/backend/src/data/live.rs +++ b/backend/src/data/live.rs @@ -68,7 +68,7 @@ pub async fn run_data_live( _ => panic!(), } - let symbols = get_assets_with_class(&app_config.postgres_pool, class.clone()) + let symbols = get_assets_with_class(&app_config.postgres_pool, &class) .await? .into_iter() .map(|asset| asset.symbol) @@ -88,17 +88,18 @@ pub async fn run_data_live( spawn(broadcast_handler( class, - asset_broadcast_receiver, sink.clone(), + asset_broadcast_receiver, )); - websocket_handler(app_config, sink, stream).await?; + websocket_handler(app_config, class, sink, stream).await?; unreachable!() } pub async fn websocket_handler( app_config: Arc, + class: Class, sink: Arc>, Message>>>, stream: Arc>>>>, ) -> Result<(), Box> { @@ -111,12 +112,15 @@ pub async fn websocket_handler( for message in parsed_data { match message { IncomingMessage::Subscription(subscription_message) => { - info!("Current subscriptions: {:?}", subscription_message.bars); + info!( + "Current {:?} subscriptions: {:?}", + class, subscription_message.bars + ); } IncomingMessage::Bars(bar_message) | IncomingMessage::UpdatedBars(bar_message) => { debug!("Incoming bar: {:?}", bar_message); - add_bar(&app_config.postgres_pool, Bar::from(bar_message)).await?; + add_bar(&app_config.postgres_pool, &Bar::from(bar_message)).await?; } message => { warn!("Unhandled incoming message: {:?}", message); @@ -141,8 +145,8 @@ pub async fn websocket_handler( pub async fn broadcast_handler( class: Class, - mut asset_broadcast_receiver: Receiver, sink: Arc>, Message>>>, + mut asset_broadcast_receiver: Receiver, ) -> Result<(), Box> { loop { match asset_broadcast_receiver.recv().await? { diff --git a/backend/src/database/assets.rs b/backend/src/database/assets.rs index ec51015..f7b3511 100644 --- a/backend/src/database/assets.rs +++ b/backend/src/database/assets.rs @@ -16,11 +16,11 @@ pub async fn get_assets( pub async fn get_assets_with_class( postgres_pool: &PgPool, - class: Class, + class: &Class, ) -> Result, Box> { query_as!( Asset, - r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, class as Class + r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, &class as &Class ) .fetch_all(postgres_pool) .await @@ -42,7 +42,7 @@ pub async fn get_asset( pub async fn add_asset( postgres_pool: &PgPool, - asset: Asset, + asset: &Asset, ) -> Result> { query_as!( Asset, @@ -58,7 +58,7 @@ pub async fn add_asset( pub async fn update_asset_trading( postgres_pool: &PgPool, symbol: &str, - trading: bool, + trading: &bool, ) -> Result, Box> { query_as!( Asset, diff --git a/backend/src/database/bars.rs b/backend/src/database/bars.rs index 19b88af..46784ef 100644 --- a/backend/src/database/bars.rs +++ b/backend/src/database/bars.rs @@ -4,7 +4,7 @@ use std::error::Error; pub async fn add_bar( postgres_pool: &PgPool, - bar: Bar, + bar: &Bar, ) -> Result> { query_as!( Bar, diff --git a/backend/src/main.rs b/backend/src/main.rs index d09d7f1..0a751d4 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box> { threads.push(spawn(run_api(app_config.clone(), asset_broadcast_sender))); for thread in threads { - let _ = thread.await?; + thread.await??; } Ok(()) diff --git a/backend/src/routes/assets.rs b/backend/src/routes/assets.rs index 76d34f1..1819a9f 100644 --- a/backend/src/routes/assets.rs +++ b/backend/src/routes/assets.rs @@ -83,7 +83,7 @@ pub async fn add_asset( asset.trading = trading; } - let asset = database::assets::add_asset(&app_config.postgres_pool, asset) + database::assets::add_asset(&app_config.postgres_pool, &asset) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -107,7 +107,7 @@ pub async fn update_asset( Path(symbol): Path, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { - let asset = update_asset_trading(&app_config.postgres_pool, &symbol, request.trading) + let asset = update_asset_trading(&app_config.postgres_pool, &symbol, &request.trading) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; diff --git a/backend/src/types/class.rs b/backend/src/types/class.rs index ecd398b..f835602 100644 --- a/backend/src/types/class.rs +++ b/backend/src/types/class.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use sqlx::Type; -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)] pub enum Class { #[sqlx(rename = "us_equity")] #[serde(rename = "us_equity")] diff --git a/backend/src/types/exchange.rs b/backend/src/types/exchange.rs index aaba7b2..1ac9d1b 100644 --- a/backend/src/types/exchange.rs +++ b/backend/src/types/exchange.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use sqlx::Type; -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)] pub enum Exchange { #[sqlx(rename = "AMEX")] #[serde(rename = "AMEX")] diff --git a/backend/src/types/status.rs b/backend/src/types/status.rs index e6f651e..a6c868b 100644 --- a/backend/src/types/status.rs +++ b/backend/src/types/status.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use sqlx::Type; -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)] +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)] pub enum Status { #[sqlx(rename = "active")] #[serde(rename = "active")] diff --git a/backend/src/types/websocket/incoming.rs b/backend/src/types/websocket/incoming.rs index 6f75a2e..98073d2 100644 --- a/backend/src/types/websocket/incoming.rs +++ b/backend/src/types/websocket/incoming.rs @@ -31,12 +31,16 @@ pub struct SuccessMessage { pub struct SubscriptionMessage { pub trades: Vec, pub quotes: Vec, - pub orderbooks: Vec, pub bars: Vec, #[serde(rename = "updatedBars")] pub updated_bars: Vec, #[serde(rename = "dailyBars")] pub daily_bars: Vec, + pub orderbooks: Option>, + pub statuses: Option>, + pub lulds: Option>, + #[serde(rename = "cancelErrors")] + pub cancel_errors: Option>, } #[derive(Debug, PartialEq, Deserialize)]