From dee21d53245240478d4b1ad10fcd465556ec883f Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Fri, 9 Feb 2024 15:43:42 +0000 Subject: [PATCH] Add asset status management Signed-off-by: Nikolaos Karaolidis --- src/database/assets.rs | 16 ++ src/threads/data/websocket.rs | 54 +++++- src/types/alpaca/api/incoming/asset.rs | 1 + src/types/alpaca/websocket/incoming/mod.rs | 3 + src/types/alpaca/websocket/incoming/status.rs | 154 ++++++++++++++++++ .../alpaca/websocket/outgoing/subscribe.rs | 34 +++- src/types/asset.rs | 1 + .../docker-entrypoint-initdb.d/0000_init.sql | 1 + 8 files changed, 249 insertions(+), 15 deletions(-) create mode 100644 src/types/alpaca/websocket/incoming/status.rs diff --git a/src/database/assets.rs b/src/database/assets.rs index 6c087f9..94f7f87 100644 --- a/src/database/assets.rs +++ b/src/database/assets.rs @@ -45,3 +45,19 @@ where .execute() .await } + +pub async fn update_status_where_symbol( + clickhouse_client: &Client, + symbol: &T, + status: bool, +) -> Result<(), Error> +where + T: AsRef + Serialize + Send + Sync, +{ + clickhouse_client + .query("ALTER TABLE assets UPDATE status = ? WHERE symbol = ?") + .bind(status) + .bind(symbol) + .execute() + .await +} diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs index 3b1c17c..e7538fb 100644 --- a/src/threads/data/websocket.rs +++ b/src/threads/data/websocket.rs @@ -2,7 +2,7 @@ use super::ThreadType; use crate::{ config::Config, database, - types::{alpaca::websocket, news::Prediction, Bar, News}, + types::{alpaca::websocket, news::Prediction, Bar, Class, News}, utils::add_slash_to_pair, }; use async_trait::async_trait; @@ -222,6 +222,7 @@ async fn handle_websocket_message( struct BarsHandler { config: Arc, + subscription_message_constructor: fn(Vec) -> websocket::outgoing::subscribe::Message, } #[async_trait] @@ -230,7 +231,7 @@ impl Handler for BarsHandler { &self, symbols: Vec, ) -> websocket::outgoing::subscribe::Message { - websocket::outgoing::subscribe::Message::new_market(symbols) + (self.subscription_message_constructor)(symbols) } async fn handle_parsed_websocket_message( @@ -291,11 +292,40 @@ impl Handler for BarsHandler { .await .unwrap(); } - websocket::incoming::Message::Success(_) => {} + websocket::incoming::Message::Status(message) => { + debug!( + "Received status message for {}: {}.", + message.symbol, message.status_message + ); + + match message.status { + websocket::incoming::status::Status::TradingHalt + | websocket::incoming::status::Status::VolatilityTradingPause => { + database::assets::update_status_where_symbol( + &self.config.clickhouse_client, + &message.symbol, + false, + ) + .await + .unwrap(); + } + websocket::incoming::status::Status::Resume + | websocket::incoming::status::Status::TradingResumption => { + database::assets::update_status_where_symbol( + &self.config.clickhouse_client, + &message.symbol, + true, + ) + .await + .unwrap(); + } + _ => {} + } + } websocket::incoming::Message::Error(message) => { error!("Received error message: {}.", message.message); } - websocket::incoming::Message::News(_) => unreachable!(), + _ => unreachable!(), } } } @@ -396,20 +426,26 @@ impl Handler for NewsHandler { .await .unwrap(); } - websocket::incoming::Message::Success(_) => {} websocket::incoming::Message::Error(message) => { error!("Received error message: {}.", message.message); } - websocket::incoming::Message::Bar(_) | websocket::incoming::Message::UpdatedBar(_) => { - unreachable!() - } + _ => unreachable!(), } } } pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { match thread_type { - ThreadType::Bars(_) => Box::new(BarsHandler { config }), + ThreadType::Bars(Class::UsEquity) => Box::new(BarsHandler { + config, + subscription_message_constructor: + websocket::outgoing::subscribe::Message::new_market_us_equity, + }), + ThreadType::Bars(Class::Crypto) => Box::new(BarsHandler { + config, + subscription_message_constructor: + websocket::outgoing::subscribe::Message::new_market_crypto, + }), ThreadType::News => Box::new(NewsHandler { config }), } } diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index e79ce4d..8a70949 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -83,6 +83,7 @@ impl From for types::Asset { symbol: item.symbol, class: item.class.into(), exchange: item.exchange.into(), + status: item.status.into(), time_added: time::OffsetDateTime::now_utc(), } } diff --git a/src/types/alpaca/websocket/incoming/mod.rs b/src/types/alpaca/websocket/incoming/mod.rs index c955f9d..0ac40c9 100644 --- a/src/types/alpaca/websocket/incoming/mod.rs +++ b/src/types/alpaca/websocket/incoming/mod.rs @@ -1,6 +1,7 @@ pub mod bar; pub mod error; pub mod news; +pub mod status; pub mod subscription; pub mod success; @@ -19,6 +20,8 @@ pub enum Message { UpdatedBar(bar::Message), #[serde(rename = "n")] News(news::Message), + #[serde(rename = "s")] + Status(status::Message), #[serde(rename = "error")] Error(error::Message), } diff --git a/src/types/alpaca/websocket/incoming/status.rs b/src/types/alpaca/websocket/incoming/status.rs new file mode 100644 index 0000000..2f7ec81 --- /dev/null +++ b/src/types/alpaca/websocket/incoming/status.rs @@ -0,0 +1,154 @@ +use serde::Deserialize; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub enum Status { + #[serde(rename = "2")] + #[serde(alias = "H")] + TradingHalt, + #[serde(rename = "3")] + Resume, + #[serde(rename = "5")] + PriceIndication, + #[serde(rename = "6")] + TradingRangeIndication, + #[serde(rename = "7")] + MarketImbalanceBuy, + #[serde(rename = "8")] + MarketImbalanceSell, + #[serde(rename = "9")] + MarketOnCloseImbalanceBuy, + #[serde(rename = "A")] + MarketOnCloseImbalanceSell, + #[serde(rename = "C")] + NoMarketImbalance, + #[serde(rename = "D")] + NoMarketOnCloseImbalance, + #[serde(rename = "E")] + ShortSaleRestriction, + #[serde(rename = "F")] + LimitUpLimitDown, + #[serde(rename = "Q")] + QuotationResumption, + #[serde(rename = "T")] + TradingResumption, + #[serde(rename = "P")] + VolatilityTradingPause, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[serde(tag = "rc", content = "rm")] +pub enum Reason { + #[serde(rename = "D")] + NewsReleased, + #[serde(rename = "I")] + OrderImbalance, + #[serde(rename = "M")] + LimitUpLimitDown, + #[serde(rename = "P")] + NewsPending, + #[serde(rename = "X")] + Operational, + #[serde(rename = "Y")] + SubPennyTrading, + #[serde(rename = "1")] + MarketWideCircuitBreakerL1Breached, + #[serde(rename = "2")] + MarketWideCircuitBreakerL2Breached, + #[serde(rename = "3")] + MarketWideCircuitBreakerL3Breached, + #[serde(rename = "T1")] + HaltNewsPending, + #[serde(rename = "T2")] + HaltNewsDissemination, + #[serde(rename = "T5")] + SingleStockTradingPauseInAffect, + #[serde(rename = "T6")] + RegulatoryHaltExtraordinaryMarketActivity, + #[serde(rename = "T8")] + HaltETF, + #[serde(rename = "T12")] + TradingHaltedForInformationRequestedByNASDAQ, + #[serde(rename = "H4")] + HaltNonCompliance, + #[serde(rename = "H9")] + HaltFilingsNotCurrent, + #[serde(rename = "H10")] + HaltSECTradingSuspension, + #[serde(rename = "H11")] + HaltRegulatoryConcern, + #[serde(rename = "01")] + OperationsHaltContactMarketOperations, + #[serde(rename = "IPO1")] + IPOIssueNotYetTrading, + #[serde(rename = "M1")] + CorporateAction, + #[serde(rename = "M2")] + QuotationNotAvailable, + #[serde(rename = "LUDP")] + VolatilityTradingPause, + #[serde(rename = "LUDS")] + VolatilityTradingPauseStraddleCondition, + #[serde(rename = "MWC1")] + MarketWideCircuitBreakerHaltL1, + #[serde(rename = "MWC2")] + MarketWideCircuitBreakerHaltL2, + #[serde(rename = "MWC3")] + MarketWideCircuitBreakerHaltL3, + #[serde(rename = "MWC0")] + MarketWideCircuitBreakerHaltCarryOverFromPreviousDay, + #[serde(rename = "T3")] + NewsAndResumptionTimes, + #[serde(rename = "T7")] + SingleStockTradingPauseQuotationOnlyPeriod, + #[serde(rename = "R4")] + QualificationsIssuesReviewedResolvedQuotationsTradingToResume, + #[serde(rename = "R9")] + FilingRequirementsSatisfiedResolvedQuotationsTradingToResume, + #[serde(rename = "C3")] + IssuerNewsNotForthcomingQuotationsTradingToResume, + #[serde(rename = "C4")] + QualificationsHaltEndedMaintReqMetResume, + #[serde(rename = "C9")] + QualificationsHaltConcludedFilingsMetQuotesTradesToResume, + #[serde(rename = "C11")] + TradeHaltConcludedByOtherRegulatoryAuthQuotesTradesResume, + #[serde(rename = "R1")] + NewIssueAvailable, + #[serde(rename = "R")] + IssueAvailable, + #[serde(rename = "IPOQ")] + IPOSecurityReleasedForQuotation, + #[serde(rename = "IPOE")] + IPOSecurityPositioningWindowExtension, + #[serde(rename = "MWCQ")] + MarketWideCircuitBreakerResumption, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +pub enum Tape { + A, + B, + C, + O, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[allow(clippy::struct_field_names)] +pub struct Message { + #[serde(rename = "t")] + #[serde(with = "time::serde::rfc3339")] + pub time: OffsetDateTime, + #[serde(rename = "S")] + pub symbol: String, + #[serde(rename = "sc")] + pub status: Status, + #[serde(rename = "sm")] + pub status_message: String, + #[serde(rename = "rc")] + pub reason: Reason, + #[serde(rename = "rm")] + pub reason_message: String, + #[serde(rename = "z")] + pub tape: Tape, +} diff --git a/src/types/alpaca/websocket/outgoing/subscribe.rs b/src/types/alpaca/websocket/outgoing/subscribe.rs index ef78c28..30591b4 100644 --- a/src/types/alpaca/websocket/outgoing/subscribe.rs +++ b/src/types/alpaca/websocket/outgoing/subscribe.rs @@ -3,22 +3,44 @@ use serde::Serialize; #[derive(Serialize)] #[serde(untagged)] -pub enum Message { +pub enum Market { #[serde(rename_all = "camelCase")] - Market { + UsEquity { + bars: Vec, + updated_bars: Vec, + statuses: Vec, + }, + #[serde(rename_all = "camelCase")] + Crypto { bars: Vec, updated_bars: Vec, }, +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum Message { + Market(Market), #[serde(rename_all = "camelCase")] - News { news: Vec }, + News { + news: Vec, + }, } impl Message { - pub fn new_market(symbols: Vec) -> Self { - Self::Market { + pub fn new_market_us_equity(symbols: Vec) -> Self { + Self::Market(Market::UsEquity { + bars: symbols.clone(), + updated_bars: symbols.clone(), + statuses: symbols, + }) + } + + pub fn new_market_crypto(symbols: Vec) -> Self { + Self::Market(Market::Crypto { bars: symbols.clone(), updated_bars: symbols, - } + }) } pub fn new_news(symbols: Vec) -> Self { diff --git a/src/types/asset.rs b/src/types/asset.rs index 7a3335c..2d05b92 100644 --- a/src/types/asset.rs +++ b/src/types/asset.rs @@ -29,6 +29,7 @@ pub struct Asset { pub symbol: String, pub class: Class, pub exchange: Exchange, + pub status: bool, #[serde(with = "clickhouse::serde::time::datetime")] pub time_added: OffsetDateTime, } diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index c8650fe..bebb309 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -11,6 +11,7 @@ CREATE TABLE IF NOT EXISTS qrust.assets ( 'OTC' = 7, 'CRYPTO' = 8 ), + status Boolean, time_added DateTime DEFAULT now(), ) ENGINE = ReplacingMergeTree()