Improve outgoing Alpaca API types

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-02-05 00:30:11 +00:00
parent 61c573cbc7
commit caaa31133a
13 changed files with 185 additions and 170 deletions

View File

@@ -43,7 +43,7 @@ impl Config {
let alpaca_source: Source = env::var("ALPACA_SOURCE") let alpaca_source: Source = env::var("ALPACA_SOURCE")
.expect("ALPACA_SOURCE must be set.") .expect("ALPACA_SOURCE must be set.")
.parse() .parse()
.expect("ALPACA_SOURCE must be a either 'iex' or 'sip'."); .expect("ALPACA_SOURCE must be 'iex', 'sip', or 'otc'.");
let clickhouse_url = env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set."); let clickhouse_url = env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set.");
let clickhouse_user = env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set."); let clickhouse_user = env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set.");
@@ -75,6 +75,7 @@ impl Config {
alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(match alpaca_source { alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(match alpaca_source {
Source::Iex => unsafe { NonZeroU32::new_unchecked(200) }, Source::Iex => unsafe { NonZeroU32::new_unchecked(200) },
Source::Sip => unsafe { NonZeroU32::new_unchecked(10000) }, Source::Sip => unsafe { NonZeroU32::new_unchecked(10000) },
Source::Otc => unimplemented!("OTC rate limit not implemented."),
})), })),
alpaca_source, alpaca_source,
clickhouse_client: clickhouse::Client::default() clickhouse_client: clickhouse::Client::default()

View File

