diff --git a/src/data/historical.rs b/src/data/historical.rs deleted file mode 100644 index 7a3c0c0..0000000 --- a/src/data/historical.rs +++ /dev/null @@ -1,147 +0,0 @@ -use crate::{ - config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL, ALPACA_TIMESTAMP_FORMAT}, - database, - time::{next_minute, ONE_MINUTE}, - types::{api::incoming, Asset, Bar, Class}, -}; -use http::StatusCode; -use indexmap::IndexMap; -use log::{error, info}; -use std::{collections::HashMap, sync::Arc}; -use time::OffsetDateTime; -use tokio::{sync::RwLock, task::spawn_blocking, time::sleep}; - -pub async fn backfill( - app_config: Arc, - asset: Asset, - backfilled: Arc>>, -) { - info!("Backfilling historical data for {}...", asset.symbol); - - let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset; - let fetch_from = asset.timestamp_last + ONE_MINUTE; - let fetch_until = task_run_offsetdatetime - app_config.alpaca_historical_offset - ONE_MINUTE; - if fetch_from > fetch_until { - return; - } - - let mut current_time = fetch_from; - let asset_clone = asset.clone(); - - let mut bars = spawn_blocking(move || { - let mut bars = IndexMap::new(); - while current_time <= fetch_until { - bars.insert( - current_time, - Bar::empty(current_time, asset_clone.symbol.clone()), - ); - current_time += ONE_MINUTE; - } - bars - }) - .await - .unwrap(); - - let wait_duration = task_run_offsetdatetime - OffsetDateTime::now_utc(); - if wait_duration.is_positive() { - sleep(wait_duration.unsigned_abs()).await; - } - - let mut next_page_token = None; - loop { - let request = app_config - .alpaca_client - .get(match asset.class { - Class::UsEquity => ALPACA_STOCK_DATA_URL, - Class::Crypto => ALPACA_CRYPTO_DATA_URL, - }) - .query(&[ - ("symbols", &asset.symbol), - ("timeframe", &String::from("1Min")), - ( - "start", - &fetch_from - .format(ALPACA_TIMESTAMP_FORMAT) - .unwrap() - .to_string(), - ), - ( - "end", - &fetch_until - .format(ALPACA_TIMESTAMP_FORMAT) - .unwrap() - .to_string(), - ), - ("limit", &String::from("10000")), - ("page_token", &next_page_token.clone().unwrap_or_default()), - ]); - - app_config.alpaca_rate_limit.until_ready().await; - let response = request.send().await.unwrap(); - let mut response = if response.status() == StatusCode::OK { - response.json::().await.unwrap() - } else { - error!( - "Failed to backfill historical data for {} from {} to {}: {}", - asset.symbol, - fetch_from, - fetch_until, - response.text().await.unwrap() - ); - break; - }; - - for bar in response - .bars - .remove(&asset.symbol) - .unwrap_or_default() - .unwrap_or_default() - { - bars.insert(bar.timestamp, Bar::from((bar, asset.symbol.clone()))); - } - - if response.next_page_token.is_none() { - break; - } - next_page_token = response.next_page_token; - } - - let bars = bars.into_values().collect::>(); - - let transaction = app_config.postgres_pool.begin().await.unwrap(); - database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await; - database::assets::update_timestamp_last_where_symbol( - &app_config.postgres_pool, - &asset.symbol, - &fetch_until, - ) - .await; - backfill_recent_nulls(&app_config, &asset, &fetch_until, &backfilled).await; - transaction.commit().await.unwrap(); - - info!("Backfilled historical data for {}.", asset.symbol); -} - -#[allow(clippy::significant_drop_tightening)] -async fn backfill_recent_nulls( - app_config: &Arc, - asset: &Asset, - from: &OffsetDateTime, - backfilled: &Arc>>, -) { - let mut backfilled = backfilled.write().await; - let bars = database::bars::select_where_symbol_where_timestamp_larger_than( - &app_config.postgres_pool, - &asset.symbol, - from, - ) - .await; - database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await; - database::assets::update_timestamp_last_where_symbol( - &app_config.postgres_pool, - &asset.symbol, - &bars.last().unwrap().timestamp, - ) - .await; - backfilled.insert(asset.symbol.clone(), true); -} diff --git a/src/data/live.rs b/src/data/market.rs similarity index 52% rename from src/data/live.rs rename to src/data/market.rs index 6d12650..745c6e1 100644 --- a/src/data/live.rs +++ b/src/data/market.rs @@ -1,12 +1,14 @@ use crate::{ - config::{Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL}, - data::historical::backfill, + config::{ + Config, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_DATA_URL, + ALPACA_STOCK_WEBSOCKET_URL, ALPACA_TIMESTAMP_FORMAT, + }, database, - time::{duration_until, last_minute, next_30s, ONE_MINUTE, THIRTY_SECONDS}, + time::{duration_until, last_minute, next_30s, next_minute, ONE_MINUTE, THIRTY_SECONDS}, types::{ - asset, - websocket::{incoming, outgoing}, - Bar, BroadcastMessage, Class, + api, + asset::{self, Asset}, + websocket, Bar, BroadcastMessage, Class, }, }; use core::panic; @@ -14,6 +16,8 @@ use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; +use http::StatusCode; +use indexmap::IndexMap; use log::{error, info, warn}; use serde_json::{from_str, to_string}; use std::{ @@ -21,6 +25,7 @@ use std::{ sync::Arc, time::Instant, }; +use time::OffsetDateTime; use tokio::{ net::TcpStream, spawn, @@ -28,7 +33,8 @@ use tokio::{ broadcast::{Receiver, Sender}, RwLock, }, - time::interval_at, + task::spawn_blocking, + time::{interval_at, sleep}, }; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; @@ -84,18 +90,24 @@ async fn authenticate_websocket( ) { match stream.next().await { Some(Ok(Message::Text(data))) - if from_str::>(&data).unwrap().get(0) - == Some(&incoming::Message::Success(incoming::success::Message { - msg: incoming::success::MessageType::Connected, - })) => {} + if from_str::>(&data) + .unwrap() + .get(0) + == Some(&websocket::incoming::Message::Success( + websocket::incoming::success::Message { + msg: websocket::incoming::success::MessageType::Connected, + }, + )) => {} _ => panic!(), } sink.send(Message::Text( - to_string(&outgoing::Message::Auth(outgoing::auth::Message::new( - app_config.alpaca_api_key.clone(), - app_config.alpaca_api_secret.clone(), - ))) + to_string(&websocket::outgoing::Message::Auth( + websocket::outgoing::auth::Message::new( + app_config.alpaca_api_key.clone(), + app_config.alpaca_api_secret.clone(), + ), + )) .unwrap(), )) .await @@ -103,10 +115,14 @@ async fn authenticate_websocket( match stream.next().await { Some(Ok(Message::Text(data))) - if from_str::>(&data).unwrap().get(0) - == Some(&incoming::Message::Success(incoming::success::Message { - msg: incoming::success::MessageType::Authenticated, - })) => {} + if from_str::>(&data) + .unwrap() + .get(0) + == Some(&websocket::incoming::Message::Success( + websocket::incoming::success::Message { + msg: websocket::incoming::success::MessageType::Authenticated, + }, + )) => {} _ => panic!(), }; } @@ -124,8 +140,8 @@ async fn websocket_broadcast_handler( sink.write() .await .send(Message::Text( - serde_json::to_string(&outgoing::Message::Subscribe( - outgoing::subscribe::Message::new(asset.clone().symbol), + serde_json::to_string(&websocket::outgoing::Message::Subscribe( + websocket::outgoing::subscribe::Message::new(asset.clone().symbol), )) .unwrap(), )) @@ -138,8 +154,8 @@ async fn websocket_broadcast_handler( sink.write() .await .send(Message::Text( - serde_json::to_string(&outgoing::Message::Unsubscribe( - outgoing::subscribe::Message::new(asset.clone().symbol), + serde_json::to_string(&websocket::outgoing::Message::Unsubscribe( + websocket::outgoing::subscribe::Message::new(asset.clone().symbol), )) .unwrap(), )) @@ -161,13 +177,13 @@ async fn websocket_message_handler( loop { match stream.next().await { Some(Ok(Message::Text(data))) => { - let parsed_data = from_str::>(&data); + let parsed_data = from_str::>(&data); if let Err(e) = &parsed_data { - warn!("Unparsed incoming message: {:?}: {}", data, e); + warn!("Unparsed websocket::incoming message: {:?}: {}", data, e); } for message in parsed_data.unwrap_or_default() { - handle_message(&app_config, class, message, &backfilled).await; + websocket_handle_text_message(&app_config, class, message, &backfilled).await; } } Some(Ok(Message::Ping(_))) => sink @@ -176,20 +192,20 @@ async fn websocket_message_handler( .send(Message::Pong(vec![])) .await .unwrap(), - Some(unknown) => error!("Unknown incoming message: {:?}", unknown), + Some(unknown) => error!("Unknown websocket::incoming message: {:?}", unknown), None => panic!(), } } } -async fn handle_message( +async fn websocket_handle_text_message( app_config: &Arc, class: Class, - message: incoming::Message, + message: websocket::incoming::Message, backfilled: &Arc>>, ) { match message { - incoming::Message::Subscription(subscription_message) => { + websocket::incoming::Message::Subscription(subscription_message) => { let old_assets = backfilled .read() .await @@ -227,9 +243,12 @@ async fn handle_message( class, added_assets, deleted_assets ); } - incoming::Message::Bars(bar_message) => { + websocket::incoming::Message::Bars(bar_message) => { let bar = Bar::from(bar_message); - info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp); + info!( + "websocket::Incoming bar for {}: {}", + bar.asset_symbol, bar.timestamp + ); database::bars::upsert( &app_config.postgres_pool, &bar, @@ -237,9 +256,12 @@ async fn handle_message( ) .await; } - incoming::Message::UpdatedBars(bar_message) => { + websocket::incoming::Message::UpdatedBars(bar_message) => { let bar = Bar::from(bar_message); - info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp); + info!( + "websocket::Incoming bar for {}: {}", + bar.asset_symbol, bar.timestamp + ); let transaction = app_config.postgres_pool.begin().await.unwrap(); let backfilled_asset_symbol = backfilled.read().await[&bar.asset_symbol]; @@ -254,18 +276,18 @@ async fn handle_message( } transaction.commit().await.unwrap(); } - incoming::Message::Success(_) => {} + websocket::incoming::Message::Success(_) => {} } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum NullHandlerState { - Bars, - UpdatedBars, -} - #[allow(clippy::significant_drop_in_scrutinee)] async fn null_handler(app_config: Arc, backfilled: Arc>>) { + #[derive(PartialEq)] + enum NullHandlerState { + Bars, + UpdatedBars, + } + let next_30s = next_30s(); let mut state = if next_30s.unix_timestamp() % 30 == 0 { NullHandlerState::Bars @@ -309,3 +331,146 @@ async fn null_handler(app_config: Arc, backfilled: Arc, + asset: Asset, + backfilled: Arc>>, +) { + info!("Backfilling historical data for {}...", asset.symbol); + + let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset; + let fetch_from = asset.timestamp_last + ONE_MINUTE; + let fetch_until = task_run_offsetdatetime - app_config.alpaca_historical_offset - ONE_MINUTE; + if fetch_from > fetch_until { + return; + } + + let wait_duration = task_run_offsetdatetime - OffsetDateTime::now_utc(); + if wait_duration.is_positive() { + sleep(wait_duration.unsigned_abs()).await; + } + + let bars = backfill_bars_from_api(&app_config, &asset, fetch_from, fetch_until).await; + + let transaction = app_config.postgres_pool.begin().await.unwrap(); + database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await; + database::assets::update_timestamp_last_where_symbol( + &app_config.postgres_pool, + &asset.symbol, + &fetch_until, + ) + .await; + derive_recent_nulls(&app_config, &asset, &fetch_until, &backfilled).await; + transaction.commit().await.unwrap(); + + info!("Backfilled historical data for {}.", asset.symbol); +} + +fn generate_per_minute_bars( + from: OffsetDateTime, + until: OffsetDateTime, + asset: &Asset, +) -> IndexMap { + let mut bars = IndexMap::new(); + let mut current_time = from; + while current_time <= until { + bars.insert(current_time, Bar::empty(current_time, asset.symbol.clone())); + current_time += ONE_MINUTE; + } + bars +} + +async fn backfill_bars_from_api( + app_config: &Arc, + asset: &Asset, + from: OffsetDateTime, + until: OffsetDateTime, +) -> Vec { + let asset_clone = asset.clone(); + let mut bars = spawn_blocking(move || generate_per_minute_bars(from, until, &asset_clone)) + .await + .unwrap(); + + let mut next_page_token = None; + loop { + let request = app_config + .alpaca_client + .get(match asset.class { + Class::UsEquity => ALPACA_STOCK_DATA_URL, + Class::Crypto => ALPACA_CRYPTO_DATA_URL, + }) + .query(&[ + ("symbols", &asset.symbol), + ("timeframe", &String::from("1Min")), + ( + "start", + &from.format(ALPACA_TIMESTAMP_FORMAT).unwrap().to_string(), + ), + ( + "end", + &until.format(ALPACA_TIMESTAMP_FORMAT).unwrap().to_string(), + ), + ("limit", &String::from("10000")), + ("page_token", &next_page_token.clone().unwrap_or_default()), + ]); + + app_config.alpaca_rate_limit.until_ready().await; + let response = request.send().await.unwrap(); + let mut response = if response.status() == StatusCode::OK { + response + .json::() + .await + .unwrap() + } else { + error!( + "Failed to backfill historical data for {} from {} to {}: {}", + asset.symbol, + from, + until, + response.text().await.unwrap() + ); + break; + }; + + for bar in response + .bars + .remove(&asset.symbol) + .unwrap_or_default() + .unwrap_or_default() + { + bars.insert(bar.timestamp, Bar::from((bar, asset.symbol.clone()))); + } + + if response.next_page_token.is_none() { + break; + } + next_page_token = response.next_page_token; + } + + bars.into_values().collect::>() +} + +#[allow(clippy::significant_drop_tightening)] +async fn derive_recent_nulls( + app_config: &Arc, + asset: &Asset, + from: &OffsetDateTime, + backfilled: &Arc>>, +) { + let mut backfilled = backfilled.write().await; + let bars = database::bars::select_where_symbol_where_timestamp_larger_than( + &app_config.postgres_pool, + &asset.symbol, + from, + ) + .await; + database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await; + database::assets::update_timestamp_last_where_symbol( + &app_config.postgres_pool, + &asset.symbol, + &bars.last().unwrap().timestamp, + ) + .await; + backfilled.insert(asset.symbol.clone(), true); +} diff --git a/src/data/mod.rs b/src/data/mod.rs index 5c9c723..60e5a3f 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1,2 +1 @@ -pub mod historical; -pub mod live; +pub mod market; diff --git a/src/main.rs b/src/main.rs index c5c205b..8dae5ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,13 +24,13 @@ async fn main() -> Result<(), BoxDynError> { let (asset_broadcast_sender, _) = broadcast::channel::(100); - threads.push(spawn(data::live::run( + threads.push(spawn(data::market::run( app_config.clone(), Class::UsEquity, asset_broadcast_sender.clone(), ))); - threads.push(spawn(data::live::run( + threads.push(spawn(data::market::run( app_config.clone(), Class::Crypto, asset_broadcast_sender.clone(), diff --git a/src/types/api/incoming/bar.rs b/src/types/api/incoming/bar.rs index 3d1508d..4b23c47 100644 --- a/src/types/api/incoming/bar.rs +++ b/src/types/api/incoming/bar.rs @@ -2,7 +2,7 @@ use serde::Deserialize; use std::collections::HashMap; use time::OffsetDateTime; -#[derive(Debug, PartialEq, Deserialize)] +#[derive(Deserialize)] pub struct Bar { #[serde(rename = "t")] #[serde(with = "time::serde::rfc3339")] @@ -23,7 +23,7 @@ pub struct Bar { pub volume_weighted: f64, } -#[derive(Debug, PartialEq, Deserialize)] +#[derive(Deserialize)] pub struct Message { pub bars: HashMap>>, pub next_page_token: Option, diff --git a/src/types/api/incoming/calendar_date.rs b/src/types/api/incoming/calendar_date.rs index b92213b..f399d56 100644 --- a/src/types/api/incoming/calendar_date.rs +++ b/src/types/api/incoming/calendar_date.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Deserializer}; use time::{macros::format_description, Date, Time}; -#[derive(Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize)] pub struct CalendarDate { #[serde(deserialize_with = "deserialize_date")] pub date: Date, diff --git a/src/types/asset.rs b/src/types/asset.rs index cb0289b..02ccfb1 100644 --- a/src/types/asset.rs +++ b/src/types/asset.rs @@ -1,9 +1,9 @@ use super::{api::incoming, class::Class, exchange::Exchange}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use sqlx::FromRow; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Eq, FromRow, Serialize, Deserialize, Hash)] +#[derive(Clone, Debug, FromRow, Serialize)] pub struct Asset { pub symbol: String, pub class: Class, @@ -28,10 +28,9 @@ impl From<(incoming::Asset, bool, OffsetDateTime)> for Asset { } } -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub enum BroadcastMessage { Added(Asset), Updated(Asset), Deleted(Asset), - Reset(Asset), } diff --git a/src/types/bar.rs b/src/types/bar.rs index 8ec7223..15f7f60 100644 --- a/src/types/bar.rs +++ b/src/types/bar.rs @@ -1,9 +1,9 @@ use super::{api, websocket}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use sqlx::FromRow; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] +#[derive(Clone, Debug, FromRow, Serialize)] pub struct Bar { pub timestamp: OffsetDateTime, pub asset_symbol: String, diff --git a/src/types/class.rs b/src/types/class.rs index f6fbc0a..9418d29 100644 --- a/src/types/class.rs +++ b/src/types/class.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use sqlx::Type; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type)] pub enum Class { #[sqlx(rename = "us_equity")] #[serde(rename = "us_equity")] diff --git a/src/types/exchange.rs b/src/types/exchange.rs index 27933fe..c7eb10b 100644 --- a/src/types/exchange.rs +++ b/src/types/exchange.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use sqlx::Type; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type, Hash)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Type)] pub enum Exchange { #[sqlx(rename = "AMEX")] #[serde(rename = "AMEX")] diff --git a/src/types/mod.rs b/src/types/mod.rs index cbdee21..d04d3ee 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -14,9 +14,7 @@ pub use exchange::Exchange; pub use source::Source; pub use status::Status; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub enum BroadcastMessage { Asset(asset::BroadcastMessage), } diff --git a/src/types/source.rs b/src/types/source.rs index f734d3e..cf56619 100644 --- a/src/types/source.rs +++ b/src/types/source.rs @@ -3,7 +3,7 @@ use std::{ str::FromStr, }; -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug)] pub enum Source { Iex, Sip, diff --git a/src/types/status.rs b/src/types/status.rs index 7b1374f..906f670 100644 --- a/src/types/status.rs +++ b/src/types/status.rs @@ -1,7 +1,7 @@ -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use sqlx::Type; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type)] +#[derive(PartialEq, Eq, Deserialize, Type)] pub enum Status { #[sqlx(rename = "active")] #[serde(rename = "active")] diff --git a/src/types/websocket/incoming/bar.rs b/src/types/websocket/incoming/bar.rs index 6ef71b8..5db1c39 100644 --- a/src/types/websocket/incoming/bar.rs +++ b/src/types/websocket/incoming/bar.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use time::OffsetDateTime; -#[derive(Debug, PartialEq, Deserialize)] +#[derive(PartialEq, Deserialize)] pub struct Message { #[serde(rename = "t")] #[serde(with = "time::serde::rfc3339")] diff --git a/src/types/websocket/incoming/mod.rs b/src/types/websocket/incoming/mod.rs index 49496d1..18ec825 100644 --- a/src/types/websocket/incoming/mod.rs +++ b/src/types/websocket/incoming/mod.rs @@ -4,7 +4,7 @@ pub mod success; use serde::Deserialize; -#[derive(Debug, Deserialize, PartialEq)] +#[derive(PartialEq, Deserialize)] #[serde(tag = "T")] pub enum Message { #[serde(rename = "success")] diff --git a/src/types/websocket/incoming/subscription.rs b/src/types/websocket/incoming/subscription.rs index 4644c3b..731f7ab 100644 --- a/src/types/websocket/incoming/subscription.rs +++ b/src/types/websocket/incoming/subscription.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -#[derive(Debug, PartialEq, Eq, Deserialize)] +#[derive(PartialEq, Eq, Deserialize)] pub struct Message { pub trades: Vec, pub quotes: Vec, diff --git a/src/types/websocket/incoming/success.rs b/src/types/websocket/incoming/success.rs index 2d29509..37ac155 100644 --- a/src/types/websocket/incoming/success.rs +++ b/src/types/websocket/incoming/success.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -#[derive(Debug, PartialEq, Eq, Deserialize)] +#[derive(PartialEq, Eq, Deserialize)] pub enum MessageType { #[serde(rename = "connected")] Connected, @@ -8,7 +8,7 @@ pub enum MessageType { Authenticated, } -#[derive(Debug, PartialEq, Eq, Deserialize)] +#[derive(PartialEq, Eq, Deserialize)] pub struct Message { pub msg: MessageType, } diff --git a/src/types/websocket/outgoing/auth.rs b/src/types/websocket/outgoing/auth.rs index 98a414b..a8933e5 100644 --- a/src/types/websocket/outgoing/auth.rs +++ b/src/types/websocket/outgoing/auth.rs @@ -1,6 +1,6 @@ use serde::Serialize; -#[derive(Debug, Serialize)] +#[derive(Serialize)] pub struct Message { key: String, secret: String, diff --git a/src/types/websocket/outgoing/mod.rs b/src/types/websocket/outgoing/mod.rs index 049333d..4bf2d01 100644 --- a/src/types/websocket/outgoing/mod.rs +++ b/src/types/websocket/outgoing/mod.rs @@ -3,7 +3,7 @@ pub mod subscribe; use serde::Serialize; -#[derive(Debug, Serialize)] +#[derive(Serialize)] #[serde(tag = "action")] pub enum Message { #[serde(rename = "auth")] diff --git a/src/types/websocket/outgoing/subscribe.rs b/src/types/websocket/outgoing/subscribe.rs index 3e1e66b..f0ab6c7 100644 --- a/src/types/websocket/outgoing/subscribe.rs +++ b/src/types/websocket/outgoing/subscribe.rs @@ -1,6 +1,6 @@ use serde::Serialize; -#[derive(Debug, Serialize)] +#[derive(Serialize)] pub struct Message { bars: Vec, #[serde(rename = "updatedBars")]