Add asset status management

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-02-09 15:43:42 +00:00
parent 76bf2fddcb
commit dee21d5324
8 changed files with 249 additions and 15 deletions

View File

@@ -45,3 +45,19 @@ where
.execute() .execute()
.await .await
} }
pub async fn update_status_where_symbol<T>(
clickhouse_client: &Client,
symbol: &T,
status: bool,
) -> Result<(), Error>
where
T: AsRef<str> + Serialize + Send + Sync,
{
clickhouse_client
.query("ALTER TABLE assets UPDATE status = ? WHERE symbol = ?")
.bind(status)
.bind(symbol)
.execute()
.await
}

View File

@@ -2,7 +2,7 @@ use super::ThreadType;
use crate::{ use crate::{
config::Config, config::Config,
database, database,
types::{alpaca::websocket, news::Prediction, Bar, News}, types::{alpaca::websocket, news::Prediction, Bar, Class, News},
utils::add_slash_to_pair, utils::add_slash_to_pair,
}; };
use async_trait::async_trait; use async_trait::async_trait;
@@ -222,6 +222,7 @@ async fn handle_websocket_message(
struct BarsHandler { struct BarsHandler {
config: Arc<Config>, config: Arc<Config>,
subscription_message_constructor: fn(Vec<String>) -> websocket::outgoing::subscribe::Message,
} }
#[async_trait] #[async_trait]
@@ -230,7 +231,7 @@ impl Handler for BarsHandler {
&self, &self,
symbols: Vec<String>, symbols: Vec<String>,
) -> websocket::outgoing::subscribe::Message { ) -> websocket::outgoing::subscribe::Message {
websocket::outgoing::subscribe::Message::new_market(symbols) (self.subscription_message_constructor)(symbols)
} }
async fn handle_parsed_websocket_message( async fn handle_parsed_websocket_message(
@@ -291,11 +292,40 @@ impl Handler for BarsHandler {
.await .await
.unwrap(); .unwrap();
} }
websocket::incoming::Message::Success(_) => {} websocket::incoming::Message::Status(message) => {
debug!(
"Received status message for {}: {}.",
message.symbol, message.status_message
);
match message.status {
websocket::incoming::status::Status::TradingHalt
| websocket::incoming::status::Status::VolatilityTradingPause => {
database::assets::update_status_where_symbol(
&self.config.clickhouse_client,
&message.symbol,
false,
)
.await
.unwrap();
}
websocket::incoming::status::Status::Resume
| websocket::incoming::status::Status::TradingResumption => {
database::assets::update_status_where_symbol(
&self.config.clickhouse_client,
&message.symbol,
true,
)
.await
.unwrap();
}
_ => {}
}
}
websocket::incoming::Message::Error(message) => { websocket::incoming::Message::Error(message) => {
error!("Received error message: {}.", message.message); error!("Received error message: {}.", message.message);
} }
websocket::incoming::Message::News(_) => unreachable!(), _ => unreachable!(),
} }
} }
} }
@@ -396,20 +426,26 @@ impl Handler for NewsHandler {
.await .await
.unwrap(); .unwrap();
} }
websocket::incoming::Message::Success(_) => {}
websocket::incoming::Message::Error(message) => { websocket::incoming::Message::Error(message) => {
error!("Received error message: {}.", message.message); error!("Received error message: {}.", message.message);
} }
websocket::incoming::Message::Bar(_) | websocket::incoming::Message::UpdatedBar(_) => { _ => unreachable!(),
unreachable!()
}
} }
} }
} }
pub fn create_handler(thread_type: ThreadType, config: Arc<Config>) -> Box<dyn Handler> { pub fn create_handler(thread_type: ThreadType, config: Arc<Config>) -> Box<dyn Handler> {
match thread_type { match thread_type {
ThreadType::Bars(_) => Box::new(BarsHandler { config }), ThreadType::Bars(Class::UsEquity) => Box::new(BarsHandler {
config,
subscription_message_constructor:
websocket::outgoing::subscribe::Message::new_market_us_equity,
}),
ThreadType::Bars(Class::Crypto) => Box::new(BarsHandler {
config,
subscription_message_constructor:
websocket::outgoing::subscribe::Message::new_market_crypto,
}),
ThreadType::News => Box::new(NewsHandler { config }), ThreadType::News => Box::new(NewsHandler { config }),
} }
} }