@@ -108,7 +108,14 @@ async fn handle_asset_status_message(
.await .await
.send(tungstenite::Message::Text( .send(tungstenite::Message::Text(
to_string(&websocket::outgoing::Message::Subscribe( to_string(&websocket::outgoing::Message::Subscribe(
create_websocket_market_message(thread_type, symbols), match thread_type {
ThreadType::Bars(_) => {
websocket::outgoing::subscribe::Message::new_market(symbols)
}
ThreadType::News => {
websocket::outgoing::subscribe::Message::new_news(symbols)
}
},
)) ))
.unwrap(), .unwrap(),
)) ))
@@ -140,7 +147,14 @@ async fn handle_asset_status_message(
.await .await
.send(tungstenite::Message::Text( .send(tungstenite::Message::Text(
to_string(&websocket::outgoing::Message::Unsubscribe( to_string(&websocket::outgoing::Message::Unsubscribe(
create_websocket_market_message(thread_type, symbols), match thread_type {
ThreadType::Bars(_) => {
websocket::outgoing::subscribe::Message::new_market(symbols)
}
ThreadType::News => {
websocket::outgoing::subscribe::Message::new_news(symbols)
}
},
)) ))
.unwrap(), .unwrap(),
)) ))
@@ -154,17 +168,3 @@ async fn handle_asset_status_message(
message.response.send(()).unwrap(); message.response.send(()).unwrap();
} }
fn create_websocket_market_message(
thread_type: ThreadType,
symbols: Vec<String>,
) -> websocket::outgoing::subscribe::Message {
match thread_type {
ThreadType::Bars(_) => websocket::outgoing::subscribe::Message::Market(
websocket::outgoing::subscribe::MarketMessage::new(symbols),
),
ThreadType::News => websocket::outgoing::subscribe::Message::News(
websocket::outgoing::subscribe::NewsMessage::new(symbols),
),
}
}

View File

@@ -3,11 +3,14 @@ use crate::{
config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_NEWS_DATA_URL, ALPACA_STOCK_DATA_URL}, config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_NEWS_DATA_URL, ALPACA_STOCK_DATA_URL},
database, database,
types::{ types::{
alpaca::{api, Source}, alpaca::{
api::{self, outgoing::Sort},
Source,
},
news::Prediction, news::Prediction,
Asset, Bar, Class, News, Subset, Asset, Bar, Class, News, Subset,
}, },
utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, utils::{duration_until, last_minute, remove_slash_from_pair, FIFTEEN_MINUTES, ONE_MINUTE},
}; };
use backoff::{future::retry, ExponentialBackoff}; use backoff::{future::retry, ExponentialBackoff};
use futures_util::future::join_all; use futures_util::future::join_all;
@@ -251,14 +254,31 @@ async fn execute_backfill_bars(
app_config app_config
.alpaca_client .alpaca_client
.get(&data_url) .get(&data_url)
.query(&api::outgoing::bar::Bar::new( .query(&match thread_type {
vec![symbol.clone()], ThreadType::Bars(Class::UsEquity) => api::outgoing::bar::Bar::UsEquity {
ONE_MINUTE, symbols: vec![symbol.clone()],
fetch_from, timeframe: ONE_MINUTE,
fetch_to, start: Some(fetch_from),
10000, end: Some(fetch_to),
next_page_token.clone(), limit: Some(10000),
)) adjustment: None,
asof: None,
feed: Some(app_config.alpaca_source),
currency: None,
page_token: next_page_token.clone(),
sort: Some(Sort::Asc),
},
ThreadType::Bars(Class::Crypto) => api::outgoing::bar::Bar::Crypto {
symbols: vec![symbol.clone()],
timeframe: ONE_MINUTE,
start: Some(fetch_from),
end: Some(fetch_to),
limit: Some(10000),
page_token: next_page_token.clone(),
sort: Some(Sort::Asc),
},
_ => unreachable!(),
})
.send() .send()
.await? .await?
.error_for_status()? .error_for_status()?
@@ -325,15 +345,16 @@ async fn execute_backfill_news(
app_config app_config
.alpaca_client .alpaca_client
.get(&data_url) .get(&data_url)
.query(&api::outgoing::news::News::new( .query(&api::outgoing::news::News {
vec![symbol.clone()], symbols: vec![remove_slash_from_pair(&symbol)],
fetch_from, start: Some(fetch_from),
fetch_to, end: Some(fetch_to),
50, limit: Some(50),
true, include_content: Some(true),
false, exclude_contentless: Some(false),
next_page_token.clone(), page_token: next_page_token.clone(),
)) sort: Some(Sort::Asc),
})
.send() .send()
.await? .await?
.error_for_status()? .error_for_status()?
@@ -399,7 +420,7 @@ async fn execute_backfill_news(
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let backfill = (news[0].clone(), symbol.clone()).into(); let backfill = (news.last().unwrap().clone(), symbol.clone()).into();
database::news::upsert_batch(&app_config.clickhouse_client, news).await; database::news::upsert_batch(&app_config.clickhouse_client, news).await;
database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await; database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await;

View File

@@ -102,9 +102,8 @@ async fn handle_parsed_websocket_message(
match message { match message {
websocket::incoming::Message::Subscription(message) => { websocket::incoming::Message::Subscription(message) => {
let symbols = match message { let symbols = match message {
websocket::incoming::subscription::Message::Market(message) => message.bars, websocket::incoming::subscription::Message::Market { bars, .. } => bars,
websocket::incoming::subscription::Message::News(message) => message websocket::incoming::subscription::Message::News { news } => news
.news
.into_iter() .into_iter()
.map(|symbol| add_slash_to_pair(&symbol)) .map(|symbol| add_slash_to_pair(&symbol))
.collect(), .collect(),

View File

@@ -1,4 +1,5 @@
use super::serialize_symbols; use super::{serialize_symbols, Sort};
use crate::types::alpaca::Source;
use serde::Serialize; use serde::Serialize;
use std::time::Duration; use std::time::Duration;
use time::OffsetDateTime; use time::OffsetDateTime;
@@ -36,36 +37,60 @@ where
} }
#[derive(Serialize)] #[derive(Serialize)]
pub struct Bar { #[allow(dead_code)]
#[serde(serialize_with = "serialize_symbols")] pub enum Adjustment {
pub symbols: Vec<String>, Raw,
#[serde(serialize_with = "serialize_timeframe")] Split,
pub timeframe: Duration, Dividend,
#[serde(with = "time::serde::rfc3339")] All,
pub start: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
pub end: OffsetDateTime,
pub limit: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub page_token: Option<String>,
} }
impl Bar { #[derive(Serialize)]
pub const fn new( #[serde(untagged)]
pub enum Bar {
UsEquity {
#[serde(serialize_with = "serialize_symbols")]
symbols: Vec<String>, symbols: Vec<String>,
#[serde(serialize_with = "serialize_timeframe")]
timeframe: Duration, timeframe: Duration,
start: OffsetDateTime, #[serde(skip_serializing_if = "Option::is_none")]
end: OffsetDateTime, #[serde(with = "time::serde::rfc3339::option")]
limit: i64, start: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "time::serde::rfc3339::option")]
end: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
limit: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
adjustment: Option<Adjustment>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "time::serde::rfc3339::option")]
asof: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
feed: Option<Source>,
#[serde(skip_serializing_if = "Option::is_none")]
currency: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
page_token: Option<String>, page_token: Option<String>,
) -> Self { #[serde(skip_serializing_if = "Option::is_none")]
Self { sort: Option<Sort>,
symbols, },
timeframe, Crypto {
start, #[serde(serialize_with = "serialize_symbols")]
end, symbols: Vec<String>,
limit, #[serde(serialize_with = "serialize_timeframe")]
page_token, timeframe: Duration,
} #[serde(skip_serializing_if = "Option::is_none")]
} #[serde(with = "time::serde::rfc3339::option")]
start: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "time::serde::rfc3339::option")]
end: Option<OffsetDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
limit: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
page_token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
sort: Option<Sort>,
},
} }

View File

@@ -1,7 +1,15 @@
pub mod bar; pub mod bar;
pub mod news; pub mod news;
use serde::Serializer; use serde::{Serialize, Serializer};
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)]
pub enum Sort {
Asc,
Desc,
}
fn serialize_symbols<S>(symbols: &[String], serializer: S) -> Result<S::Ok, S::Error> fn serialize_symbols<S>(symbols: &[String], serializer: S) -> Result<S::Ok, S::Error>
where where

