Add market data backfilling

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2023-09-09 17:48:49 +03:00
parent 548a8e42d5
commit e26d2b95e7
72 changed files with 1847 additions and 1044 deletions

73
src/config.rs Normal file

@@ -0,0 +1,73 @@
use crate::types::Source;
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use http::HeaderMap;
use reqwest::Client;
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{env, num::NonZeroU32, sync::Arc};
use time::{format_description::FormatItem, macros::format_description};
use tokio::time::Duration;
pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
pub const ALPACA_TIMESTAMP_FORMAT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
const NUM_CLIENTS: u32 = 10;
pub struct Config {
pub alpaca_api_key: String,
pub alpaca_api_secret: String,
pub alpaca_client: Client,
pub alpaca_rate_limit: DefaultDirectRateLimiter,
pub alpaca_historical_offset: Duration,
pub alpaca_source: Source,
pub postgres_pool: PgPool,
}
impl Config {
pub async fn from_env() -> Self {
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set.");
let alpaca_api_key = env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set.");
let alpaca_api_secret =
env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set.");
let alpaca_source: Source = env::var("ALPACA_SOURCE")
.expect("ALPACA_SOURCE must be set.")
.parse()
.expect("ALPACA_SOURCE must be a either 'iex' or 'sip'.");
Self {
alpaca_api_key: alpaca_api_key.clone(),
alpaca_api_secret: alpaca_api_secret.clone(),
alpaca_client: Client::builder()
.default_headers({
let mut headers = HeaderMap::new();
headers.insert("APCA-API-KEY-ID", alpaca_api_key.parse().unwrap());
headers.insert("APCA-API-SECRET-KEY", alpaca_api_secret.parse().unwrap());
headers
})
.build()
.unwrap(),
alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(match alpaca_source {
Source::Iex => NonZeroU32::new(200).unwrap(),
Source::Sip => NonZeroU32::new(1000).unwrap(),
})),
alpaca_historical_offset: Duration::from_secs(match alpaca_source {
Source::Iex => 900,
Source::Sip => 0,
}),
alpaca_source,
postgres_pool: PgPoolOptions::new()
.max_connections(NUM_CLIENTS)
.connect(&database_url)
.await
.unwrap(),
}
}
pub async fn arc_from_env() -> Arc<Self> {
Arc::new(Self::from_env().await)
}
}
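
A minimal usage sketch (not part of this commit), assuming `Config` is in scope as in src/main.rs; `from_env` panics with a descriptive message on any missing variable, so startup fails fast:

// Hypothetical startup inside this crate, assuming a .env provides:
//   DATABASE_URL=postgres://user:pass@localhost/db
//   ALPACA_API_KEY=... and ALPACA_API_SECRET=...
//   ALPACA_SOURCE=iex  (or "sip")
#[tokio::main]
async fn main() {
    dotenv::dotenv().ok();
    let app_config = Config::arc_from_env().await;
    // IEX keys get the 200 req/min quota and a 15-minute historical offset.
    println!("source: {}", app_config.alpaca_source);
}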

143
src/data/historical.rs Normal file

@@ -0,0 +1,143 @@
use crate::{
config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL, ALPACA_TIMESTAMP_FORMAT},
database,
time::{next_minute, ONE_MINUTE},
types::{api::incoming, Asset, Bar, Class},
};
use http::StatusCode;
use indexmap::IndexMap;
use log::{error, info};
use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime;
use tokio::{sync::RwLock, task::spawn_blocking, time::sleep};
pub async fn backfill(
app_config: Arc<Config>,
asset: Asset,
backfilled: Arc<RwLock<HashMap<String, bool>>>,
) {
info!("Backfilling historical data for {}...", asset.symbol);
let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset;
let fetch_from = asset.timestamp_last + ONE_MINUTE;
let fetch_until = task_run_offsetdatetime - app_config.alpaca_historical_offset - ONE_MINUTE;
if fetch_from > fetch_until {
return;
}
let mut current_time = fetch_from;
let asset_clone = asset.clone();
let mut bars = spawn_blocking(move || {
let mut bars = IndexMap::new();
while current_time <= fetch_until {
bars.insert(
current_time,
Bar::empty(current_time, asset_clone.symbol.clone()),
);
current_time += ONE_MINUTE;
}
bars
})
.await
.unwrap();
let wait_duration = task_run_offsetdatetime - OffsetDateTime::now_utc();
if wait_duration.is_positive() {
sleep(wait_duration.unsigned_abs()).await;
}
let mut next_page_token = None;
loop {
let request = app_config
.alpaca_client
.get(match asset.class {
Class::UsEquity => ALPACA_STOCK_DATA_URL,
Class::Crypto => ALPACA_CRYPTO_DATA_URL,
})
.query(&[
("symbols", &asset.symbol),
("timeframe", &String::from("1Min")),
("start", &fetch_from.format(ALPACA_TIMESTAMP_FORMAT).unwrap()),
("end", &fetch_until.format(ALPACA_TIMESTAMP_FORMAT).unwrap()),
("limit", &String::from("10000")),
("page_token", &next_page_token.clone().unwrap_or_default()),
]);
app_config.alpaca_rate_limit.until_ready().await;
let response = request.send().await.unwrap();
let mut response = if response.status() == StatusCode::OK {
response.json::<incoming::bar::Message>().await.unwrap()
} else {
error!(
"Failed to backfill historical data for {} from {} to {}: {}",
asset.symbol,
fetch_from,
fetch_until,
response.text().await.unwrap()
);
break;
};
for bar in response.bars.remove(&asset.symbol).flatten().unwrap_or_default() {
bars.insert(bar.timestamp, Bar::from((bar, asset.symbol.clone())));
}
if response.next_page_token.is_none() {
break;
}
next_page_token = response.next_page_token;
}
let bars = bars.into_values().collect::<Vec<Bar>>();
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::upsert_batch(&app_config.postgres_pool, &bars).await;
database::bars_filled::upsert_batch(&app_config.postgres_pool, &bars).await;
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&asset.symbol,
&fetch_until,
)
.await;
backfill_recent_nulls(&app_config, &asset, &fetch_until, &backfilled).await;
transaction.commit().await.unwrap();
info!("Backfilled historical data for {}.", asset.symbol);
}
#[allow(clippy::significant_drop_tightening)]
async fn backfill_recent_nulls(
app_config: &Arc<Config>,
asset: &Asset,
from: &OffsetDateTime,
backfilled: &Arc<RwLock<HashMap<String, bool>>>,
) {
let mut backfilled = backfilled.write().await;
let bars = database::bars::select_where_symbol_where_timestamp_larger_than(
&app_config.postgres_pool,
&asset.symbol,
from,
)
.await;
database::bars_filled::upsert_batch(&app_config.postgres_pool, &bars).await;
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&asset.symbol,
&bars.last().unwrap().timestamp,
)
.await;
backfilled.insert(asset.symbol.clone(), true);
}
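
The pre-seeded IndexMap above is the heart of the backfill: every minute in [fetch_from, fetch_until] starts as an empty Bar, and fetched bars then overwrite their slot, so minutes with no trades survive as explicit null rows for the fill pass. A standalone sketch of that pattern with a simplified bar type (assumes only the indexmap crate):

use indexmap::IndexMap;

struct MiniBar { close: Option<f64> }

fn main() {
    // Seed one empty slot per minute in the window.
    let mut bars: IndexMap<i64, MiniBar> = (0..=4)
        .map(|m| (m, MiniBar { close: None }))
        .collect();
    // Pretend the API returned data for minutes 1 and 3 only.
    for (m, c) in [(1, 101.0), (3, 103.0)] {
        bars.insert(m, MiniBar { close: Some(c) });
    }
    assert!(bars[&0].close.is_none()); // gap kept as an explicit null row
    assert_eq!(bars[&3].close, Some(103.0));
    assert_eq!(bars.len(), 5); // every minute accounted for
}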

313
src/data/live.rs Normal file

@@ -0,0 +1,313 @@
use crate::{
config::{Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL},
data::historical::backfill,
database,
time::{duration_until, last_minute, next_30s, ONE_MINUTE, THIRTY_SECONDS},
types::{
asset,
websocket::{incoming, outgoing},
Bar, BroadcastMessage, Class,
},
};
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{error, info, warn};
use serde_json::{from_str, to_string};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use tokio::{
net::TcpStream,
spawn,
sync::{
broadcast::{Receiver, Sender},
RwLock,
},
time::interval_at,
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
pub async fn run(
app_config: Arc<Config>,
class: Class,
asset_broadcast_sender: Sender<BroadcastMessage>,
) {
info!("Running live data threads for {:?}.", class);
let websocket_url = match class {
Class::UsEquity => format!(
"{}/{}",
ALPACA_STOCK_WEBSOCKET_URL, app_config.alpaca_source
),
Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL.to_string(),
};
let (stream, _) = connect_async(websocket_url).await.unwrap();
let (mut sink, mut stream) = stream.split();
authenticate_websocket(&app_config, &mut stream, &mut sink).await;
let sink = Arc::new(RwLock::new(sink));
let backfilled = Arc::new(RwLock::new(HashMap::new()));
spawn(websocket_broadcast_handler(
class,
sink.clone(),
asset_broadcast_sender.subscribe(),
));
database::assets::select_where_class(&app_config.postgres_pool, class)
.await
.into_iter()
.for_each(|asset| {
asset_broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
asset,
)))
.unwrap();
});
spawn(null_handler(app_config.clone(), backfilled.clone()));
websocket_message_handler(app_config, class, stream, sink, backfilled).await;
unreachable!()
}
async fn authenticate_websocket(
app_config: &Arc<Config>,
stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<incoming::Message>>(&data).unwrap().first()
== Some(&incoming::Message::Success(incoming::success::Message {
msg: incoming::success::MessageType::Connected,
})) => {}
_ => panic!(),
}
sink.send(Message::Text(
to_string(&outgoing::Message::Auth(outgoing::auth::Message::new(
app_config.alpaca_api_key.clone(),
app_config.alpaca_api_secret.clone(),
)))
.unwrap(),
))
.await
.unwrap();
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<incoming::Message>>(&data).unwrap().first()
== Some(&incoming::Message::Success(incoming::success::Message {
msg: incoming::success::MessageType::Authenticated,
})) => {}
_ => panic!(),
};
}
async fn websocket_broadcast_handler(
class: Class,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
mut asset_broadcast_receiver: Receiver<BroadcastMessage>,
) {
loop {
match asset_broadcast_receiver.recv().await.unwrap() {
BroadcastMessage::Asset(asset::BroadcastMessage::Added(asset))
if asset.class == class =>
{
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&outgoing::Message::Subscribe(
outgoing::subscribe::Message::new(asset.symbol.clone()),
))
.unwrap(),
))
.await
.unwrap();
}
BroadcastMessage::Asset(asset::BroadcastMessage::Deleted(asset))
if asset.class == class =>
{
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&outgoing::Message::Unsubscribe(
outgoing::subscribe::Message::new(asset.symbol.clone()),
))
.unwrap(),
))
.await
.unwrap();
}
BroadcastMessage::Asset(_) => {}
}
}
}
async fn websocket_message_handler(
app_config: Arc<Config>,
class: Class,
mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
backfilled: Arc<RwLock<HashMap<String, bool>>>,
) {
loop {
match stream.next().await {
Some(Ok(Message::Text(data))) => {
let parsed_data = from_str::<Vec<incoming::Message>>(&data);
if let Err(e) = &parsed_data {
warn!("Unparsed incoming message: {:?}: {}", data, e);
}
for message in parsed_data.unwrap_or_default() {
handle_message(&app_config, class, message, &backfilled).await;
}
}
// Echo the ping payload back, as the WebSocket spec expects.
Some(Ok(Message::Ping(payload))) => sink
.write()
.await
.send(Message::Pong(payload))
.await
.unwrap(),
Some(unknown) => error!("Unknown incoming message: {:?}", unknown),
None => panic!(),
}
}
}
async fn handle_message(
app_config: &Arc<Config>,
class: Class,
message: incoming::Message,
backfilled: &Arc<RwLock<HashMap<String, bool>>>,
) {
match message {
incoming::Message::Subscription(subscription_message) => {
let old_assets = backfilled
.read()
.await
.keys()
.cloned()
.collect::<HashSet<_>>();
let new_assets = subscription_message
.bars
.into_iter()
.collect::<HashSet<_>>();
let added_assets = new_assets.difference(&old_assets).collect::<HashSet<_>>();
let deleted_assets = old_assets.difference(&new_assets).collect::<HashSet<_>>();
for asset_symbol in &added_assets {
let asset =
database::assets::select_where_symbol(&app_config.postgres_pool, asset_symbol)
.await
.unwrap();
backfilled.write().await.insert(asset.symbol.clone(), false);
spawn(backfill(
app_config.clone(),
asset.clone(),
backfilled.clone(),
));
}
for asset_symbol in &deleted_assets {
backfilled.write().await.remove(*asset_symbol);
}
info!(
"Subscription update for {:?}: {:?} added, {:?} deleted.",
class, added_assets, deleted_assets
);
}
incoming::Message::Bars(bar_message) => {
let bar = Bar::from(bar_message);
info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp);
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::upsert(&app_config.postgres_pool, &bar).await;
if *backfilled.read().await.get(&bar.asset_symbol).unwrap() {
database::bars_filled::upsert(&app_config.postgres_pool, &bar).await;
}
transaction.commit().await.unwrap();
}
incoming::Message::UpdatedBars(bar_message) => {
let bar = Bar::from(bar_message);
info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp);
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::upsert(&app_config.postgres_pool, &bar).await;
if *backfilled.read().await.get(&bar.asset_symbol).unwrap() {
database::bars_filled::upsert(&app_config.postgres_pool, &bar).await;
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&bar.asset_symbol,
&bar.timestamp,
)
.await;
}
transaction.commit().await.unwrap();
}
incoming::Message::Success(_) => {}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NullHandlerState {
Bars,
UpdatedBars,
}
#[allow(clippy::significant_drop_in_scrutinee)]
async fn null_handler(app_config: Arc<Config>, backfilled: Arc<RwLock<HashMap<String, bool>>>) {
let next_30s = next_30s();
// next_30s() always lands on a multiple of 30, so compare against 60 to
// tell minute boundaries (Bars) from half-minute boundaries (UpdatedBars).
let mut state = if next_30s.unix_timestamp() % 60 == 0 {
NullHandlerState::Bars
} else {
NullHandlerState::UpdatedBars
};
let mut interval = interval_at(
(Instant::now() + duration_until(next_30s)).into(),
THIRTY_SECONDS,
);
loop {
interval.tick().await;
let timestamp = last_minute() - ONE_MINUTE;
let backfilled = backfilled.read().await;
for asset_symbol in backfilled.keys().cloned() {
let bar = Bar::empty(timestamp, asset_symbol);
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::insert_or_skip(&app_config.postgres_pool, &bar).await;
if *backfilled.get(&bar.asset_symbol).unwrap() {
database::bars_filled::insert_or_skip(&app_config.postgres_pool, &bar).await;
if state == NullHandlerState::UpdatedBars {
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&bar.asset_symbol,
&bar.timestamp,
)
.await;
}
}
transaction.commit().await.unwrap();
}
state = match state {
NullHandlerState::Bars => NullHandlerState::UpdatedBars,
NullHandlerState::UpdatedBars => NullHandlerState::Bars,
};
}
}
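
`null_handler` phase-locks its 30-second interval to the wall clock, so ticks land on :00 and :30 and the state flips on every tick: minute-boundary ticks insert plain null bars, while half-minute ticks also advance `timestamp_last`. A standalone sketch of the phase-locking (tokio only; the period is shortened so it runs instantly):

use tokio::time::{interval_at, Duration, Instant};

#[tokio::main]
async fn main() {
    // Real code: start = the next :00/:30 boundary, period = 30 seconds.
    let start = Instant::now() + Duration::from_millis(10);
    let mut interval = interval_at(start, Duration::from_millis(100));
    let mut on_minute = true;
    for _ in 0..4 {
        interval.tick().await;
        // Alternates: bars, updated-bars, bars, updated-bars, ...
        println!("{}", if on_minute { "bars tick" } else { "updated-bars tick" });
        on_minute = !on_minute;
    }
}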

2
src/data/mod.rs Normal file

@@ -0,0 +1,2 @@
pub mod historical;
pub mod live;

92
src/database/assets.rs Normal file

@@ -0,0 +1,92 @@
use crate::types::{Asset, Class, Exchange};
use sqlx::{query_as, PgPool};
use std::convert::Into;
use time::OffsetDateTime;
pub async fn select(postgres_pool: &PgPool) -> Vec<Asset> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last FROM assets"#
)
.fetch_all(postgres_pool)
.await
.unwrap()
}
pub async fn select_where_class(postgres_pool: &PgPool, class: Class) -> Vec<Asset> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last FROM assets WHERE class = $1::CLASS"#,
class as Class
)
.fetch_all(postgres_pool)
.await
.unwrap()
}
pub async fn select_where_symbol(postgres_pool: &PgPool, symbol: &str) -> Option<Asset> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last FROM assets WHERE symbol = $1"#,
symbol
)
.fetch_optional(postgres_pool)
.await
.unwrap()
}
pub async fn insert(postgres_pool: &PgPool, asset: &Asset) -> Asset {
query_as!(
Asset,
r#"INSERT INTO assets (symbol, class, exchange, trading, timestamp_added, timestamp_first, timestamp_last) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5, $6, $7)
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.timestamp_added, asset.timestamp_first, asset.timestamp_last
)
.fetch_one(postgres_pool)
.await
.unwrap()
}
pub async fn update_trading_where_symbol(
postgres_pool: &PgPool,
symbol: &str,
trading: &bool,
) -> Option<Asset> {
query_as!(
Asset,
r#"UPDATE assets SET trading = $1 WHERE symbol = $2
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
trading, symbol
)
.fetch_optional(postgres_pool)
.await
.unwrap()
}
pub async fn update_timestamp_last_where_symbol(
postgres_pool: &PgPool,
symbol: &str,
timestamp_last: &OffsetDateTime,
) -> Option<Asset> {
query_as!(
Asset,
r#"UPDATE assets SET timestamp_last = $1 WHERE symbol = $2
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
timestamp_last, symbol
)
.fetch_optional(postgres_pool)
.await
.unwrap()
}
pub async fn delete_where_symbol(postgres_pool: &PgPool, symbol: &str) -> Option<Asset> {
query_as!(
Asset,
r#"DELETE FROM assets WHERE symbol = $1
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
symbol
)
.fetch_optional(postgres_pool)
.await
.unwrap()
}

89
src/database/bars.rs Normal file

@@ -0,0 +1,89 @@
use crate::types::Bar;
use sqlx::{query_as, PgPool, Postgres};
use std::convert::Into;
use time::OffsetDateTime;
pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
query_as!(
Bar,
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.fetch_one(postgres_pool)
.await
.unwrap()
}
pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
query_as!(
Bar,
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO NOTHING"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.execute(postgres_pool)
.await
.unwrap();
}
pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
let mut timestamp = Vec::with_capacity(bars.len());
let mut asset_symbol = Vec::with_capacity(bars.len());
let mut open = Vec::with_capacity(bars.len());
let mut high = Vec::with_capacity(bars.len());
let mut low = Vec::with_capacity(bars.len());
let mut close = Vec::with_capacity(bars.len());
let mut volume = Vec::with_capacity(bars.len());
let mut num_trades = Vec::with_capacity(bars.len());
let mut volume_weighted = Vec::with_capacity(bars.len());
for bar in bars {
timestamp.push(bar.timestamp);
asset_symbol.push(bar.asset_symbol.clone());
open.push(bar.open);
high.push(bar.high);
low.push(bar.low);
close.push(bar.close);
volume.push(bar.volume);
num_trades.push(bar.num_trades);
volume_weighted.push(bar.volume_weighted);
}
// No type-safety here because of NULLABLE bulk insert
query_as::<Postgres, Bar>(
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, num_trades = EXCLUDED.num_trades, volume_weighted = EXCLUDED.volume_weighted
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
)
.bind(timestamp)
.bind(asset_symbol)
.bind(open)
.bind(high)
.bind(low)
.bind(close)
.bind(volume)
.bind(num_trades)
.bind(volume_weighted)
.fetch_all(postgres_pool)
.await
.unwrap()
}
pub async fn select_where_symbol_where_timestamp_larger_than(
postgres_pool: &PgPool,
symbol: &str,
timestamp: &OffsetDateTime,
) -> Vec<Bar> {
query_as!(
Bar,
r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC"#,
symbol,
timestamp
)
.fetch_all(postgres_pool)
.await
.unwrap()
}
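
`upsert_batch` transposes rows into one Vec per column because sqlx binds a single array parameter per placeholder; Postgres's UNNEST then zips the equal-length arrays back into rows, one per index. A toy version of the transpose (plain Rust, no database):

struct Row { ts: i64, sym: String }

fn main() {
    let rows = vec![
        Row { ts: 0, sym: "AAPL".into() },
        Row { ts: 60, sym: "AAPL".into() },
    ];
    let (mut ts, mut sym) = (Vec::new(), Vec::new());
    for r in &rows {
        ts.push(r.ts);
        sym.push(r.sym.clone());
    }
    // Bound as $1::int8[] and $2::text[], UNNEST($1, $2) would emit the
    // original rows: (0, 'AAPL') and (60, 'AAPL').
    assert_eq!((ts.len(), sym.len()), (rows.len(), rows.len()));
}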

133
src/database/bars_filled.rs Normal file

@@ -0,0 +1,133 @@
use crate::types::Bar;
use sqlx::{query_as, PgPool, Postgres};
use std::convert::Into;
pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
let mut bar = bar.clone();
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
let filled_bar = query_as!(
Bar,
r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
bar.timestamp,
bar.asset_symbol
)
.fetch_one(postgres_pool)
.await
.unwrap();
bar.merge_empty(&filled_bar);
}
query_as!(
Bar,
r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.fetch_one(postgres_pool)
.await
.unwrap()
}
pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
let mut bar = bar.clone();
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
let filled_bar = query_as!(
Bar,
r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
bar.timestamp,
bar.asset_symbol
)
.fetch_one(postgres_pool)
.await
.unwrap();
bar.merge_empty(&filled_bar);
}
query_as!(
Bar,
r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO NOTHING"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.execute(postgres_pool)
.await
.unwrap();
}
pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
let mut bars = bars.to_vec();
if bars.is_empty() {
return bars;
}
if bars[0].open.is_none()
|| bars[0].high.is_none()
|| bars[0].low.is_none()
|| bars[0].close.is_none()
{
let filled_bar = &query_as!(
Bar,
r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
bars[0].timestamp,
bars[0].asset_symbol
)
.fetch_one(postgres_pool)
.await
.unwrap();
bars[0].merge_empty(filled_bar);
}
let mut timestamp = Vec::with_capacity(bars.len());
let mut asset_symbol = Vec::with_capacity(bars.len());
let mut open = Vec::with_capacity(bars.len());
let mut high = Vec::with_capacity(bars.len());
let mut low = Vec::with_capacity(bars.len());
let mut close = Vec::with_capacity(bars.len());
let mut volume = Vec::with_capacity(bars.len());
let mut num_trades = Vec::with_capacity(bars.len());
let mut volume_weighted = Vec::with_capacity(bars.len());
let mut last_filled_bar = bars[0].clone();
for mut bar in bars {
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
bar.merge_empty(&last_filled_bar);
} else {
last_filled_bar = bar.clone();
}
timestamp.push(bar.timestamp);
asset_symbol.push(bar.asset_symbol.clone());
open.push(bar.open);
high.push(bar.high);
low.push(bar.low);
close.push(bar.close);
volume.push(bar.volume);
num_trades.push(bar.num_trades);
volume_weighted.push(bar.volume_weighted);
}
// No type-safety here because of NULLABLE bulk insert
query_as::<Postgres, Bar>(
r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, num_trades = EXCLUDED.num_trades, volume_weighted = EXCLUDED.volume_weighted
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
)
.bind(timestamp)
.bind(asset_symbol)
.bind(open)
.bind(high)
.bind(low)
.bind(close)
.bind(volume)
.bind(num_trades)
.bind(volume_weighted)
.fetch_all(postgres_pool)
.await
.unwrap()
}
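
Only the first bar of a batch may need a database lookup; after that the fill is purely in-memory, with each empty bar inheriting the OHLC of the most recent populated bar in the slice. The same forward-fill in isolation:

#[derive(Clone)]
struct MiniBar { close: Option<f64> }

fn main() {
    let mut bars = vec![
        MiniBar { close: Some(100.0) },
        MiniBar { close: None },        // empty minute
        MiniBar { close: Some(102.0) },
        MiniBar { close: None },        // empty minute
    ];
    let mut last = bars[0].clone();
    for bar in &mut bars {
        if bar.close.is_none() {
            bar.close = last.close; // inherit from the last filled bar
        } else {
            last = bar.clone();
        }
    }
    assert_eq!(bars[1].close, Some(100.0));
    assert_eq!(bars[3].close, Some(102.0));
}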

3
src/database/mod.rs Normal file

@@ -0,0 +1,3 @@
pub mod assets;
pub mod bars;
pub mod bars_filled;

49
src/main.rs Normal file

@@ -0,0 +1,49 @@
#![warn(clippy::all, clippy::pedantic, clippy::nursery)]
#![allow(clippy::missing_docs_in_private_items)]
mod config;
mod data;
mod database;
mod routes;
mod time;
mod types;
use config::Config;
use dotenv::dotenv;
use log4rs::config::Deserializers;
use sqlx::error::BoxDynError;
use tokio::{spawn, sync::broadcast};
use types::{BroadcastMessage, Class};
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
dotenv().ok();
log4rs::init_file("log4rs.yaml", Deserializers::default())?;
let app_config = Config::arc_from_env().await;
let mut threads = Vec::new();
let (asset_broadcast_sender, _) = broadcast::channel::<BroadcastMessage>(100);
threads.push(spawn(data::live::run(
app_config.clone(),
Class::UsEquity,
asset_broadcast_sender.clone(),
)));
threads.push(spawn(data::live::run(
app_config.clone(),
Class::Crypto,
asset_broadcast_sender.clone(),
)));
threads.push(spawn(routes::run(
app_config.clone(),
asset_broadcast_sender,
)));
for thread in threads {
thread.await?;
}
unreachable!()
}

171
src/routes/assets.rs Normal file

@@ -0,0 +1,171 @@
use crate::config::{
Config, ALPACA_ASSET_API_URL, ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL,
ALPACA_TIMESTAMP_FORMAT,
};
use crate::database;
use crate::types::Class;
use crate::types::{api::incoming, asset, Asset, BroadcastMessage, Status};
use axum::{extract::Path, http::StatusCode, Extension, Json};
use log::info;
use serde::Deserialize;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::broadcast::Sender;
pub async fn get_all(
Extension(app_config): Extension<Arc<Config>>,
) -> Result<(StatusCode, Json<Vec<Asset>>), StatusCode> {
let assets = database::assets::select(&app_config.postgres_pool).await;
Ok((StatusCode::OK, Json(assets)))
}
pub async fn get(
Extension(app_config): Extension<Arc<Config>>,
Path(symbol): Path<String>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = database::assets::select_where_symbol(&app_config.postgres_pool, &symbol).await;
asset.map_or(Err(StatusCode::NOT_FOUND), |asset| {
Ok((StatusCode::OK, Json(asset)))
})
}
#[derive(Deserialize)]
pub struct AddAssetRequest {
symbol: String,
trading: Option<bool>,
}
pub async fn add(
Extension(app_config): Extension<Arc<Config>>,
Extension(asset_broadcast_sender): Extension<Sender<BroadcastMessage>>,
Json(request): Json<AddAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
if database::assets::select_where_symbol(&app_config.postgres_pool, &request.symbol)
.await
.is_some()
{
return Err(StatusCode::CONFLICT);
}
app_config.alpaca_rate_limit.until_ready().await;
let asset = app_config
.alpaca_client
.get(&format!("{}/{}", ALPACA_ASSET_API_URL, request.symbol))
.send()
.await
.unwrap()
// A 404 comes back as a successful response, so surface it explicitly.
.error_for_status()
.map_err(|e| match e.status() {
Some(StatusCode::NOT_FOUND) => StatusCode::NOT_FOUND,
_ => panic!(),
})?;
let asset = asset.json::<incoming::Asset>().await.unwrap();
if asset.status != Status::Active || !asset.tradable || !asset.fractionable {
return Err(StatusCode::FORBIDDEN);
}
let mut earliest_bar_request = app_config
.alpaca_client
.get(match asset.class {
Class::UsEquity => ALPACA_STOCK_DATA_URL,
Class::Crypto => ALPACA_CRYPTO_DATA_URL,
})
.query(&[
("symbols", &asset.symbol),
("timeframe", &String::from("1Min")),
(
"start",
&OffsetDateTime::UNIX_EPOCH
.format(ALPACA_TIMESTAMP_FORMAT)
.unwrap(),
),
("limit", &String::from("1")),
]);
if asset.class == Class::UsEquity {
earliest_bar_request =
earliest_bar_request.query(&[("feed", &app_config.alpaca_source.to_string())]);
}
let earliest_bar = earliest_bar_request
.send()
.await
.unwrap()
.json::<incoming::bar::Message>()
.await
.unwrap();
let earliest_bar = earliest_bar
.bars
.get(&asset.symbol)
.ok_or(StatusCode::NOT_FOUND)?
.as_ref()
.ok_or(StatusCode::NOT_FOUND)?
.first()
.ok_or(StatusCode::NOT_FOUND)?;
let asset = Asset::from((
asset,
request.trading.unwrap_or(false),
earliest_bar.timestamp,
));
database::assets::insert(&app_config.postgres_pool, &asset).await;
asset_broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
asset.clone(),
)))
.unwrap();
info!("Added asset {}.", asset.symbol);
Ok((StatusCode::CREATED, Json(asset)))
}
#[allow(dead_code)]
#[derive(Deserialize)]
pub struct UpdateAssetRequest {
trading: bool,
}
pub async fn update(
Extension(app_config): Extension<Arc<Config>>,
Extension(asset_broadcast_sender): Extension<Sender<BroadcastMessage>>,
Path(symbol): Path<String>,
Json(request): Json<UpdateAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = database::assets::update_trading_where_symbol(
&app_config.postgres_pool,
&symbol,
&request.trading,
)
.await;
asset.map_or(Err(StatusCode::NOT_FOUND), |asset| {
asset_broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Updated(
asset.clone(),
)))
.unwrap();
info!("Updated asset {}.", symbol);
Ok((StatusCode::OK, Json(asset)))
})
}
pub async fn delete(
Extension(app_config): Extension<Arc<Config>>,
Extension(asset_broadcast_sender): Extension<Sender<BroadcastMessage>>,
Path(symbol): Path<String>,
) -> Result<StatusCode, StatusCode> {
let asset = database::assets::delete_where_symbol(&app_config.postgres_pool, &symbol).await;
asset.map_or(Err(StatusCode::NOT_FOUND), |asset| {
asset_broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Deleted(
asset,
)))
.unwrap();
info!("Deleted asset {}.", symbol);
Ok(StatusCode::NO_CONTENT)
})
}

30
src/routes/mod.rs Normal file

@@ -0,0 +1,30 @@
use crate::{config::Config, types::BroadcastMessage};
use axum::{
routing::{delete, get, post},
Extension, Router, Server,
};
use log::info;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::broadcast::Sender;
pub mod assets;
pub async fn run(app_config: Arc<Config>, asset_broadcast_sender: Sender<BroadcastMessage>) {
let app = Router::new()
.route("/assets", get(assets::get_all))
.route("/assets/:symbol", get(assets::get))
.route("/assets", post(assets::add))
.route("/assets/:symbol", post(assets::update))
.route("/assets/:symbol", delete(assets::delete))
.layer(Extension(app_config))
.layer(Extension(asset_broadcast_sender));
let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
info!("Listening on {}.", addr);
Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
unreachable!()
}
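
A hypothetical client exercising these routes (port and paths from the code above; the reqwest crate with its json feature is an assumption, not a dependency of this commit):

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // POST /assets adds a symbol and kicks off backfilling via the broadcast.
    let res = client
        .post("http://localhost:7878/assets")
        .json(&serde_json::json!({ "symbol": "AAPL", "trading": true }))
        .send()
        .await?;
    println!("{}", res.status()); // 201 Created, or 409 Conflict if it exists
    Ok(())
}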

34
src/time.rs Normal file

@@ -0,0 +1,34 @@
use std::time::Duration;
use time::OffsetDateTime;
pub const THIRTY_SECONDS: Duration = Duration::from_secs(30);
pub const ONE_MINUTE: Duration = Duration::from_secs(60);
pub fn last_minute() -> OffsetDateTime {
let now_timestamp = OffsetDateTime::now_utc().unix_timestamp();
OffsetDateTime::from_unix_timestamp(now_timestamp - now_timestamp % 60).unwrap()
}
pub fn next_minute() -> OffsetDateTime {
last_minute() + ONE_MINUTE
}
pub fn last_30s() -> OffsetDateTime {
let now_timestamp = OffsetDateTime::now_utc().unix_timestamp();
OffsetDateTime::from_unix_timestamp(now_timestamp - now_timestamp % 30).unwrap()
}
pub fn next_30s() -> OffsetDateTime {
last_30s() + THIRTY_SECONDS
}
pub fn duration_until(time: OffsetDateTime) -> Duration {
let now = OffsetDateTime::now_utc();
let duration = time - now;
if duration.is_positive() {
duration.unsigned_abs()
} else {
Duration::default()
}
}
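
These helpers are plain integer arithmetic: subtracting `unix % n` floors a timestamp to the previous n-second boundary, and adding n gives the next one. A quick check with small numbers:

fn main() {
    let now = 149_i64; // 2m29s after the epoch
    assert_eq!(now - now % 60, 120);      // last_minute -> 2m00s
    assert_eq!(now - now % 60 + 60, 180); // next_minute -> 3m00s
    assert_eq!(now - now % 30, 120);      // last_30s    -> 2m00s
    assert_eq!(now - now % 30 + 30, 150); // next_30s    -> 2m30s
}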

21
src/types/api/incoming/asset.rs Normal file

@@ -0,0 +1,21 @@
#![allow(clippy::struct_excessive_bools)]
use crate::types::{Class, Exchange, Status};
use serde::Deserialize;
#[derive(Deserialize)]
pub struct Asset {
pub id: String,
pub class: Class,
pub exchange: Exchange,
pub symbol: String,
pub name: String,
pub status: Status,
pub tradable: bool,
pub marginable: bool,
pub shortable: bool,
pub easy_to_borrow: bool,
pub fractionable: bool,
pub maintenance_margin_requirement: Option<f32>,
pub attributes: Option<Vec<String>>,
}

30
src/types/api/incoming/bar.rs Normal file

@@ -0,0 +1,30 @@
use serde::Deserialize;
use std::collections::HashMap;
use time::OffsetDateTime;
#[derive(Debug, PartialEq, Deserialize)]
pub struct Bar {
#[serde(rename = "t")]
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
#[serde(rename = "o")]
pub open: f64,
#[serde(rename = "h")]
pub high: f64,
#[serde(rename = "l")]
pub low: f64,
#[serde(rename = "c")]
pub close: f64,
#[serde(rename = "v")]
pub volume: f64,
#[serde(rename = "n")]
pub num_trades: i64,
#[serde(rename = "vw")]
pub volume_weighted: f64,
}
#[derive(Debug, PartialEq, Deserialize)]
pub struct Message {
pub bars: HashMap<String, Option<Vec<Bar>>>,
pub next_page_token: Option<String>,
}

29
src/types/api/incoming/calendar_date.rs Normal file

@@ -0,0 +1,29 @@
use serde::{Deserialize, Deserializer};
use time::{macros::format_description, Date, Time};
#[derive(Debug, PartialEq, Eq, Deserialize)]
pub struct CalendarDate {
#[serde(deserialize_with = "deserialize_date")]
pub date: Date,
#[serde(deserialize_with = "deserialize_time")]
pub open: Time,
#[serde(deserialize_with = "deserialize_time")]
pub close: Time,
}
fn deserialize_date<'de, D>(deserializer: D) -> Result<Date, D::Error>
where
D: Deserializer<'de>,
{
let date_str = String::deserialize(deserializer)?;
Date::parse(&date_str, format_description!("[year]-[month]-[day]"))
.map_err(serde::de::Error::custom)
}
fn deserialize_time<'de, D>(deserializer: D) -> Result<Time, D::Error>
where
D: Deserializer<'de>,
{
let time_str = String::deserialize(deserializer)?;
Time::parse(&time_str, format_description!("[hour]:[minute]")).map_err(serde::de::Error::custom)
}
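
A sketch of the JSON these deserializers accept, assuming `CalendarDate` is in scope; the shapes match the formats parsed above:

use time::macros::{date, time};

fn main() {
    let json = r#"{"date":"2023-09-08","open":"09:30","close":"16:00"}"#;
    let day: CalendarDate = serde_json::from_str(json).unwrap();
    assert_eq!(day.date, date!(2023 - 09 - 08));
    assert_eq!(day.open, time!(09:30));
    assert_eq!(day.close, time!(16:00));
}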

7
src/types/api/incoming/mod.rs Normal file

@@ -0,0 +1,7 @@
pub mod asset;
pub mod bar;
pub mod calendar_date;
pub use asset::Asset;
pub use bar::Bar;
pub use calendar_date::CalendarDate;

1
src/types/api/mod.rs Normal file

@@ -0,0 +1 @@
pub mod incoming;

37
src/types/asset.rs Normal file

@@ -0,0 +1,37 @@
use super::{api::incoming, class::Class, exchange::Exchange};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, Eq, FromRow, Serialize, Deserialize, Hash)]
pub struct Asset {
pub symbol: String,
pub class: Class,
pub exchange: Exchange,
pub trading: bool,
pub timestamp_added: OffsetDateTime,
pub timestamp_first: OffsetDateTime,
pub timestamp_last: OffsetDateTime,
}
impl From<(incoming::Asset, bool, OffsetDateTime)> for Asset {
fn from((asset, trading, timestamp_first): (incoming::Asset, bool, OffsetDateTime)) -> Self {
Self {
symbol: asset.symbol,
class: asset.class,
exchange: asset.exchange,
trading,
timestamp_added: OffsetDateTime::now_utc(),
timestamp_first,
timestamp_last: timestamp_first,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BroadcastMessage {
Added(Asset),
Updated(Asset),
Deleted(Asset),
Reset(Asset),
}

72
src/types/bar.rs Normal file

@@ -0,0 +1,72 @@
use super::{api, websocket};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Bar {
pub timestamp: OffsetDateTime,
pub asset_symbol: String,
pub open: Option<f64>,
pub high: Option<f64>,
pub low: Option<f64>,
pub close: Option<f64>,
pub volume: f64,
pub num_trades: i64,
pub volume_weighted: f64,
}
impl Bar {
pub const fn empty(timestamp: OffsetDateTime, asset_symbol: String) -> Self {
Self {
timestamp,
asset_symbol,
open: None,
high: None,
low: None,
close: None,
volume: 0.0,
num_trades: 0,
volume_weighted: 0.0,
}
}
pub fn merge_empty(&mut self, other: &Self) {
self.open = other.open;
self.high = other.high;
self.low = other.low;
self.close = other.close;
}
}
impl From<websocket::incoming::bar::Message> for Bar {
fn from(bar_message: websocket::incoming::bar::Message) -> Self {
Self {
timestamp: bar_message.timestamp,
asset_symbol: bar_message.symbol,
open: Some(bar_message.open),
high: Some(bar_message.high),
low: Some(bar_message.low),
close: Some(bar_message.close),
volume: bar_message.volume,
num_trades: bar_message.num_trades,
volume_weighted: bar_message.volume_weighted,
}
}
}
impl From<(api::incoming::Bar, String)> for Bar {
fn from((bar, asset_symbol): (api::incoming::Bar, String)) -> Self {
Self {
timestamp: bar.timestamp,
asset_symbol,
open: Some(bar.open),
high: Some(bar.high),
low: Some(bar.low),
close: Some(bar.close),
volume: bar.volume,
num_trades: bar.num_trades,
volume_weighted: bar.volume_weighted,
}
}
}

12
src/types/class.rs Normal file

@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type, Hash)]
pub enum Class {
#[sqlx(rename = "us_equity")]
#[serde(rename = "us_equity")]
UsEquity,
#[sqlx(rename = "crypto")]
#[serde(rename = "crypto")]
Crypto,
}

30
src/types/exchange.rs Normal file

@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type, Hash)]
pub enum Exchange {
#[sqlx(rename = "AMEX")]
#[serde(rename = "AMEX")]
Amex,
#[sqlx(rename = "ARCA")]
#[serde(rename = "ARCA")]
Arca,
#[sqlx(rename = "BATS")]
#[serde(rename = "BATS")]
Bats,
#[sqlx(rename = "NYSE")]
#[serde(rename = "NYSE")]
Nyse,
#[sqlx(rename = "NASDAQ")]
#[serde(rename = "NASDAQ")]
Nasdaq,
#[sqlx(rename = "NYSEARCA")]
#[serde(rename = "NYSEARCA")]
Nysearca,
#[sqlx(rename = "OTC")]
#[serde(rename = "OTC")]
Otc,
#[sqlx(rename = "CRYPTO")]
#[serde(rename = "CRYPTO")]
Crypto,
}

22
src/types/mod.rs Normal file

@@ -0,0 +1,22 @@
pub mod api;
pub mod asset;
pub mod bar;
pub mod class;
pub mod exchange;
pub mod source;
pub mod status;
pub mod websocket;
pub use asset::Asset;
pub use bar::Bar;
pub use class::Class;
pub use exchange::Exchange;
pub use source::Source;
pub use status::Status;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BroadcastMessage {
Asset(asset::BroadcastMessage),
}

31
src/types/source.rs Normal file

@@ -0,0 +1,31 @@
use std::{
fmt::{Display, Formatter},
str::FromStr,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Source {
Iex,
Sip,
}
impl FromStr for Source {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"iex" => Ok(Self::Iex),
"sip" => Ok(Self::Sip),
_ => Err(format!("Unknown source: {s}")),
}
}
}
impl Display for Source {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
Self::Iex => write!(f, "iex"),
Self::Sip => write!(f, "sip"),
}
}
}
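
Parsing and display round-trip, assuming `Source` above is in scope; this is what `Config::from_env` relies on for ALPACA_SOURCE:

use std::str::FromStr;

fn main() {
    let src = Source::from_str("iex").unwrap();
    assert_eq!(src, Source::Iex);
    assert_eq!(src.to_string(), "iex"); // Display also feeds the websocket URL
    assert!(Source::from_str("otc").is_err()); // unknown feeds are rejected
}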

12
src/types/status.rs Normal file

@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type)]
pub enum Status {
#[sqlx(rename = "active")]
#[serde(rename = "active")]
Active,
#[sqlx(rename = "inactive")]
#[serde(rename = "inactive")]
Inactive,
}

25
src/types/websocket/incoming/bar.rs Normal file

@@ -0,0 +1,25 @@
use serde::Deserialize;
use time::OffsetDateTime;
#[derive(Debug, PartialEq, Deserialize)]
pub struct Message {
#[serde(rename = "t")]
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
#[serde(rename = "S")]
pub symbol: String,
#[serde(rename = "o")]
pub open: f64,
#[serde(rename = "h")]
pub high: f64,
#[serde(rename = "l")]
pub low: f64,
#[serde(rename = "c")]
pub close: f64,
#[serde(rename = "v")]
pub volume: f64,
#[serde(rename = "n")]
pub num_trades: i64,
#[serde(rename = "vw")]
pub volume_weighted: f64,
}

18
src/types/websocket/incoming/mod.rs Normal file

@@ -0,0 +1,18 @@
pub mod bar;
pub mod subscription;
pub mod success;
use serde::Deserialize;
#[derive(Debug, Deserialize, PartialEq)]
#[serde(tag = "T")]
pub enum Message {
#[serde(rename = "success")]
Success(success::Message),
#[serde(rename = "subscription")]
Subscription(subscription::Message),
#[serde(rename = "b")]
Bars(bar::Message),
#[serde(rename = "u")]
UpdatedBars(bar::Message),
}
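
The `T` tag routes each raw frame to a variant, which is how the live stream demultiplexes control and data messages. A sketch assuming the types above are in scope; the frame follows Alpaca's documented success shape:

fn main() {
    let frame = r#"[{"T":"success","msg":"connected"}]"#;
    let msgs: Vec<Message> = serde_json::from_str(frame).unwrap();
    assert!(matches!(msgs[0], Message::Success(_)));
}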

17
src/types/websocket/incoming/subscription.rs Normal file

@@ -0,0 +1,17 @@
use serde::Deserialize;
#[derive(Debug, PartialEq, Eq, Deserialize)]
pub struct Message {
pub trades: Vec<String>,
pub quotes: Vec<String>,
pub bars: Vec<String>,
#[serde(rename = "updatedBars")]
pub updated_bars: Vec<String>,
#[serde(rename = "dailyBars")]
pub daily_bars: Vec<String>,
pub orderbooks: Option<Vec<String>>,
pub statuses: Option<Vec<String>>,
pub lulds: Option<Vec<String>>,
#[serde(rename = "cancelErrors")]
pub cancel_errors: Option<Vec<String>>,
}

14
src/types/websocket/incoming/success.rs Normal file

@@ -0,0 +1,14 @@
use serde::Deserialize;
#[derive(Debug, PartialEq, Eq, Deserialize)]
pub enum MessageType {
#[serde(rename = "connected")]
Connected,
#[serde(rename = "authenticated")]
Authenticated,
}
#[derive(Debug, PartialEq, Eq, Deserialize)]
pub struct Message {
pub msg: MessageType,
}

2
src/types/websocket/mod.rs Normal file

@@ -0,0 +1,2 @@
pub mod incoming;
pub mod outgoing;

13
src/types/websocket/outgoing/auth.rs Normal file

@@ -0,0 +1,13 @@
use serde::Serialize;
#[derive(Debug, Serialize)]
pub struct Message {
key: String,
secret: String,
}
impl Message {
pub const fn new(key: String, secret: String) -> Self {
Self { key, secret }
}
}

15
src/types/websocket/outgoing/mod.rs Normal file

@@ -0,0 +1,15 @@
pub mod auth;
pub mod subscribe;
use serde::Serialize;
#[derive(Debug, Serialize)]
#[serde(tag = "action")]
pub enum Message {
#[serde(rename = "auth")]
Auth(auth::Message),
#[serde(rename = "subscribe")]
Subscribe(subscribe::Message),
#[serde(rename = "unsubscribe")]
Unsubscribe(subscribe::Message),
}

17
src/types/websocket/outgoing/subscribe.rs Normal file

@@ -0,0 +1,17 @@
use serde::Serialize;
#[derive(Debug, Serialize)]
pub struct Message {
bars: Vec<String>,
#[serde(rename = "updatedBars")]
updated_bars: Vec<String>,
}
impl Message {
pub fn new(symbol: String) -> Self {
Self {
bars: vec![symbol.clone()],
updated_bars: vec![symbol],
}
}
}
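
The wire format this produces, assuming the outgoing types above are in scope; serde's internal `action` tag merges with the struct's fields:

use crate::types::websocket::outgoing;

fn main() {
    let msg = outgoing::Message::Subscribe(outgoing::subscribe::Message::new("AAPL".into()));
    assert_eq!(
        serde_json::to_string(&msg).unwrap(),
        r#"{"action":"subscribe","bars":["AAPL"],"updatedBars":["AAPL"]}"#
    );
}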