diff --git a/Dockerfile b/Dockerfile index e516912..7680e7c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ RUN apk add --no-cache pkgconf musl-dev openssl-dev WORKDIR /usr/src/qrust -ENV SQLX_OFFLINE true +ENV SQLX_OFFLINE=true RUN mkdir src && echo "fn main() {}" > src/main.rs COPY Cargo.toml .sqlx ./ diff --git a/src/data/market.rs b/src/data/market.rs index 745c6e1..a7e3295 100644 --- a/src/data/market.rs +++ b/src/data/market.rs @@ -3,6 +3,7 @@ use crate::{ Config, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_DATA_URL, ALPACA_STOCK_WEBSOCKET_URL, ALPACA_TIMESTAMP_FORMAT, }, + data::authenticate_websocket, database, time::{duration_until, last_minute, next_30s, next_minute, ONE_MINUTE, THIRTY_SECONDS}, types::{ @@ -19,7 +20,7 @@ use futures_util::{ use http::StatusCode; use indexmap::IndexMap; use log::{error, info, warn}; -use serde_json::{from_str, to_string}; +use serde_json::from_str; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -83,50 +84,6 @@ pub async fn run( unreachable!() } -async fn authenticate_websocket( - app_config: &Arc, - stream: &mut SplitStream>>, - sink: &mut SplitSink>, Message>, -) { - match stream.next().await { - Some(Ok(Message::Text(data))) - if from_str::>(&data) - .unwrap() - .get(0) - == Some(&websocket::incoming::Message::Success( - websocket::incoming::success::Message { - msg: websocket::incoming::success::MessageType::Connected, - }, - )) => {} - _ => panic!(), - } - - sink.send(Message::Text( - to_string(&websocket::outgoing::Message::Auth( - websocket::outgoing::auth::Message::new( - app_config.alpaca_api_key.clone(), - app_config.alpaca_api_secret.clone(), - ), - )) - .unwrap(), - )) - .await - .unwrap(); - - match stream.next().await { - Some(Ok(Message::Text(data))) - if from_str::>(&data) - .unwrap() - .get(0) - == Some(&websocket::incoming::Message::Success( - websocket::incoming::success::Message { - msg: websocket::incoming::success::MessageType::Authenticated, - }, - )) => {} - _ => panic!(), - }; -} - async fn websocket_broadcast_handler( class: Class, sink: Arc>, Message>>>, @@ -140,8 +97,10 @@ async fn websocket_broadcast_handler( sink.write() .await .send(Message::Text( - serde_json::to_string(&websocket::outgoing::Message::Subscribe( - websocket::outgoing::subscribe::Message::new(asset.clone().symbol), + serde_json::to_string(&websocket::data::outgoing::Message::Subscribe( + websocket::data::outgoing::subscribe::Message::new( + asset.clone().symbol, + ), )) .unwrap(), )) @@ -154,8 +113,10 @@ async fn websocket_broadcast_handler( sink.write() .await .send(Message::Text( - serde_json::to_string(&websocket::outgoing::Message::Unsubscribe( - websocket::outgoing::subscribe::Message::new(asset.clone().symbol), + serde_json::to_string(&websocket::data::outgoing::Message::Unsubscribe( + websocket::data::outgoing::subscribe::Message::new( + asset.clone().symbol, + ), )) .unwrap(), )) @@ -177,9 +138,12 @@ async fn websocket_message_handler( loop { match stream.next().await { Some(Ok(Message::Text(data))) => { - let parsed_data = from_str::>(&data); + let parsed_data = from_str::>(&data); if let Err(e) = &parsed_data { - warn!("Unparsed websocket::incoming message: {:?}: {}", data, e); + warn!( + "Unparsed websocket::data::incoming message: {:?}: {}", + data, e + ); } for message in parsed_data.unwrap_or_default() { @@ -192,7 +156,7 @@ async fn websocket_message_handler( .send(Message::Pong(vec![])) .await .unwrap(), - Some(unknown) => error!("Unknown websocket::incoming message: {:?}", unknown), + Some(unknown) => error!("Unknown websocket::data::incoming message: {:?}", unknown), None => panic!(), } } @@ -201,11 +165,11 @@ async fn websocket_message_handler( async fn websocket_handle_text_message( app_config: &Arc, class: Class, - message: websocket::incoming::Message, + message: websocket::data::incoming::Message, backfilled: &Arc>>, ) { match message { - websocket::incoming::Message::Subscription(subscription_message) => { + websocket::data::incoming::Message::Subscription(subscription_message) => { let old_assets = backfilled .read() .await @@ -243,7 +207,7 @@ async fn websocket_handle_text_message( class, added_assets, deleted_assets ); } - websocket::incoming::Message::Bars(bar_message) => { + websocket::data::incoming::Message::Bars(bar_message) => { let bar = Bar::from(bar_message); info!( "websocket::Incoming bar for {}: {}", @@ -256,7 +220,7 @@ async fn websocket_handle_text_message( ) .await; } - websocket::incoming::Message::UpdatedBars(bar_message) => { + websocket::data::incoming::Message::UpdatedBars(bar_message) => { let bar = Bar::from(bar_message); info!( "websocket::Incoming bar for {}: {}", @@ -276,7 +240,7 @@ async fn websocket_handle_text_message( } transaction.commit().await.unwrap(); } - websocket::incoming::Message::Success(_) => {} + _ => {} } } diff --git a/src/data/mod.rs b/src/data/mod.rs index 60e5a3f..1d63dff 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -1 +1,56 @@ pub mod market; + +use crate::{config::Config, types::websocket}; +use core::panic; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use serde_json::{from_str, to_string}; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +async fn authenticate_websocket( + app_config: &Arc, + stream: &mut SplitStream>>, + sink: &mut SplitSink>, Message>, +) { + match stream.next().await { + Some(Ok(Message::Text(data))) + if from_str::>(&data) + .unwrap() + .get(0) + == Some(&websocket::data::incoming::Message::Success( + websocket::data::incoming::success::Message { + msg: websocket::data::incoming::success::MessageType::Connected, + }, + )) => {} + _ => panic!(), + } + + sink.send(Message::Text( + to_string(&websocket::data::outgoing::Message::Auth( + websocket::data::outgoing::auth::Message::new( + app_config.alpaca_api_key.clone(), + app_config.alpaca_api_secret.clone(), + ), + )) + .unwrap(), + )) + .await + .unwrap(); + + match stream.next().await { + Some(Ok(Message::Text(data))) + if from_str::>(&data) + .unwrap() + .get(0) + == Some(&websocket::data::incoming::Message::Success( + websocket::data::incoming::success::Message { + msg: websocket::data::incoming::success::MessageType::Authenticated, + }, + )) => {} + _ => panic!(), + }; +} diff --git a/src/types/asset.rs b/src/types/asset.rs index 02ccfb1..af16534 100644 --- a/src/types/asset.rs +++ b/src/types/asset.rs @@ -1,8 +1,56 @@ -use super::{api::incoming, class::Class, exchange::Exchange}; -use serde::Serialize; -use sqlx::FromRow; +use super::api; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, Type}; use time::OffsetDateTime; +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type)] +pub enum Class { + #[sqlx(rename = "us_equity")] + #[serde(rename = "us_equity")] + UsEquity, + #[sqlx(rename = "crypto")] + #[serde(rename = "crypto")] + Crypto, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Type)] +pub enum Exchange { + #[sqlx(rename = "AMEX")] + #[serde(rename = "AMEX")] + Amex, + #[sqlx(rename = "ARCA")] + #[serde(rename = "ARCA")] + Arca, + #[sqlx(rename = "BATS")] + #[serde(rename = "BATS")] + Bats, + #[sqlx(rename = "NYSE")] + #[serde(rename = "NYSE")] + Nyse, + #[sqlx(rename = "NASDAQ")] + #[serde(rename = "NASDAQ")] + Nasdaq, + #[sqlx(rename = "NYSEARCA")] + #[serde(rename = "NYSEARCA")] + Nysearca, + #[sqlx(rename = "OTC")] + #[serde(rename = "OTC")] + Otc, + #[sqlx(rename = "CRYPTO")] + #[serde(rename = "CRYPTO")] + Crypto, +} + +#[derive(PartialEq, Eq, Deserialize, Type)] +pub enum Status { + #[sqlx(rename = "active")] + #[serde(rename = "active")] + Active, + #[sqlx(rename = "inactive")] + #[serde(rename = "inactive")] + Inactive, +} + #[derive(Clone, Debug, FromRow, Serialize)] pub struct Asset { pub symbol: String, @@ -14,8 +62,10 @@ pub struct Asset { pub timestamp_last: OffsetDateTime, } -impl From<(incoming::Asset, bool, OffsetDateTime)> for Asset { - fn from((asset, trading, timestamp_first): (incoming::Asset, bool, OffsetDateTime)) -> Self { +impl From<(api::incoming::Asset, bool, OffsetDateTime)> for Asset { + fn from( + (asset, trading, timestamp_first): (api::incoming::Asset, bool, OffsetDateTime), + ) -> Self { Self { symbol: asset.symbol, class: asset.class, diff --git a/src/types/bar.rs b/src/types/bar.rs index 15f7f60..471ec0e 100644 --- a/src/types/bar.rs +++ b/src/types/bar.rs @@ -39,8 +39,8 @@ impl Bar { } } -impl From for Bar { - fn from(bar_message: websocket::incoming::bar::Message) -> Self { +impl From for Bar { + fn from(bar_message: websocket::data::incoming::bar::Message) -> Self { Self { timestamp: bar_message.timestamp, asset_symbol: bar_message.symbol, diff --git a/src/types/class.rs b/src/types/class.rs deleted file mode 100644 index 9418d29..0000000 --- a/src/types/class.rs +++ /dev/null @@ -1,12 +0,0 @@ -use serde::{Deserialize, Serialize}; -use sqlx::Type; - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type)] -pub enum Class { - #[sqlx(rename = "us_equity")] - #[serde(rename = "us_equity")] - UsEquity, - #[sqlx(rename = "crypto")] - #[serde(rename = "crypto")] - Crypto, -} diff --git a/src/types/exchange.rs b/src/types/exchange.rs deleted file mode 100644 index c7eb10b..0000000 --- a/src/types/exchange.rs +++ /dev/null @@ -1,30 +0,0 @@ -use serde::{Deserialize, Serialize}; -use sqlx::Type; - -#[derive(Clone, Copy, Debug, Serialize, Deserialize, Type)] -pub enum Exchange { - #[sqlx(rename = "AMEX")] - #[serde(rename = "AMEX")] - Amex, - #[sqlx(rename = "ARCA")] - #[serde(rename = "ARCA")] - Arca, - #[sqlx(rename = "BATS")] - #[serde(rename = "BATS")] - Bats, - #[sqlx(rename = "NYSE")] - #[serde(rename = "NYSE")] - Nyse, - #[sqlx(rename = "NASDAQ")] - #[serde(rename = "NASDAQ")] - Nasdaq, - #[sqlx(rename = "NYSEARCA")] - #[serde(rename = "NYSEARCA")] - Nysearca, - #[sqlx(rename = "OTC")] - #[serde(rename = "OTC")] - Otc, - #[sqlx(rename = "CRYPTO")] - #[serde(rename = "CRYPTO")] - Crypto, -} diff --git a/src/types/mod.rs b/src/types/mod.rs index d04d3ee..2b6196a 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,18 +1,12 @@ pub mod api; pub mod asset; pub mod bar; -pub mod class; -pub mod exchange; pub mod source; -pub mod status; pub mod websocket; -pub use asset::Asset; +pub use asset::{Asset, Class, Exchange, Status}; pub use bar::Bar; -pub use class::Class; -pub use exchange::Exchange; pub use source::Source; -pub use status::Status; #[derive(Clone, Debug)] pub enum BroadcastMessage { diff --git a/src/types/status.rs b/src/types/status.rs deleted file mode 100644 index 906f670..0000000 --- a/src/types/status.rs +++ /dev/null @@ -1,12 +0,0 @@ -use serde::Deserialize; -use sqlx::Type; - -#[derive(PartialEq, Eq, Deserialize, Type)] -pub enum Status { - #[sqlx(rename = "active")] - #[serde(rename = "active")] - Active, - #[sqlx(rename = "inactive")] - #[serde(rename = "inactive")] - Inactive, -} diff --git a/src/types/websocket/incoming/bar.rs b/src/types/websocket/data/incoming/bar.rs similarity index 100% rename from src/types/websocket/incoming/bar.rs rename to src/types/websocket/data/incoming/bar.rs diff --git a/src/types/websocket/incoming/mod.rs b/src/types/websocket/data/incoming/mod.rs similarity index 100% rename from src/types/websocket/incoming/mod.rs rename to src/types/websocket/data/incoming/mod.rs diff --git a/src/types/websocket/incoming/subscription.rs b/src/types/websocket/data/incoming/subscription.rs similarity index 100% rename from src/types/websocket/incoming/subscription.rs rename to src/types/websocket/data/incoming/subscription.rs diff --git a/src/types/websocket/incoming/success.rs b/src/types/websocket/data/incoming/success.rs similarity index 100% rename from src/types/websocket/incoming/success.rs rename to src/types/websocket/data/incoming/success.rs diff --git a/src/types/websocket/data/mod.rs b/src/types/websocket/data/mod.rs new file mode 100644 index 0000000..9aac270 --- /dev/null +++ b/src/types/websocket/data/mod.rs @@ -0,0 +1,2 @@ +pub mod incoming; +pub mod outgoing; diff --git a/src/types/websocket/outgoing/auth.rs b/src/types/websocket/data/outgoing/auth.rs similarity index 100% rename from src/types/websocket/outgoing/auth.rs rename to src/types/websocket/data/outgoing/auth.rs diff --git a/src/types/websocket/outgoing/mod.rs b/src/types/websocket/data/outgoing/mod.rs similarity index 100% rename from src/types/websocket/outgoing/mod.rs rename to src/types/websocket/data/outgoing/mod.rs diff --git a/src/types/websocket/outgoing/subscribe.rs b/src/types/websocket/data/outgoing/subscribe.rs similarity index 100% rename from src/types/websocket/outgoing/subscribe.rs rename to src/types/websocket/data/outgoing/subscribe.rs diff --git a/src/types/websocket/mod.rs b/src/types/websocket/mod.rs index 9aac270..7a345e4 100644 --- a/src/types/websocket/mod.rs +++ b/src/types/websocket/mod.rs @@ -1,2 +1 @@ -pub mod incoming; -pub mod outgoing; +pub mod data;