View File

@@ -1,5 +1,4 @@
use super::serialize_symbols; use super::{serialize_symbols, Sort};
use crate::utils::remove_slash_from_pair;
use serde::Serialize; use serde::Serialize;
use time::OffsetDateTime; use time::OffsetDateTime;
@@ -7,38 +6,20 @@ use time::OffsetDateTime;
pub struct News { pub struct News {
#[serde(serialize_with = "serialize_symbols")] #[serde(serialize_with = "serialize_symbols")]
pub symbols: Vec<String>, pub symbols: Vec<String>,
#[serde(with = "time::serde::rfc3339")] #[serde(skip_serializing_if = "Option::is_none")]
pub start: OffsetDateTime, #[serde(with = "time::serde::rfc3339::option")]
#[serde(with = "time::serde::rfc3339")] pub start: Option<OffsetDateTime>,
pub end: OffsetDateTime, #[serde(skip_serializing_if = "Option::is_none")]
pub limit: i64, #[serde(with = "time::serde::rfc3339::option")]
pub include_content: bool, pub end: Option<OffsetDateTime>,
pub exclude_contentless: bool, #[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub include_content: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub exclude_contentless: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub page_token: Option<String>, pub page_token: Option<String>,
} #[serde(skip_serializing_if = "Option::is_none")]
pub sort: Option<Sort>,
impl News {
pub fn new(
symbols: Vec<String>,
start: OffsetDateTime,
end: OffsetDateTime,
limit: i64,
include_content: bool,
exclude_contentless: bool,
page_token: Option<String>,
) -> Self {
Self {
symbols: symbols
.into_iter()
.map(|symbol| remove_slash_from_pair(&symbol))
.collect(),
start,
end,
limit,
include_content,
exclude_contentless,
page_token,
}
}
} }

View File

@@ -1,12 +1,15 @@
use serde::{Deserialize, Serialize};
use std::{ use std::{
fmt::{Display, Formatter}, fmt::{Display, Formatter},
str::FromStr, str::FromStr,
}; };
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Source { pub enum Source {
Iex, Iex,
Sip, Sip,
Otc,
} }
impl FromStr for Source { impl FromStr for Source {
@@ -26,6 +29,7 @@ impl Display for Source {
match self { match self {
Self::Iex => write!(f, "iex"), Self::Iex => write!(f, "iex"),
Self::Sip => write!(f, "sip"), Self::Sip => write!(f, "sip"),
Self::Otc => write!(f, "otc"),
} }
} }
} }

View File