View File

@@ -83,6 +83,7 @@ impl From<Asset> for types::Asset {
symbol: item.symbol, symbol: item.symbol,
class: item.class.into(), class: item.class.into(),
exchange: item.exchange.into(), exchange: item.exchange.into(),
status: item.status.into(),
time_added: time::OffsetDateTime::now_utc(), time_added: time::OffsetDateTime::now_utc(),
} }
} }

View File

@@ -1,6 +1,7 @@
pub mod bar; pub mod bar;
pub mod error; pub mod error;
pub mod news; pub mod news;
pub mod status;
pub mod subscription; pub mod subscription;
pub mod success; pub mod success;
@@ -19,6 +20,8 @@ pub enum Message {
UpdatedBar(bar::Message), UpdatedBar(bar::Message),
#[serde(rename = "n")] #[serde(rename = "n")]
News(news::Message), News(news::Message),
#[serde(rename = "s")]
Status(status::Message),
#[serde(rename = "error")] #[serde(rename = "error")]
Error(error::Message), Error(error::Message),
} }

View File

@@ -0,0 +1,154 @@
use serde::Deserialize;
use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
pub enum Status {
#[serde(rename = "2")]
#[serde(alias = "H")]
TradingHalt,
#[serde(rename = "3")]
Resume,
#[serde(rename = "5")]
PriceIndication,
#[serde(rename = "6")]
TradingRangeIndication,
#[serde(rename = "7")]
MarketImbalanceBuy,
#[serde(rename = "8")]
MarketImbalanceSell,
#[serde(rename = "9")]
MarketOnCloseImbalanceBuy,
#[serde(rename = "A")]
MarketOnCloseImbalanceSell,
#[serde(rename = "C")]
NoMarketImbalance,
#[serde(rename = "D")]
NoMarketOnCloseImbalance,
#[serde(rename = "E")]
ShortSaleRestriction,
#[serde(rename = "F")]
LimitUpLimitDown,
#[serde(rename = "Q")]
QuotationResumption,
#[serde(rename = "T")]
TradingResumption,
#[serde(rename = "P")]
VolatilityTradingPause,
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(tag = "rc", content = "rm")]
pub enum Reason {
#[serde(rename = "D")]
NewsReleased,
#[serde(rename = "I")]
OrderImbalance,
#[serde(rename = "M")]
LimitUpLimitDown,
#[serde(rename = "P")]
NewsPending,
#[serde(rename = "X")]
Operational,
#[serde(rename = "Y")]
SubPennyTrading,
#[serde(rename = "1")]
MarketWideCircuitBreakerL1Breached,
#[serde(rename = "2")]
MarketWideCircuitBreakerL2Breached,
#[serde(rename = "3")]
MarketWideCircuitBreakerL3Breached,
#[serde(rename = "T1")]
HaltNewsPending,
#[serde(rename = "T2")]
HaltNewsDissemination,
#[serde(rename = "T5")]
SingleStockTradingPauseInAffect,
#[serde(rename = "T6")]
RegulatoryHaltExtraordinaryMarketActivity,
#[serde(rename = "T8")]
HaltETF,
#[serde(rename = "T12")]
TradingHaltedForInformationRequestedByNASDAQ,
#[serde(rename = "H4")]
HaltNonCompliance,
#[serde(rename = "H9")]
HaltFilingsNotCurrent,
#[serde(rename = "H10")]
HaltSECTradingSuspension,
#[serde(rename = "H11")]
HaltRegulatoryConcern,
#[serde(rename = "01")]
OperationsHaltContactMarketOperations,
#[serde(rename = "IPO1")]
IPOIssueNotYetTrading,
#[serde(rename = "M1")]
CorporateAction,
#[serde(rename = "M2")]
QuotationNotAvailable,
#[serde(rename = "LUDP")]
VolatilityTradingPause,
#[serde(rename = "LUDS")]
VolatilityTradingPauseStraddleCondition,
#[serde(rename = "MWC1")]
MarketWideCircuitBreakerHaltL1,
#[serde(rename = "MWC2")]
MarketWideCircuitBreakerHaltL2,
#[serde(rename = "MWC3")]
MarketWideCircuitBreakerHaltL3,
#[serde(rename = "MWC0")]
MarketWideCircuitBreakerHaltCarryOverFromPreviousDay,
#[serde(rename = "T3")]
NewsAndResumptionTimes,
#[serde(rename = "T7")]
SingleStockTradingPauseQuotationOnlyPeriod,
#[serde(rename = "R4")]
QualificationsIssuesReviewedResolvedQuotationsTradingToResume,
#[serde(rename = "R9")]
FilingRequirementsSatisfiedResolvedQuotationsTradingToResume,
#[serde(rename = "C3")]
IssuerNewsNotForthcomingQuotationsTradingToResume,
#[serde(rename = "C4")]
QualificationsHaltEndedMaintReqMetResume,
#[serde(rename = "C9")]
QualificationsHaltConcludedFilingsMetQuotesTradesToResume,
#[serde(rename = "C11")]
TradeHaltConcludedByOtherRegulatoryAuthQuotesTradesResume,
#[serde(rename = "R1")]
NewIssueAvailable,
#[serde(rename = "R")]
IssueAvailable,
#[serde(rename = "IPOQ")]
IPOSecurityReleasedForQuotation,
#[serde(rename = "IPOE")]
IPOSecurityPositioningWindowExtension,
#[serde(rename = "MWCQ")]
MarketWideCircuitBreakerResumption,
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
pub enum Tape {
A,
B,
C,
O,
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[allow(clippy::struct_field_names)]
pub struct Message {
#[serde(rename = "t")]
#[serde(with = "time::serde::rfc3339")]
pub time: OffsetDateTime,
#[serde(rename = "S")]
pub symbol: String,
#[serde(rename = "sc")]
pub status: Status,
#[serde(rename = "sm")]
pub status_message: String,
#[serde(rename = "rc")]
pub reason: Reason,
#[serde(rename = "rm")]
pub reason_message: String,
#[serde(rename = "z")]
pub tape: Tape,
}

View File

@@ -3,22 +3,44 @@ use serde::Serialize;
#[derive(Serialize)] #[derive(Serialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum Message { pub enum Market {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
Market { UsEquity {
bars: Vec<String>,
updated_bars: Vec<String>,
statuses: Vec<String>,
},
#[serde(rename_all = "camelCase")]
Crypto {
bars: Vec<String>, bars: Vec<String>,
updated_bars: Vec<String>, updated_bars: Vec<String>,
}, },
}
#[derive(Serialize)]
#[serde(untagged)]
pub enum Message {
Market(Market),
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
News { news: Vec<String> }, News {
news: Vec<String>,
},
} }
impl Message { impl Message {
pub fn new_market(symbols: Vec<String>) -> Self { pub fn new_market_us_equity(symbols: Vec<String>) -> Self {
Self::Market { Self::Market(Market::UsEquity {
bars: symbols.clone(),
updated_bars: symbols.clone(),
statuses: symbols,
})
}
pub fn new_market_crypto(symbols: Vec<String>) -> Self {
Self::Market(Market::Crypto {
bars: symbols.clone(), bars: symbols.clone(),
updated_bars: symbols, updated_bars: symbols,
} })
} }
pub fn new_news(symbols: Vec<String>) -> Self { pub fn new_news(symbols: Vec<String>) -> Self {

View File

@@ -29,6 +29,7 @@ pub struct Asset {
pub symbol: String, pub symbol: String,
pub class: Class, pub class: Class,
pub exchange: Exchange, pub exchange: Exchange,
pub status: bool,
#[serde(with = "clickhouse::serde::time::datetime")] #[serde(with = "clickhouse::serde::time::datetime")]
pub time_added: OffsetDateTime, pub time_added: OffsetDateTime,
} }

View File

@@ -11,6 +11,7 @@ CREATE TABLE IF NOT EXISTS qrust.assets (
'OTC' = 7, 'OTC' = 7,
'CRYPTO' = 8 'CRYPTO' = 8
), ),
status Boolean,
time_added DateTime DEFAULT now(), time_added DateTime DEFAULT now(),
) )
ENGINE = ReplacingMergeTree() ENGINE = ReplacingMergeTree()