diff --git a/Cargo.lock b/Cargo.lock index bba5fb2..2b5e6a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2573,6 +2573,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 58f08e5..89e2f46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,9 @@ clickhouse = { version = "0.11.6", features = [ "time", "uuid", ] } -uuid = "1.6.1" +uuid = { version = "1.6.1", features = [ + "serde", +] } time = { version = "0.3.31", features = [ "serde", "formatting", diff --git a/src/config.rs b/src/config.rs index 09d3c24..f06eed9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use crate::types::alpaca::Source; +use crate::types::alpaca::shared::Source; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, @@ -15,7 +15,9 @@ use std::{env, num::NonZeroU32, path::PathBuf, sync::Arc}; use tokio::sync::Mutex; pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets"; +pub const ALPACA_ORDER_API_URL: &str = "https://api.alpaca.markets/v2/orders"; pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock"; + pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; pub const ALPACA_NEWS_DATA_URL: &str = "https://data.alpaca.markets/v1beta1/news"; diff --git a/src/database/mod.rs b/src/database/mod.rs index 792108d..c3dc204 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,3 +2,4 @@ pub mod assets; pub mod backfills; pub mod bars; pub mod news; +pub mod orders; diff --git a/src/database/orders.rs b/src/database/orders.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs index c70ff23..fa72bf9 100644 --- a/src/threads/data/backfill.rs +++ b/src/threads/data/backfill.rs @@ -6,7 +6,7 @@ use crate::{ alpaca::{ self, api::{self, outgoing::Sort}, - Source, + shared::Source, }, news::Prediction, Backfill, Bar, Class, News, @@ -248,7 +248,7 @@ impl Handler for BarHandler { async fn backfill(&self, symbol: String, fetch_from: OffsetDateTime, fetch_to: OffsetDateTime) { info!("Backfilling bars for {}.", symbol); - let mut bars = Vec::new(); + let mut bars = vec![]; let mut next_page_token = None; loop { @@ -348,7 +348,7 @@ impl Handler for NewsHandler { async fn backfill(&self, symbol: String, fetch_from: OffsetDateTime, fetch_to: OffsetDateTime) { info!("Backfilling news for {}.", symbol); - let mut news = Vec::new(); + let mut news = vec![]; let mut next_page_token = None; loop { diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 81f00e1..b950864 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -8,7 +8,7 @@ use crate::{ }, create_send_await, database, types::{alpaca, Asset, Class}, - utils::{authenticate, backoff, cleanup}, + utils::{backoff, cleanup}, }; use futures_util::{future::join_all, StreamExt}; use itertools::{Either, Itertools}; @@ -19,7 +19,7 @@ use tokio::{ }; use tokio_tungstenite::connect_async; -#[derive(Clone)] +#[derive(Clone, Copy)] pub enum Action { Add, Remove, @@ -45,7 +45,7 @@ impl Message { } } -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy)] pub enum ThreadType { Bars(Class), News, @@ -107,7 +107,8 @@ async fn init_thread( let (websocket, _) = connect_async(websocket_url).await.unwrap(); let (mut websocket_sink, mut websocket_stream) = websocket.split(); - authenticate(&config, &mut websocket_sink, &mut websocket_stream).await; + alpaca::websocket::data::authenticate(&config, &mut websocket_sink, &mut websocket_stream) + .await; let (backfill_sender, backfill_receiver) = mpsc::channel(100); spawn(backfill::run( @@ -160,14 +161,14 @@ async fn handle_message( create_send_await!( bars_us_equity_websocket_sender, websocket::Message::new, - message.action.clone().into(), + message.action.into(), us_equity_symbols.clone() ); create_send_await!( bars_us_equity_backfill_sender, backfill::Message::new, - message.action.clone().into(), + message.action.into(), us_equity_symbols ); }; @@ -180,14 +181,14 @@ async fn handle_message( create_send_await!( bars_crypto_websocket_sender, websocket::Message::new, - message.action.clone().into(), + message.action.into(), crypto_symbols.clone() ); create_send_await!( bars_crypto_backfill_sender, backfill::Message::new, - message.action.clone().into(), + message.action.into(), crypto_symbols ); }; @@ -196,14 +197,14 @@ async fn handle_message( create_send_await!( news_websocket_sender, websocket::Message::new, - message.action.clone().into(), + message.action.into(), symbols.clone() ); create_send_await!( news_backfill_sender, backfill::Message::new, - message.action.clone().into(), + message.action.into(), symbols.clone() ); }; diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs index e7538fb..9d1f055 100644 --- a/src/threads/data/websocket.rs +++ b/src/threads/data/websocket.rs @@ -66,11 +66,11 @@ pub trait Handler: Send + Sync { fn create_subscription_message( &self, symbols: Vec, - ) -> websocket::outgoing::subscribe::Message; + ) -> websocket::data::outgoing::subscribe::Message; async fn handle_parsed_websocket_message( &self, pending: Arc>, - message: websocket::incoming::Message, + message: websocket::data::incoming::Message, ); } @@ -138,7 +138,7 @@ async fn handle_message( .lock() .await .send(tungstenite::Message::Text( - to_string(&websocket::outgoing::Message::Subscribe( + to_string(&websocket::data::outgoing::Message::Subscribe( handler.create_subscription_message(message.symbols), )) .unwrap(), @@ -168,7 +168,7 @@ async fn handle_message( .lock() .await .send(tungstenite::Message::Text( - to_string(&websocket::outgoing::Message::Unsubscribe( + to_string(&websocket::data::outgoing::Message::Unsubscribe( handler.create_subscription_message(message.symbols.clone()), )) .unwrap(), @@ -191,7 +191,7 @@ async fn handle_websocket_message( ) { match message { tungstenite::Message::Text(message) => { - let message = from_str::>(&message); + let message = from_str::>(&message); if let Ok(message) = message { for message in message { @@ -222,7 +222,8 @@ async fn handle_websocket_message( struct BarsHandler { config: Arc, - subscription_message_constructor: fn(Vec) -> websocket::outgoing::subscribe::Message, + subscription_message_constructor: + fn(Vec) -> websocket::data::outgoing::subscribe::Message, } #[async_trait] @@ -230,19 +231,20 @@ impl Handler for BarsHandler { fn create_subscription_message( &self, symbols: Vec, - ) -> websocket::outgoing::subscribe::Message { + ) -> websocket::data::outgoing::subscribe::Message { (self.subscription_message_constructor)(symbols) } async fn handle_parsed_websocket_message( &self, pending: Arc>, - message: websocket::incoming::Message, + message: websocket::data::incoming::Message, ) { match message { - websocket::incoming::Message::Subscription(message) => { - let websocket::incoming::subscription::Message::Market { bars: symbols, .. } = - message + websocket::data::incoming::Message::Subscription(message) => { + let websocket::data::incoming::subscription::Message::Market { + bars: symbols, .. + } = message else { unreachable!() }; @@ -283,8 +285,8 @@ impl Handler for BarsHandler { } } } - websocket::incoming::Message::Bar(message) - | websocket::incoming::Message::UpdatedBar(message) => { + websocket::data::incoming::Message::Bar(message) + | websocket::data::incoming::Message::UpdatedBar(message) => { let bar = Bar::from(message); debug!("Received bar for {}: {}.", bar.symbol, bar.time); @@ -292,15 +294,15 @@ impl Handler for BarsHandler { .await .unwrap(); } - websocket::incoming::Message::Status(message) => { + websocket::data::incoming::Message::Status(message) => { debug!( "Received status message for {}: {}.", message.symbol, message.status_message ); match message.status { - websocket::incoming::status::Status::TradingHalt - | websocket::incoming::status::Status::VolatilityTradingPause => { + websocket::data::incoming::status::Status::TradingHalt + | websocket::data::incoming::status::Status::VolatilityTradingPause => { database::assets::update_status_where_symbol( &self.config.clickhouse_client, &message.symbol, @@ -309,8 +311,8 @@ impl Handler for BarsHandler { .await .unwrap(); } - websocket::incoming::status::Status::Resume - | websocket::incoming::status::Status::TradingResumption => { + websocket::data::incoming::status::Status::Resume + | websocket::data::incoming::status::Status::TradingResumption => { database::assets::update_status_where_symbol( &self.config.clickhouse_client, &message.symbol, @@ -322,7 +324,7 @@ impl Handler for BarsHandler { _ => {} } } - websocket::incoming::Message::Error(message) => { + websocket::data::incoming::Message::Error(message) => { error!("Received error message: {}.", message.message); } _ => unreachable!(), @@ -339,18 +341,19 @@ impl Handler for NewsHandler { fn create_subscription_message( &self, symbols: Vec, - ) -> websocket::outgoing::subscribe::Message { - websocket::outgoing::subscribe::Message::new_news(symbols) + ) -> websocket::data::outgoing::subscribe::Message { + websocket::data::outgoing::subscribe::Message::new_news(symbols) } async fn handle_parsed_websocket_message( &self, pending: Arc>, - message: websocket::incoming::Message, + message: websocket::data::incoming::Message, ) { match message { - websocket::incoming::Message::Subscription(message) => { - let websocket::incoming::subscription::Message::News { news: symbols } = message + websocket::data::incoming::Message::Subscription(message) => { + let websocket::data::incoming::subscription::Message::News { news: symbols } = + message else { unreachable!() }; @@ -396,7 +399,7 @@ impl Handler for NewsHandler { } } } - websocket::incoming::Message::News(message) => { + websocket::data::incoming::Message::News(message) => { let news = News::from(message); debug!( @@ -426,7 +429,7 @@ impl Handler for NewsHandler { .await .unwrap(); } - websocket::incoming::Message::Error(message) => { + websocket::data::incoming::Message::Error(message) => { error!("Received error message: {}.", message.message); } _ => unreachable!(), @@ -439,12 +442,12 @@ pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box Box::new(BarsHandler { config, subscription_message_constructor: - websocket::outgoing::subscribe::Message::new_market_us_equity, + websocket::data::outgoing::subscribe::Message::new_market_us_equity, }), ThreadType::Bars(Class::Crypto) => Box::new(BarsHandler { config, subscription_message_constructor: - websocket::outgoing::subscribe::Message::new_market_crypto, + websocket::data::outgoing::subscribe::Message::new_market_crypto, }), ThreadType::News => Box::new(NewsHandler { config }), } diff --git a/src/threads/mod.rs b/src/threads/mod.rs index 5f09b94..53e0177 100644 --- a/src/threads/mod.rs +++ b/src/threads/mod.rs @@ -1,2 +1,3 @@ pub mod clock; pub mod data; +pub mod trading; diff --git a/src/threads/trading/mod.rs b/src/threads/trading/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index 8a70949..8ae5d57 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -1,68 +1,21 @@ use crate::{ config::{Config, ALPACA_ASSET_API_URL}, - impl_from_enum, types, + types::{ + self, + alpaca::shared::asset::{Class, Exchange, Status}, + }, }; use backoff::{future::retry_notify, ExponentialBackoff}; use log::warn; use reqwest::Error; use serde::Deserialize; use std::{sync::Arc, time::Duration}; - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum Class { - UsEquity, - Crypto, -} - -impl_from_enum!(types::Class, Class, UsEquity, Crypto); - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "UPPERCASE")] -pub enum Exchange { - Amex, - Arca, - Bats, - Nyse, - Nasdaq, - Nysearca, - Otc, - Crypto, -} - -impl_from_enum!( - types::Exchange, - Exchange, - Amex, - Arca, - Bats, - Nyse, - Nasdaq, - Nysearca, - Otc, - Crypto -); - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum Status { - Active, - Inactive, -} - -impl From for bool { - fn from(status: Status) -> Self { - match status { - Status::Active => true, - Status::Inactive => false, - } - } -} +use uuid::Uuid; #[allow(clippy::struct_excessive_bools)] -#[derive(Clone, Debug, PartialEq, Deserialize)] +#[derive(Deserialize)] pub struct Asset { - pub id: String, + pub id: Uuid, pub class: Class, pub exchange: Exchange, pub symbol: String, @@ -78,12 +31,12 @@ pub struct Asset { } impl From for types::Asset { - fn from(item: Asset) -> Self { + fn from(asset: Asset) -> Self { Self { - symbol: item.symbol, - class: item.class.into(), - exchange: item.exchange.into(), - status: item.status.into(), + symbol: asset.symbol, + class: asset.class.into(), + exchange: asset.exchange.into(), + status: asset.status.into(), time_added: time::OffsetDateTime::now_utc(), } } diff --git a/src/types/alpaca/api/incoming/bar.rs b/src/types/alpaca/api/incoming/bar.rs index 88b5ddc..4511cef 100644 --- a/src/types/alpaca/api/incoming/bar.rs +++ b/src/types/alpaca/api/incoming/bar.rs @@ -9,7 +9,7 @@ use serde::Deserialize; use std::{collections::HashMap, sync::Arc, time::Duration}; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Deserialize)] +#[derive(Deserialize)] pub struct Bar { #[serde(rename = "t")] #[serde(with = "time::serde::rfc3339")] @@ -46,7 +46,7 @@ impl From<(Bar, String)> for types::Bar { } } -#[derive(Clone, Debug, PartialEq, Deserialize)] +#[derive(Deserialize)] pub struct Message { pub bars: HashMap>, pub next_page_token: Option, diff --git a/src/types/alpaca/api/incoming/clock.rs b/src/types/alpaca/api/incoming/clock.rs index 2467367..0ff4176 100644 --- a/src/types/alpaca/api/incoming/clock.rs +++ b/src/types/alpaca/api/incoming/clock.rs @@ -6,7 +6,7 @@ use serde::Deserialize; use std::{sync::Arc, time::Duration}; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize)] pub struct Clock { #[serde(with = "time::serde::rfc3339")] pub timestamp: OffsetDateTime, diff --git a/src/types/alpaca/api/incoming/mod.rs b/src/types/alpaca/api/incoming/mod.rs index 30dd91b..dbeec36 100644 --- a/src/types/alpaca/api/incoming/mod.rs +++ b/src/types/alpaca/api/incoming/mod.rs @@ -2,3 +2,4 @@ pub mod asset; pub mod bar; pub mod clock; pub mod news; +pub mod order; diff --git a/src/types/alpaca/api/incoming/news.rs b/src/types/alpaca/api/incoming/news.rs index 8e10451..e4bd315 100644 --- a/src/types/alpaca/api/incoming/news.rs +++ b/src/types/alpaca/api/incoming/news.rs @@ -10,21 +10,21 @@ use serde::Deserialize; use std::{sync::Arc, time::Duration}; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "camelCase")] +#[derive(Deserialize)] +#[serde(rename_all = "snake_case")] pub enum ImageSize { Thumb, Small, Large, } -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize)] pub struct Image { pub size: ImageSize, pub url: String, } -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize)] pub struct News { pub id: i64, #[serde(with = "time::serde::rfc3339")] @@ -66,7 +66,7 @@ impl From for types::News { } } -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize)] pub struct Message { pub news: Vec, pub next_page_token: Option, diff --git a/src/types/alpaca/api/incoming/order.rs b/src/types/alpaca/api/incoming/order.rs new file mode 100644 index 0000000..1ee3d6d --- /dev/null +++ b/src/types/alpaca/api/incoming/order.rs @@ -0,0 +1,45 @@ +use crate::{ + config::{Config, ALPACA_ORDER_API_URL}, + types::alpaca::{api::outgoing, shared}, +}; +use backoff::{future::retry_notify, ExponentialBackoff}; +use log::warn; +use reqwest::Error; +use std::{sync::Arc, time::Duration}; + +pub use shared::order::Order; + +pub async fn get( + config: &Arc, + query: &outgoing::order::Order, + backoff: Option, +) -> Result, Error> { + retry_notify( + backoff.unwrap_or_default(), + || async { + config.alpaca_rate_limit.until_ready().await; + config + .alpaca_client + .get(ALPACA_ORDER_API_URL) + .query(query) + .send() + .await? + .error_for_status() + .map_err(|e| match e.status() { + Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + _ => e.into(), + })? + .json::>() + .await + .map_err(backoff::Error::Permanent) + }, + |e, duration: Duration| { + warn!( + "Failed to get orders, will retry in {} seconds: {}", + duration.as_secs(), + e + ); + }, + ) + .await +} diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/types/alpaca/api/outgoing/bar.rs index 9e1bce9..eceb03c 100644 --- a/src/types/alpaca/api/outgoing/bar.rs +++ b/src/types/alpaca/api/outgoing/bar.rs @@ -1,5 +1,5 @@ use super::{serialize_symbols, Sort}; -use crate::types::alpaca::Source; +use crate::types::alpaca::shared::Source; use serde::Serialize; use std::time::Duration; use time::OffsetDateTime; diff --git a/src/types/alpaca/api/outgoing/mod.rs b/src/types/alpaca/api/outgoing/mod.rs index f26e908..83e799c 100644 --- a/src/types/alpaca/api/outgoing/mod.rs +++ b/src/types/alpaca/api/outgoing/mod.rs @@ -1,10 +1,11 @@ pub mod bar; pub mod news; +pub mod order; use serde::{Serialize, Serializer}; #[derive(Serialize)] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "snake_case")] #[allow(dead_code)] pub enum Sort { Asc, @@ -18,3 +19,16 @@ where let string = symbols.join(","); serializer.serialize_str(&string) } + +fn serialize_symbols_option( + symbols: &Option>, + serializer: S, +) -> Result +where + S: Serializer, +{ + match symbols { + Some(symbols) => serialize_symbols(symbols, serializer), + None => serializer.serialize_none(), + } +} diff --git a/src/types/alpaca/api/outgoing/order.rs b/src/types/alpaca/api/outgoing/order.rs new file mode 100644 index 0000000..c4bca0b --- /dev/null +++ b/src/types/alpaca/api/outgoing/order.rs @@ -0,0 +1,35 @@ +use super::{serialize_symbols_option, Sort}; +use crate::types::alpaca::shared::order::Side; +use serde::Serialize; +use time::OffsetDateTime; + +#[derive(Serialize)] +#[serde(rename_all = "snake_case")] +pub enum Status { + Open, + Closed, + All, +} + +#[derive(Serialize)] +pub struct Order { + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + pub after: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + pub until: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub direction: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub nested: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_symbols_option")] + pub symbols: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub side: Option, +} diff --git a/src/types/alpaca/mod.rs b/src/types/alpaca/mod.rs index aa780dc..e4e0157 100644 --- a/src/types/alpaca/mod.rs +++ b/src/types/alpaca/mod.rs @@ -1,5 +1,3 @@ pub mod api; -pub mod source; +pub mod shared; pub mod websocket; - -pub use source::Source; diff --git a/src/types/alpaca/shared/asset.rs b/src/types/alpaca/shared/asset.rs new file mode 100644 index 0000000..76f4d95 --- /dev/null +++ b/src/types/alpaca/shared/asset.rs @@ -0,0 +1,53 @@ +use crate::{impl_from_enum, types}; +use serde::Deserialize; + +#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Class { + UsEquity, + Crypto, +} + +impl_from_enum!(types::Class, Class, UsEquity, Crypto); + +#[derive(Deserialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum Exchange { + Amex, + Arca, + Bats, + Nyse, + Nasdaq, + Nysearca, + Otc, + Crypto, +} + +impl_from_enum!( + types::Exchange, + Exchange, + Amex, + Arca, + Bats, + Nyse, + Nasdaq, + Nysearca, + Otc, + Crypto +); + +#[derive(Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Status { + Active, + Inactive, +} + +impl From for bool { + fn from(status: Status) -> Self { + match status { + Status::Active => true, + Status::Inactive => false, + } + } +} diff --git a/src/types/alpaca/shared/mod.rs b/src/types/alpaca/shared/mod.rs new file mode 100644 index 0000000..b345356 --- /dev/null +++ b/src/types/alpaca/shared/mod.rs @@ -0,0 +1,5 @@ +pub mod asset; +pub mod order; +pub mod source; + +pub use source::Source; diff --git a/src/types/alpaca/shared/order.rs b/src/types/alpaca/shared/order.rs new file mode 100644 index 0000000..9464dc3 --- /dev/null +++ b/src/types/alpaca/shared/order.rs @@ -0,0 +1,214 @@ +use crate::{impl_from_enum, types}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use uuid::Uuid; + +#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Class { + #[serde(alias = "")] + Simple, + Bracket, + Oco, + Oto, +} + +impl_from_enum!(types::order::Class, Class, Simple, Bracket, Oco, Oto); + +#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Type { + Market, + Limit, + Stop, + StopLimit, + TrailingStop, +} + +impl_from_enum!( + types::order::Type, + Type, + Market, + Limit, + Stop, + StopLimit, + TrailingStop +); + +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Side { + Buy, + Sell, +} + +impl_from_enum!(types::order::Side, Side, Buy, Sell); + +#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TimeInForce { + Day, + Gtc, + Opg, + Cls, + Ioc, + Fok, +} + +impl_from_enum!( + types::order::TimeInForce, + TimeInForce, + Day, + Gtc, + Opg, + Cls, + Ioc, + Fok +); + +#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Status { + New, + PartiallyFilled, + Filled, + DoneForDay, + Canceled, + Expired, + Replaced, + PendingCancel, + PendingReplace, + Accepted, + PendingNew, + AcceptedForBidding, + Stopped, + Rejected, + Suspended, + Calculated, +} + +impl_from_enum!( + types::order::Status, + Status, + New, + PartiallyFilled, + Filled, + DoneForDay, + Canceled, + Expired, + Replaced, + PendingCancel, + PendingReplace, + Accepted, + PendingNew, + AcceptedForBidding, + Stopped, + Rejected, + Suspended, + Calculated +); + +#[derive(Deserialize, Clone, Debug, PartialEq)] +#[allow(clippy::struct_field_names)] +pub struct Order { + pub id: Uuid, + pub client_order_id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub created_at: OffsetDateTime, + #[serde(with = "time::serde::rfc3339::option")] + pub updated_at: Option, + #[serde(with = "time::serde::rfc3339")] + pub submitted_at: OffsetDateTime, + #[serde(with = "time::serde::rfc3339::option")] + pub filled_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub expired_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub cancel_requested_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub canceled_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub failed_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub replaced_at: Option, + pub replaced_by: Option, + pub replaces: Option, + pub asset_id: Uuid, + pub symbol: String, + pub asset_class: super::asset::Class, + pub notional: Option, + pub qty: Option, + pub filled_qty: f64, + pub filled_avg_price: Option, + pub order_class: Class, + #[serde(rename = "type")] + pub order_type: Type, + pub side: Side, + pub time_in_force: TimeInForce, + pub limit_price: Option, + pub stop_price: Option, + pub status: Status, + pub extended_hours: bool, + pub legs: Option>, + pub trail_percent: Option, + pub trail_price: Option, + pub hwm: Option, +} + +impl From for types::Order { + fn from(order: Order) -> Self { + Self { + id: order.id, + client_order_id: order.client_order_id, + time_submitted: order.submitted_at, + time_created: order.created_at, + time_updated: order.updated_at.unwrap_or(OffsetDateTime::UNIX_EPOCH), + time_filled: order.filled_at.unwrap_or(OffsetDateTime::UNIX_EPOCH), + time_expired: order.expired_at.unwrap_or(OffsetDateTime::UNIX_EPOCH), + time_cancel_requested: order + .cancel_requested_at + .unwrap_or(OffsetDateTime::UNIX_EPOCH), + time_canceled: order.canceled_at.unwrap_or(OffsetDateTime::UNIX_EPOCH), + time_failed: order.failed_at.unwrap_or(OffsetDateTime::UNIX_EPOCH), + time_replaced: order.replaced_at.unwrap_or(OffsetDateTime::UNIX_EPOCH), + replaced_by: order.replaced_by.unwrap_or_default(), + replaces: order.replaces.unwrap_or_default(), + symbol: order.symbol, + order_class: order.order_class.into(), + order_type: order.order_type.into(), + side: order.side.into(), + time_in_force: order.time_in_force.into(), + notional: order.notional.unwrap_or_default(), + qty: order.qty.unwrap_or_default(), + filled_qty: order.filled_qty, + filled_avg_price: order.filled_avg_price.unwrap_or_default(), + status: order.status.into(), + extended_hours: order.extended_hours, + limit_price: order.limit_price.unwrap_or_default(), + stop_price: order.stop_price.unwrap_or_default(), + trail_percent: order.trail_percent.unwrap_or_default(), + trail_price: order.trail_price.unwrap_or_default(), + hwm: order.hwm.unwrap_or_default(), + legs: order + .legs + .unwrap_or_default() + .into_iter() + .map(|order| order.id) + .collect(), + } + } +} + +impl Order { + pub fn normalize(self) -> Vec { + let mut orders = vec![self.clone().into()]; + + if let Some(legs) = self.legs { + for leg in legs { + orders.extend(leg.normalize()); + } + } + + orders + } +} diff --git a/src/types/alpaca/source.rs b/src/types/alpaca/shared/source.rs similarity index 95% rename from src/types/alpaca/source.rs rename to src/types/alpaca/shared/source.rs index 948bedc..f958d91 100644 --- a/src/types/alpaca/source.rs +++ b/src/types/alpaca/shared/source.rs @@ -5,7 +5,7 @@ use std::{ }; #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "snake_case")] pub enum Source { Iex, Sip, diff --git a/src/types/alpaca/websocket/outgoing/auth.rs b/src/types/alpaca/websocket/auth.rs similarity index 100% rename from src/types/alpaca/websocket/outgoing/auth.rs rename to src/types/alpaca/websocket/auth.rs diff --git a/src/types/alpaca/websocket/incoming/bar.rs b/src/types/alpaca/websocket/data/incoming/bar.rs similarity index 95% rename from src/types/alpaca/websocket/incoming/bar.rs rename to src/types/alpaca/websocket/data/incoming/bar.rs index 7f71ef6..31692d9 100644 --- a/src/types/alpaca/websocket/incoming/bar.rs +++ b/src/types/alpaca/websocket/data/incoming/bar.rs @@ -2,7 +2,7 @@ use crate::types; use serde::Deserialize; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq)] pub struct Message { #[serde(rename = "t")] #[serde(with = "time::serde::rfc3339")] diff --git a/src/types/alpaca/websocket/incoming/error.rs b/src/types/alpaca/websocket/data/incoming/error.rs similarity index 58% rename from src/types/alpaca/websocket/incoming/error.rs rename to src/types/alpaca/websocket/data/incoming/error.rs index 4b2ba29..3da19db 100644 --- a/src/types/alpaca/websocket/incoming/error.rs +++ b/src/types/alpaca/websocket/data/incoming/error.rs @@ -1,7 +1,6 @@ use serde::Deserialize; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "camelCase")] +#[derive(Deserialize, Debug, PartialEq, Eq)] pub struct Message { pub code: u16, #[serde(rename = "msg")] diff --git a/src/types/alpaca/websocket/incoming/mod.rs b/src/types/alpaca/websocket/data/incoming/mod.rs similarity index 92% rename from src/types/alpaca/websocket/incoming/mod.rs rename to src/types/alpaca/websocket/data/incoming/mod.rs index 0ac40c9..26de877 100644 --- a/src/types/alpaca/websocket/incoming/mod.rs +++ b/src/types/alpaca/websocket/data/incoming/mod.rs @@ -7,7 +7,7 @@ pub mod success; use serde::Deserialize; -#[derive(Clone, Debug, PartialEq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq)] #[serde(tag = "T")] pub enum Message { #[serde(rename = "success")] diff --git a/src/types/alpaca/websocket/incoming/news.rs b/src/types/alpaca/websocket/data/incoming/news.rs similarity index 96% rename from src/types/alpaca/websocket/incoming/news.rs rename to src/types/alpaca/websocket/data/incoming/news.rs index 1d3b009..9d3023a 100644 --- a/src/types/alpaca/websocket/incoming/news.rs +++ b/src/types/alpaca/websocket/data/incoming/news.rs @@ -5,7 +5,7 @@ use crate::{ use serde::Deserialize; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] pub struct Message { pub id: i64, #[serde(with = "time::serde::rfc3339")] diff --git a/src/types/alpaca/websocket/incoming/status.rs b/src/types/alpaca/websocket/data/incoming/status.rs similarity index 95% rename from src/types/alpaca/websocket/incoming/status.rs rename to src/types/alpaca/websocket/data/incoming/status.rs index 2f7ec81..94ea21e 100644 --- a/src/types/alpaca/websocket/incoming/status.rs +++ b/src/types/alpaca/websocket/data/incoming/status.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use time::OffsetDateTime; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] pub enum Status { #[serde(rename = "2")] #[serde(alias = "H")] @@ -36,7 +36,7 @@ pub enum Status { VolatilityTradingPause, } -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] #[serde(tag = "rc", content = "rm")] pub enum Reason { #[serde(rename = "D")] @@ -125,7 +125,7 @@ pub enum Reason { MarketWideCircuitBreakerResumption, } -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] pub enum Tape { A, B, @@ -133,7 +133,7 @@ pub enum Tape { O, } -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] #[allow(clippy::struct_field_names)] pub struct Message { #[serde(rename = "t")] diff --git a/src/types/alpaca/websocket/incoming/subscription.rs b/src/types/alpaca/websocket/data/incoming/subscription.rs similarity index 78% rename from src/types/alpaca/websocket/incoming/subscription.rs rename to src/types/alpaca/websocket/data/incoming/subscription.rs index 2c88085..184e9ef 100644 --- a/src/types/alpaca/websocket/incoming/subscription.rs +++ b/src/types/alpaca/websocket/data/incoming/subscription.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] #[serde(untagged)] pub enum Message { #[serde(rename_all = "camelCase")] @@ -15,6 +15,7 @@ pub enum Message { lulds: Option>, cancel_errors: Option>, }, - #[serde(rename_all = "camelCase")] - News { news: Vec }, + News { + news: Vec, + }, } diff --git a/src/types/alpaca/websocket/incoming/success.rs b/src/types/alpaca/websocket/data/incoming/success.rs similarity index 52% rename from src/types/alpaca/websocket/incoming/success.rs rename to src/types/alpaca/websocket/data/incoming/success.rs index be6592e..9e3df38 100644 --- a/src/types/alpaca/websocket/incoming/success.rs +++ b/src/types/alpaca/websocket/data/incoming/success.rs @@ -1,8 +1,8 @@ use serde::Deserialize; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize)] +#[derive(Deserialize, Debug, PartialEq, Eq)] #[serde(tag = "msg")] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "snake_case")] pub enum Message { Connected, Authenticated, diff --git a/src/utils/websocket.rs b/src/types/alpaca/websocket/data/mod.rs similarity index 67% rename from src/utils/websocket.rs rename to src/types/alpaca/websocket/data/mod.rs index 01c0b4d..62fd831 100644 --- a/src/utils/websocket.rs +++ b/src/types/alpaca/websocket/data/mod.rs @@ -1,3 +1,6 @@ +pub mod incoming; +pub mod outgoing; + use crate::{config::Config, types::alpaca::websocket}; use core::panic; use futures_util::{ @@ -16,18 +19,18 @@ pub async fn authenticate( ) { match stream.next().await.unwrap().unwrap() { Message::Text(data) - if from_str::>(&data) + if from_str::>(&data) .unwrap() .first() - == Some(&websocket::incoming::Message::Success( - websocket::incoming::success::Message::Connected, + == Some(&websocket::data::incoming::Message::Success( + websocket::data::incoming::success::Message::Connected, )) => {} _ => panic!("Failed to connect to Alpaca websocket."), } sink.send(Message::Text( - to_string(&websocket::outgoing::Message::Auth( - websocket::outgoing::auth::Message { + to_string(&websocket::data::outgoing::Message::Auth( + websocket::auth::Message { key: config.alpaca_api_key.clone(), secret: config.alpaca_api_secret.clone(), }, @@ -39,11 +42,11 @@ pub async fn authenticate( match stream.next().await.unwrap().unwrap() { Message::Text(data) - if from_str::>(&data) + if from_str::>(&data) .unwrap() .first() - == Some(&websocket::incoming::Message::Success( - websocket::incoming::success::Message::Authenticated, + == Some(&websocket::data::incoming::Message::Success( + websocket::data::incoming::success::Message::Authenticated, )) => {} _ => panic!("Failed to authenticate with Alpaca websocket."), }; diff --git a/src/types/alpaca/websocket/outgoing/mod.rs b/src/types/alpaca/websocket/data/outgoing/mod.rs similarity index 72% rename from src/types/alpaca/websocket/outgoing/mod.rs rename to src/types/alpaca/websocket/data/outgoing/mod.rs index bebf22b..4935a0a 100644 --- a/src/types/alpaca/websocket/outgoing/mod.rs +++ b/src/types/alpaca/websocket/data/outgoing/mod.rs @@ -1,11 +1,11 @@ -pub mod auth; pub mod subscribe; +use crate::types::alpaca::websocket::auth; use serde::Serialize; #[derive(Serialize)] #[serde(tag = "action")] -#[serde(rename_all = "camelCase")] +#[serde(rename_all = "snake_case")] pub enum Message { Auth(auth::Message), Subscribe(subscribe::Message), diff --git a/src/types/alpaca/websocket/outgoing/subscribe.rs b/src/types/alpaca/websocket/data/outgoing/subscribe.rs similarity index 96% rename from src/types/alpaca/websocket/outgoing/subscribe.rs rename to src/types/alpaca/websocket/data/outgoing/subscribe.rs index 30591b4..bd953aa 100644 --- a/src/types/alpaca/websocket/outgoing/subscribe.rs +++ b/src/types/alpaca/websocket/data/outgoing/subscribe.rs @@ -21,7 +21,6 @@ pub enum Market { #[serde(untagged)] pub enum Message { Market(Market), - #[serde(rename_all = "camelCase")] News { news: Vec, }, diff --git a/src/types/alpaca/websocket/mod.rs b/src/types/alpaca/websocket/mod.rs index 9aac270..e0a418d 100644 --- a/src/types/alpaca/websocket/mod.rs +++ b/src/types/alpaca/websocket/mod.rs @@ -1,2 +1,3 @@ -pub mod incoming; -pub mod outgoing; +pub mod auth; +pub mod data; +pub mod trading; diff --git a/src/types/alpaca/websocket/trading/incoming/auth.rs b/src/types/alpaca/websocket/trading/incoming/auth.rs new file mode 100644 index 0000000..9cf8482 --- /dev/null +++ b/src/types/alpaca/websocket/trading/incoming/auth.rs @@ -0,0 +1,22 @@ +use serde::Deserialize; + +#[derive(Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Status { + Authorized, + Unauthorized, +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub enum Action { + #[serde(rename = "authenticate")] + Auth, + #[serde(rename = "listen")] + Subscribe, +} + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Message { + pub status: Status, + pub action: Action, +} diff --git a/src/types/alpaca/websocket/trading/incoming/mod.rs b/src/types/alpaca/websocket/trading/incoming/mod.rs new file mode 100644 index 0000000..2c53890 --- /dev/null +++ b/src/types/alpaca/websocket/trading/incoming/mod.rs @@ -0,0 +1,16 @@ +pub mod auth; +pub mod order; +pub mod subscription; + +use serde::Deserialize; + +#[derive(Deserialize, Debug, PartialEq)] +#[serde(tag = "stream", content = "data")] +pub enum Message { + #[serde(rename = "authorization")] + Auth(auth::Message), + #[serde(rename = "listening")] + Subscription(subscription::Message), + #[serde(rename = "trade_updates")] + Order(order::Message), +} diff --git a/src/types/alpaca/websocket/trading/incoming/order.rs b/src/types/alpaca/websocket/trading/incoming/order.rs new file mode 100644 index 0000000..f518d5c --- /dev/null +++ b/src/types/alpaca/websocket/trading/incoming/order.rs @@ -0,0 +1,86 @@ +use crate::types::alpaca::shared; +use serde::Deserialize; +use time::OffsetDateTime; +use uuid::Uuid; + +pub use shared::order::Order; + +#[derive(Deserialize, Debug, PartialEq)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "event")] +pub enum Message { + New { + execution_id: Uuid, + order: Order, + }, + Fill { + execution_id: Uuid, + order: Order, + timestamp: OffsetDateTime, + position_qty: f64, + price: f64, + }, + PartialFill { + execution_id: Uuid, + order: Order, + timestamp: OffsetDateTime, + position_qty: f64, + price: f64, + }, + Canceled { + execution_id: Uuid, + order: Order, + timestamp: OffsetDateTime, + }, + Expired { + execution_id: Uuid, + order: Order, + timestamp: OffsetDateTime, + }, + DoneForDay { + execution_id: Uuid, + order: Order, + }, + Replaced { + execution_id: Uuid, + order: Order, + timestamp: OffsetDateTime, + }, + Rejected { + execution_id: Uuid, + order: Order, + timestamp: OffsetDateTime, + }, + PendingNew { + execution_id: Uuid, + order: Order, + }, + Stopped { + execution_id: Uuid, + order: Order, + }, + PendingCancel { + execution_id: Uuid, + order: Order, + }, + PendingReplace { + execution_id: Uuid, + order: Order, + }, + Calculated { + execution_id: Uuid, + order: Order, + }, + Suspended { + execution_id: Uuid, + order: Order, + }, + OrderReplaceRejected { + execution_id: Uuid, + order: Order, + }, + OrderCancelRejected { + execution_id: Uuid, + order: Order, + }, +} diff --git a/src/types/alpaca/websocket/trading/incoming/subscription.rs b/src/types/alpaca/websocket/trading/incoming/subscription.rs new file mode 100644 index 0000000..ee7c075 --- /dev/null +++ b/src/types/alpaca/websocket/trading/incoming/subscription.rs @@ -0,0 +1,6 @@ +use serde::Deserialize; + +#[derive(Deserialize, Debug, PartialEq, Eq)] +pub struct Message { + pub streams: Vec, +} diff --git a/src/types/alpaca/websocket/trading/mod.rs b/src/types/alpaca/websocket/trading/mod.rs new file mode 100644 index 0000000..bb18eb0 --- /dev/null +++ b/src/types/alpaca/websocket/trading/mod.rs @@ -0,0 +1,51 @@ +pub mod incoming; +pub mod outgoing; + +use crate::{config::Config, types::alpaca::websocket}; +use core::panic; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use serde_json::{from_str, to_string}; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +pub async fn authenticate( + config: &Arc, + sink: &mut SplitSink>, Message>, + stream: &mut SplitStream>>, +) { + sink.send(Message::Text( + to_string(&websocket::trading::outgoing::Message::Auth( + websocket::auth::Message { + key: config.alpaca_api_key.clone(), + secret: config.alpaca_api_secret.clone(), + }, + )) + .unwrap(), + )) + .await + .unwrap(); + + match stream.next().await.unwrap().unwrap() { + Message::Binary(data) => { + let data = String::from_utf8(data).unwrap(); + + if from_str::>(&data) + .unwrap() + .first() + != Some(&websocket::trading::incoming::Message::Auth( + websocket::trading::incoming::auth::Message { + status: websocket::trading::incoming::auth::Status::Authorized, + action: websocket::trading::incoming::auth::Action::Auth, + }, + )) + { + panic!("Failed to authenticate with Alpaca websocket."); + } + } + _ => panic!("Failed to authenticate with Alpaca websocket."), + }; +} diff --git a/src/types/alpaca/websocket/trading/outgoing/mod.rs b/src/types/alpaca/websocket/trading/outgoing/mod.rs new file mode 100644 index 0000000..4b0475f --- /dev/null +++ b/src/types/alpaca/websocket/trading/outgoing/mod.rs @@ -0,0 +1,15 @@ +pub mod subscribe; + +use crate::types::alpaca::websocket::auth; +use serde::Serialize; + +#[derive(Serialize)] +#[serde(tag = "action")] +#[serde(rename_all = "snake_case")] +pub enum Message { + Auth(auth::Message), + #[serde(rename = "listen")] + Subscribe { + data: subscribe::Message, + }, +} diff --git a/src/types/alpaca/websocket/trading/outgoing/subscribe.rs b/src/types/alpaca/websocket/trading/outgoing/subscribe.rs new file mode 100644 index 0000000..46f3922 --- /dev/null +++ b/src/types/alpaca/websocket/trading/outgoing/subscribe.rs @@ -0,0 +1,6 @@ +use serde::Serialize; + +#[derive(Serialize)] +pub struct Message { + streams: Vec, +} diff --git a/src/types/mod.rs b/src/types/mod.rs index 5c4c806..9791365 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -3,8 +3,10 @@ pub mod asset; pub mod backfill; pub mod bar; pub mod news; +pub mod order; pub use asset::{Asset, Class, Exchange}; pub use backfill::Backfill; pub use bar::Bar; pub use news::News; +pub use order::Order; diff --git a/src/types/order.rs b/src/types/order.rs new file mode 100644 index 0000000..016395d --- /dev/null +++ b/src/types/order.rs @@ -0,0 +1,107 @@ +use clickhouse::Row; +use serde::{Deserialize, Serialize}; +use serde_repr::{Deserialize_repr, Serialize_repr}; +use time::OffsetDateTime; +use uuid::Uuid; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] +#[repr(i8)] +pub enum Class { + Simple = 1, + Bracket = 2, + Oco = 3, + Oto = 4, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] +#[repr(i8)] +pub enum Type { + Market = 1, + Limit = 2, + Stop = 3, + StopLimit = 4, + TrailingStop = 5, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] +#[repr(i8)] +pub enum Side { + Buy = 1, + Sell = -1, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] +#[repr(i8)] +pub enum TimeInForce { + Day = 1, + Gtc = 2, + Opg = 3, + Cls = 4, + Ioc = 5, + Fok = 6, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] +#[repr(i8)] +pub enum Status { + New = 1, + PartiallyFilled = 2, + Filled = 3, + DoneForDay = 4, + Canceled = 5, + Expired = 6, + Replaced = 7, + PendingCancel = 8, + PendingReplace = 9, + Accepted = 10, + PendingNew = 11, + AcceptedForBidding = 12, + Stopped = 13, + Rejected = 14, + Suspended = 15, + Calculated = 16, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Row)] +#[allow(clippy::struct_field_names)] +pub struct Order { + pub id: Uuid, + pub client_order_id: Uuid, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_submitted: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_created: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_updated: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_filled: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_expired: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_cancel_requested: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_canceled: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_failed: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub time_replaced: OffsetDateTime, + pub replaced_by: Uuid, + pub replaces: Uuid, + pub symbol: String, + pub order_class: Class, + pub order_type: Type, + pub side: Side, + pub time_in_force: TimeInForce, + pub extended_hours: bool, + pub notional: f64, + pub qty: f64, + pub filled_qty: f64, + pub filled_avg_price: f64, + pub status: Status, + pub limit_price: f64, + pub stop_price: f64, + pub trail_percent: f64, + pub trail_price: f64, + pub hwm: f64, + pub legs: Vec, +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 78a3b3e..f0bb3f3 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -3,9 +3,7 @@ pub mod cleanup; pub mod macros; pub mod news; pub mod time; -pub mod websocket; pub use cleanup::cleanup; pub use news::{add_slash_to_pair, normalize_news_content, remove_slash_from_pair}; pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE, ONE_SECOND}; -pub use websocket::authenticate; diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index bebb309..dfb8050 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -64,3 +64,57 @@ CREATE TABLE IF NOT EXISTS qrust.backfills_news ( ) ENGINE = ReplacingMergeTree() PRIMARY KEY symbol; + +CREATE TABLE IF NOT EXISTS qrust.orders ( + id UUID, + client_order_id UUID, + time_submitted DateTime, + time_created DateTime, + time_updated DateTime, + time_filled DateTime, + time_expired DateTime, + time_cancel_requested DateTime, + time_canceled DateTime, + time_failed DateTime, + time_replaced DateTime, + replaced_by UUID, + replaces UUID, + symbol LowCardinality(String), + order_class Enum('simple' = 1, 'bracket' = 2, 'oco' = 3, 'oto' = 4), + order_type Enum('market' = 1, 'limit' = 2, 'stop' = 3, 'stop_limit' = 4, 'trailing_stop' = 5), + side Enum('buy' = 1, 'sell' = -1), + time_in_force Enum('day' = 1, 'gtc' = 2, 'opg' = 3, 'cls' = 4, 'ioc' = 5, 'fok' = 6), + extended_hours Boolean, + notional Float64, + qty Float64, + filled_qty Float64, + filled_avg_price Float64, + status Enum( + 'new' = 1, + 'partially_filled' = 2, + 'filled' = 3, + 'done_for_day' = 4, + 'canceled' = 5, + 'expired' = 6, + 'replaced' = 7, + 'pending_cancel' = 8, + 'pending_replace' = 9, + 'accepted' = 10, + 'pending_new' = 11, + 'accepted_for_bidding' = 12, + 'stopped' = 13, + 'rejected' = 14, + 'suspended' = 15, + 'calculated' = 16 + ), + limit_price Float64, + stop_price Float64, + trail_percent Float64, + trail_price Float64, + hwm Float64, + legs Array(UUID) +) +ENGINE = ReplacingMergeTree() +PARTITION BY toYYYYMM(time_submitted) +PRIMARY KEY id; +