@@ -1,28 +1,20 @@
use serde::Deserialize; use serde::Deserialize;
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MarketMessage {
pub trades: Vec<String>,
pub quotes: Vec<String>,
pub bars: Vec<String>,
pub updated_bars: Vec<String>,
pub daily_bars: Vec<String>,
pub orderbooks: Option<Vec<String>>,
pub statuses: Option<Vec<String>>,
pub lulds: Option<Vec<String>>,
pub cancel_errors: Option<Vec<String>>,
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NewsMessage {
pub news: Vec<String>,
}
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum Message { pub enum Message {
Market(MarketMessage), #[serde(rename_all = "camelCase")]
News(NewsMessage), Market {
trades: Vec<String>,
quotes: Vec<String>,
bars: Vec<String>,
updated_bars: Vec<String>,
daily_bars: Vec<String>,
orderbooks: Option<Vec<String>>,
statuses: Option<Vec<String>>,
lulds: Option<Vec<String>>,
cancel_errors: Option<Vec<String>>,
},
#[serde(rename_all = "camelCase")]
News { news: Vec<String> },
} }

View File

@@ -2,12 +2,6 @@ use serde::Serialize;
#[derive(Serialize)] #[derive(Serialize)]
pub struct Message { pub struct Message {
key: String, pub key: String,
secret: String, pub secret: String,
}
impl Message {
pub const fn new(key: String, secret: String) -> Self {
Self { key, secret }
}
} }

View File

@@ -2,30 +2,27 @@ use crate::utils::remove_slash_from_pair;
use serde::Serialize; use serde::Serialize;
#[derive(Serialize)] #[derive(Serialize)]
#[serde(rename_all = "camelCase")] #[serde(untagged)]
pub struct MarketMessage { pub enum Message {
#[serde(rename_all = "camelCase")]
Market {
bars: Vec<String>, bars: Vec<String>,
updated_bars: Vec<String>, updated_bars: Vec<String>,
},
#[serde(rename_all = "camelCase")]
News { news: Vec<String> },
} }
impl MarketMessage { impl Message {
pub fn new(symbols: Vec<String>) -> Self { pub fn new_market(symbols: Vec<String>) -> Self {
Self { Self::Market {
bars: symbols.clone(), bars: symbols.clone(),
updated_bars: symbols, updated_bars: symbols,
} }
} }
}
#[derive(Serialize)] pub fn new_news(symbols: Vec<String>) -> Self {
#[serde(rename_all = "camelCase")] Self::News {
pub struct NewsMessage {
news: Vec<String>,
}
impl NewsMessage {
pub fn new(symbols: Vec<String>) -> Self {
Self {
news: symbols news: symbols
.into_iter() .into_iter()
.map(|symbol| remove_slash_from_pair(&symbol)) .map(|symbol| remove_slash_from_pair(&symbol))
@@ -33,10 +30,3 @@ impl NewsMessage {
} }
} }
} }
#[derive(Serialize)]
#[serde(untagged)]
pub enum Message {
Market(MarketMessage),
News(NewsMessage),
}

View File

@@ -10,20 +10,20 @@ pub struct Backfill {
pub time: OffsetDateTime, pub time: OffsetDateTime,
} }
impl Backfill {
pub const fn new(symbol: String, time: OffsetDateTime) -> Self {
Self { symbol, time }
}
}
impl From<Bar> for Backfill { impl From<Bar> for Backfill {
fn from(bar: Bar) -> Self { fn from(bar: Bar) -> Self {
Self::new(bar.symbol, bar.time) Self {
symbol: bar.symbol,
time: bar.time,
}
} }
} }
impl From<(News, String)> for Backfill { impl From<(News, String)> for Backfill {
fn from((news, symbol): (News, String)) -> Self { fn from((news, symbol): (News, String)) -> Self {
Self::new(symbol, news.time_created) Self {
symbol,
time: news.time_created,
}
} }
} }

View File

@@ -28,10 +28,10 @@ pub async fn authenticate(
sender sender
.send(Message::Text( .send(Message::Text(
to_string(&websocket::outgoing::Message::Auth( to_string(&websocket::outgoing::Message::Auth(
websocket::outgoing::auth::Message::new( websocket::outgoing::auth::Message {
app_config.alpaca_api_key.clone(), key: app_config.alpaca_api_key.clone(),
app_config.alpaca_api_secret.clone(), secret: app_config.alpaca_api_secret.clone(),
), },
)) ))
.unwrap(), .unwrap(),
)) ))