diff --git a/src/config.rs b/src/config.rs index a19c4ac..09d3c24 100644 --- a/src/config.rs +++ b/src/config.rs @@ -43,7 +43,7 @@ impl Config { let alpaca_source: Source = env::var("ALPACA_SOURCE") .expect("ALPACA_SOURCE must be set.") .parse() - .expect("ALPACA_SOURCE must be a either 'iex' or 'sip'."); + .expect("ALPACA_SOURCE must be 'iex', 'sip', or 'otc'."); let clickhouse_url = env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set."); let clickhouse_user = env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set."); @@ -75,6 +75,7 @@ impl Config { alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(match alpaca_source { Source::Iex => unsafe { NonZeroU32::new_unchecked(200) }, Source::Sip => unsafe { NonZeroU32::new_unchecked(10000) }, + Source::Otc => unimplemented!("OTC rate limit not implemented."), })), alpaca_source, clickhouse_client: clickhouse::Client::default() diff --git a/src/threads/data/asset_status.rs b/src/threads/data/asset_status.rs index b93360a..959c743 100644 --- a/src/threads/data/asset_status.rs +++ b/src/threads/data/asset_status.rs @@ -108,7 +108,14 @@ async fn handle_asset_status_message( .await .send(tungstenite::Message::Text( to_string(&websocket::outgoing::Message::Subscribe( - create_websocket_market_message(thread_type, symbols), + match thread_type { + ThreadType::Bars(_) => { + websocket::outgoing::subscribe::Message::new_market(symbols) + } + ThreadType::News => { + websocket::outgoing::subscribe::Message::new_news(symbols) + } + }, )) .unwrap(), )) @@ -140,7 +147,14 @@ async fn handle_asset_status_message( .await .send(tungstenite::Message::Text( to_string(&websocket::outgoing::Message::Unsubscribe( - create_websocket_market_message(thread_type, symbols), + match thread_type { + ThreadType::Bars(_) => { + websocket::outgoing::subscribe::Message::new_market(symbols) + } + ThreadType::News => { + websocket::outgoing::subscribe::Message::new_news(symbols) + } + }, )) .unwrap(), )) @@ -154,17 +168,3 @@ async fn handle_asset_status_message( message.response.send(()).unwrap(); } - -fn create_websocket_market_message( - thread_type: ThreadType, - symbols: Vec, -) -> websocket::outgoing::subscribe::Message { - match thread_type { - ThreadType::Bars(_) => websocket::outgoing::subscribe::Message::Market( - websocket::outgoing::subscribe::MarketMessage::new(symbols), - ), - ThreadType::News => websocket::outgoing::subscribe::Message::News( - websocket::outgoing::subscribe::NewsMessage::new(symbols), - ), - } -} diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs index d02ceac..0faf277 100644 --- a/src/threads/data/backfill.rs +++ b/src/threads/data/backfill.rs @@ -3,11 +3,14 @@ use crate::{ config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_NEWS_DATA_URL, ALPACA_STOCK_DATA_URL}, database, types::{ - alpaca::{api, Source}, + alpaca::{ + api::{self, outgoing::Sort}, + Source, + }, news::Prediction, Asset, Bar, Class, News, Subset, }, - utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, + utils::{duration_until, last_minute, remove_slash_from_pair, FIFTEEN_MINUTES, ONE_MINUTE}, }; use backoff::{future::retry, ExponentialBackoff}; use futures_util::future::join_all; @@ -251,14 +254,31 @@ async fn execute_backfill_bars( app_config .alpaca_client .get(&data_url) - .query(&api::outgoing::bar::Bar::new( - vec![symbol.clone()], - ONE_MINUTE, - fetch_from, - fetch_to, - 10000, - next_page_token.clone(), - )) + .query(&match thread_type { + ThreadType::Bars(Class::UsEquity) => api::outgoing::bar::Bar::UsEquity { + symbols: vec![symbol.clone()], + timeframe: ONE_MINUTE, + start: Some(fetch_from), + end: Some(fetch_to), + limit: Some(10000), + adjustment: None, + asof: None, + feed: Some(app_config.alpaca_source), + currency: None, + page_token: next_page_token.clone(), + sort: Some(Sort::Asc), + }, + ThreadType::Bars(Class::Crypto) => api::outgoing::bar::Bar::Crypto { + symbols: vec![symbol.clone()], + timeframe: ONE_MINUTE, + start: Some(fetch_from), + end: Some(fetch_to), + limit: Some(10000), + page_token: next_page_token.clone(), + sort: Some(Sort::Asc), + }, + _ => unreachable!(), + }) .send() .await? .error_for_status()? @@ -325,15 +345,16 @@ async fn execute_backfill_news( app_config .alpaca_client .get(&data_url) - .query(&api::outgoing::news::News::new( - vec![symbol.clone()], - fetch_from, - fetch_to, - 50, - true, - false, - next_page_token.clone(), - )) + .query(&api::outgoing::news::News { + symbols: vec![remove_slash_from_pair(&symbol)], + start: Some(fetch_from), + end: Some(fetch_to), + limit: Some(50), + include_content: Some(true), + exclude_contentless: Some(false), + page_token: next_page_token.clone(), + sort: Some(Sort::Asc), + }) .send() .await? .error_for_status()? @@ -399,7 +420,7 @@ async fn execute_backfill_news( }) .collect::>(); - let backfill = (news[0].clone(), symbol.clone()).into(); + let backfill = (news.last().unwrap().clone(), symbol.clone()).into(); database::news::upsert_batch(&app_config.clickhouse_client, news).await; database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await; diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs index 806ce64..e5b6ba5 100644 --- a/src/threads/data/websocket.rs +++ b/src/threads/data/websocket.rs @@ -102,9 +102,8 @@ async fn handle_parsed_websocket_message( match message { websocket::incoming::Message::Subscription(message) => { let symbols = match message { - websocket::incoming::subscription::Message::Market(message) => message.bars, - websocket::incoming::subscription::Message::News(message) => message - .news + websocket::incoming::subscription::Message::Market { bars, .. } => bars, + websocket::incoming::subscription::Message::News { news } => news .into_iter() .map(|symbol| add_slash_to_pair(&symbol)) .collect(), diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/types/alpaca/api/outgoing/bar.rs index 48b758f..9e1bce9 100644 --- a/src/types/alpaca/api/outgoing/bar.rs +++ b/src/types/alpaca/api/outgoing/bar.rs @@ -1,4 +1,5 @@ -use super::serialize_symbols; +use super::{serialize_symbols, Sort}; +use crate::types::alpaca::Source; use serde::Serialize; use std::time::Duration; use time::OffsetDateTime; @@ -36,36 +37,60 @@ where } #[derive(Serialize)] -pub struct Bar { - #[serde(serialize_with = "serialize_symbols")] - pub symbols: Vec, - #[serde(serialize_with = "serialize_timeframe")] - pub timeframe: Duration, - #[serde(with = "time::serde::rfc3339")] - pub start: OffsetDateTime, - #[serde(with = "time::serde::rfc3339")] - pub end: OffsetDateTime, - pub limit: i64, - #[serde(skip_serializing_if = "Option::is_none")] - pub page_token: Option, +#[allow(dead_code)] +pub enum Adjustment { + Raw, + Split, + Dividend, + All, } -impl Bar { - pub const fn new( +#[derive(Serialize)] +#[serde(untagged)] +pub enum Bar { + UsEquity { + #[serde(serialize_with = "serialize_symbols")] symbols: Vec, + #[serde(serialize_with = "serialize_timeframe")] timeframe: Duration, - start: OffsetDateTime, - end: OffsetDateTime, - limit: i64, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + start: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + end: Option, + #[serde(skip_serializing_if = "Option::is_none")] + limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + adjustment: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + asof: Option, + #[serde(skip_serializing_if = "Option::is_none")] + feed: Option, + #[serde(skip_serializing_if = "Option::is_none")] + currency: Option, + #[serde(skip_serializing_if = "Option::is_none")] page_token: Option, - ) -> Self { - Self { - symbols, - timeframe, - start, - end, - limit, - page_token, - } - } + #[serde(skip_serializing_if = "Option::is_none")] + sort: Option, + }, + Crypto { + #[serde(serialize_with = "serialize_symbols")] + symbols: Vec, + #[serde(serialize_with = "serialize_timeframe")] + timeframe: Duration, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + start: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + end: Option, + #[serde(skip_serializing_if = "Option::is_none")] + limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + page_token: Option, + #[serde(skip_serializing_if = "Option::is_none")] + sort: Option, + }, } diff --git a/src/types/alpaca/api/outgoing/mod.rs b/src/types/alpaca/api/outgoing/mod.rs index 6293fb8..f26e908 100644 --- a/src/types/alpaca/api/outgoing/mod.rs +++ b/src/types/alpaca/api/outgoing/mod.rs @@ -1,7 +1,15 @@ pub mod bar; pub mod news; -use serde::Serializer; +use serde::{Serialize, Serializer}; + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +#[allow(dead_code)] +pub enum Sort { + Asc, + Desc, +} fn serialize_symbols(symbols: &[String], serializer: S) -> Result where diff --git a/src/types/alpaca/api/outgoing/news.rs b/src/types/alpaca/api/outgoing/news.rs index 5a3c2ee..2d57614 100644 --- a/src/types/alpaca/api/outgoing/news.rs +++ b/src/types/alpaca/api/outgoing/news.rs @@ -1,5 +1,4 @@ -use super::serialize_symbols; -use crate::utils::remove_slash_from_pair; +use super::{serialize_symbols, Sort}; use serde::Serialize; use time::OffsetDateTime; @@ -7,38 +6,20 @@ use time::OffsetDateTime; pub struct News { #[serde(serialize_with = "serialize_symbols")] pub symbols: Vec, - #[serde(with = "time::serde::rfc3339")] - pub start: OffsetDateTime, - #[serde(with = "time::serde::rfc3339")] - pub end: OffsetDateTime, - pub limit: i64, - pub include_content: bool, - pub exclude_contentless: bool, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + pub start: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "time::serde::rfc3339::option")] + pub end: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub limit: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub include_content: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub exclude_contentless: Option, #[serde(skip_serializing_if = "Option::is_none")] pub page_token: Option, -} - -impl News { - pub fn new( - symbols: Vec, - start: OffsetDateTime, - end: OffsetDateTime, - limit: i64, - include_content: bool, - exclude_contentless: bool, - page_token: Option, - ) -> Self { - Self { - symbols: symbols - .into_iter() - .map(|symbol| remove_slash_from_pair(&symbol)) - .collect(), - start, - end, - limit, - include_content, - exclude_contentless, - page_token, - } - } + #[serde(skip_serializing_if = "Option::is_none")] + pub sort: Option, } diff --git a/src/types/alpaca/source.rs b/src/types/alpaca/source.rs index f734d3e..948bedc 100644 --- a/src/types/alpaca/source.rs +++ b/src/types/alpaca/source.rs @@ -1,12 +1,15 @@ +use serde::{Deserialize, Serialize}; use std::{ fmt::{Display, Formatter}, str::FromStr, }; -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum Source { Iex, Sip, + Otc, } impl FromStr for Source { @@ -26,6 +29,7 @@ impl Display for Source { match self { Self::Iex => write!(f, "iex"), Self::Sip => write!(f, "sip"), + Self::Otc => write!(f, "otc"), } } } diff --git a/src/types/alpaca/websocket/incoming/subscription.rs b/src/types/alpaca/websocket/incoming/subscription.rs index 7e44d02..2c88085 100644 --- a/src/types/alpaca/websocket/incoming/subscription.rs +++ b/src/types/alpaca/websocket/incoming/subscription.rs @@ -1,28 +1,20 @@ use serde::Deserialize; -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct MarketMessage { - pub trades: Vec, - pub quotes: Vec, - pub bars: Vec, - pub updated_bars: Vec, - pub daily_bars: Vec, - pub orderbooks: Option>, - pub statuses: Option>, - pub lulds: Option>, - pub cancel_errors: Option>, -} - -#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NewsMessage { - pub news: Vec, -} - #[derive(Clone, Debug, PartialEq, Eq, Deserialize)] #[serde(untagged)] pub enum Message { - Market(MarketMessage), - News(NewsMessage), + #[serde(rename_all = "camelCase")] + Market { + trades: Vec, + quotes: Vec, + bars: Vec, + updated_bars: Vec, + daily_bars: Vec, + orderbooks: Option>, + statuses: Option>, + lulds: Option>, + cancel_errors: Option>, + }, + #[serde(rename_all = "camelCase")] + News { news: Vec }, } diff --git a/src/types/alpaca/websocket/outgoing/auth.rs b/src/types/alpaca/websocket/outgoing/auth.rs index a8933e5..94b10fb 100644 --- a/src/types/alpaca/websocket/outgoing/auth.rs +++ b/src/types/alpaca/websocket/outgoing/auth.rs @@ -2,12 +2,6 @@ use serde::Serialize; #[derive(Serialize)] pub struct Message { - key: String, - secret: String, -} - -impl Message { - pub const fn new(key: String, secret: String) -> Self { - Self { key, secret } - } + pub key: String, + pub secret: String, } diff --git a/src/types/alpaca/websocket/outgoing/subscribe.rs b/src/types/alpaca/websocket/outgoing/subscribe.rs index 465ab62..ef78c28 100644 --- a/src/types/alpaca/websocket/outgoing/subscribe.rs +++ b/src/types/alpaca/websocket/outgoing/subscribe.rs @@ -2,30 +2,27 @@ use crate::utils::remove_slash_from_pair; use serde::Serialize; #[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct MarketMessage { - bars: Vec, - updated_bars: Vec, +#[serde(untagged)] +pub enum Message { + #[serde(rename_all = "camelCase")] + Market { + bars: Vec, + updated_bars: Vec, + }, + #[serde(rename_all = "camelCase")] + News { news: Vec }, } -impl MarketMessage { - pub fn new(symbols: Vec) -> Self { - Self { +impl Message { + pub fn new_market(symbols: Vec) -> Self { + Self::Market { bars: symbols.clone(), updated_bars: symbols, } } -} -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct NewsMessage { - news: Vec, -} - -impl NewsMessage { - pub fn new(symbols: Vec) -> Self { - Self { + pub fn new_news(symbols: Vec) -> Self { + Self::News { news: symbols .into_iter() .map(|symbol| remove_slash_from_pair(&symbol)) @@ -33,10 +30,3 @@ impl NewsMessage { } } } - -#[derive(Serialize)] -#[serde(untagged)] -pub enum Message { - Market(MarketMessage), - News(NewsMessage), -} diff --git a/src/types/backfill.rs b/src/types/backfill.rs index e940d76..92ad587 100644 --- a/src/types/backfill.rs +++ b/src/types/backfill.rs @@ -10,20 +10,20 @@ pub struct Backfill { pub time: OffsetDateTime, } -impl Backfill { - pub const fn new(symbol: String, time: OffsetDateTime) -> Self { - Self { symbol, time } - } -} - impl From for Backfill { fn from(bar: Bar) -> Self { - Self::new(bar.symbol, bar.time) + Self { + symbol: bar.symbol, + time: bar.time, + } } } impl From<(News, String)> for Backfill { fn from((news, symbol): (News, String)) -> Self { - Self::new(symbol, news.time_created) + Self { + symbol, + time: news.time_created, + } } } diff --git a/src/utils/websocket.rs b/src/utils/websocket.rs index 5aef8ee..6983bff 100644 --- a/src/utils/websocket.rs +++ b/src/utils/websocket.rs @@ -28,10 +28,10 @@ pub async fn authenticate( sender .send(Message::Text( to_string(&websocket::outgoing::Message::Auth( - websocket::outgoing::auth::Message::new( - app_config.alpaca_api_key.clone(), - app_config.alpaca_api_secret.clone(), - ), + websocket::outgoing::auth::Message { + key: app_config.alpaca_api_key.clone(), + secret: app_config.alpaca_api_secret.clone(), + }, )) .unwrap(), ))