Add news data support

- Refactor everything in the process, oops

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-01-25 10:46:42 +00:00
parent 178a062c25
commit 002f70e299
53 changed files with 1683 additions and 677 deletions

View File

@@ -10,8 +10,11 @@ pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock";
pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
pub const ALPACA_NEWS_DATA_URL: &str = "https://data.alpaca.markets/v1beta1/news";
pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
pub const ALPACA_NEWS_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news";
pub struct Config {
pub alpaca_api_key: String,
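
News mirrors the bars setup: one REST endpoint for historical backfills and one websocket for live streaming. As a rough sketch of a raw request against the new REST constant (header names per Alpaca's data API; the actual code goes through the typed api::outgoing::news::News query added later in this commit):

use reqwest::Client;

// Sketch only: fetch one page of AAPL news from the new endpoint.
async fn fetch_news_page(client: &Client, key: &str, secret: &str) -> reqwest::Result<String> {
    client
        .get(ALPACA_NEWS_DATA_URL)
        .header("APCA-API-KEY-ID", key)
        .header("APCA-API-SECRET-KEY", secret)
        .query(&[("symbols", "AAPL"), ("limit", "50")])
        .send()
        .await?
        .error_for_status()?
        .text()
        .await
}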

View File

@@ -1,449 +0,0 @@
use crate::{
config::{Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL},
data::authenticate_websocket,
database,
types::{
alpaca::{api, websocket, Source},
state, Asset, Backfill, Bar, BroadcastMessage, Class,
},
utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE},
};
use backoff::{future::retry, ExponentialBackoff};
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{error, info, warn};
use serde_json::{from_str, to_string};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use time::OffsetDateTime;
use tokio::{
net::TcpStream,
spawn,
sync::{broadcast::Sender, Mutex, RwLock},
task::JoinHandle,
time::sleep,
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
pub struct Guard {
symbols: HashSet<String>,
backfill_jobs: HashMap<String, JoinHandle<()>>,
pending_subscriptions: HashMap<String, Asset>,
pending_unsubscriptions: HashMap<String, Asset>,
}
pub async fn run(
app_config: Arc<Config>,
class: Class,
broadcast_bus_sender: Sender<BroadcastMessage>,
) {
info!("Running live threads for {:?}.", class);
let websocket_url = match class {
Class::UsEquity => format!(
"{}/{}",
ALPACA_STOCK_WEBSOCKET_URL, app_config.alpaca_source
),
Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL.to_string(),
};
let (stream, _) = connect_async(websocket_url).await.unwrap();
let (mut sink, mut stream) = stream.split();
authenticate_websocket(&app_config, &mut stream, &mut sink).await;
let sink = Arc::new(Mutex::new(sink));
let guard = Arc::new(RwLock::new(Guard {
symbols: HashSet::new(),
backfill_jobs: HashMap::new(),
pending_subscriptions: HashMap::new(),
pending_unsubscriptions: HashMap::new(),
}));
spawn(broadcast_bus_handler(
app_config.clone(),
class,
sink.clone(),
broadcast_bus_sender.clone(),
guard.clone(),
));
spawn(websocket_handler(
app_config.clone(),
stream,
sink,
broadcast_bus_sender.clone(),
guard.clone(),
));
let assets = database::assets::select_where_class(&app_config.clickhouse_client, &class).await;
broadcast_bus_sender
.send(BroadcastMessage::Asset((
state::asset::BroadcastMessage::Add,
assets,
)))
.unwrap();
}
pub async fn broadcast_bus_handler(
app_config: Arc<Config>,
class: Class,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
broadcast_bus_sender: Sender<BroadcastMessage>,
guard: Arc<RwLock<Guard>>,
) {
let mut broadcast_bus_receiver = broadcast_bus_sender.subscribe();
loop {
let app_config = app_config.clone();
let sink = sink.clone();
let broadcast_bus_sender = broadcast_bus_sender.clone();
let guard = guard.clone();
let message = broadcast_bus_receiver.recv().await.unwrap();
spawn(broadcast_bus_handle_message(
app_config,
class,
sink,
broadcast_bus_sender,
guard,
message,
));
}
}
#[allow(clippy::significant_drop_tightening)]
#[allow(clippy::too_many_lines)]
async fn broadcast_bus_handle_message(
app_config: Arc<Config>,
class: Class,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
broadcast_bus_sender: Sender<BroadcastMessage>,
guard: Arc<RwLock<Guard>>,
message: BroadcastMessage,
) {
match message {
BroadcastMessage::Asset((action, mut assets)) => {
assets.retain(|asset| asset.class == class);
if assets.is_empty() {
return;
}
let assets = assets
.into_iter()
.map(|asset| (asset.symbol.clone(), asset))
.collect::<HashMap<_, _>>();
let symbols = assets.keys().cloned().collect::<Vec<_>>();
match action {
state::asset::BroadcastMessage::Add => {
database::assets::upsert_batch(
&app_config.clickhouse_client,
assets.clone().into_values(),
)
.await;
let mut guard = guard.write().await;
guard.symbols.extend(symbols.clone());
guard.pending_subscriptions.extend(assets);
info!("Added {:?}.", symbols);
sink.lock()
.await
.send(Message::Text(
to_string(&websocket::data::outgoing::Message::Subscribe(
websocket::data::outgoing::subscribe::Message::new(symbols),
))
.unwrap(),
))
.await
.unwrap();
}
state::asset::BroadcastMessage::Delete => {
database::assets::delete_where_symbols(&app_config.clickhouse_client, &symbols)
.await;
let mut guard = guard.write().await;
guard.symbols.retain(|symbol| !assets.contains_key(symbol));
guard.pending_unsubscriptions.extend(assets);
info!("Deleted {:?}.", symbols);
sink.lock()
.await
.send(Message::Text(
to_string(&websocket::data::outgoing::Message::Unsubscribe(
websocket::data::outgoing::subscribe::Message::new(symbols),
))
.unwrap(),
))
.await
.unwrap();
}
state::asset::BroadcastMessage::Backfill => {
let guard_clone = guard.clone();
let mut guard = guard.write().await;
info!("Creating backfill jobs for {:?}.", symbols);
for (symbol, asset) in assets {
if let Some(backfill_job) = guard.backfill_jobs.remove(&symbol) {
backfill_job.abort();
backfill_job.await.unwrap_err();
}
guard.backfill_jobs.insert(symbol.clone(), {
let guard = guard_clone.clone();
let app_config = app_config.clone();
spawn(async move {
backfill(app_config, class, asset.clone()).await;
let mut guard = guard.write().await;
guard.backfill_jobs.remove(&symbol);
})
});
}
}
state::asset::BroadcastMessage::Purge => {
let mut guard = guard.write().await;
info!("Purging {:?}.", symbols);
for (symbol, _) in assets {
if let Some(backfill_job) = guard.backfill_jobs.remove(&symbol) {
backfill_job.abort();
backfill_job.await.unwrap_err();
}
}
database::backfills::delete_where_symbols(
&app_config.clickhouse_client,
&symbols,
)
.await;
database::bars::delete_where_symbols(&app_config.clickhouse_client, &symbols)
.await;
}
}
}
BroadcastMessage::Clock(_) => {
broadcast_bus_sender
.send(BroadcastMessage::Asset((
state::asset::BroadcastMessage::Backfill,
database::assets::select(&app_config.clickhouse_client).await,
)))
.unwrap();
}
}
}
async fn websocket_handler(
app_config: Arc<Config>,
mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
broadcast_bus_sender: Sender<BroadcastMessage>,
guard: Arc<RwLock<Guard>>,
) {
loop {
let app_config = app_config.clone();
let sink = sink.clone();
let broadcast_bus_sender = broadcast_bus_sender.clone();
let guard = guard.clone();
let message = stream.next().await.expect("Websocket stream closed.");
spawn(async move {
match message {
Ok(Message::Text(data)) => {
let parsed_data = from_str::<Vec<websocket::data::incoming::Message>>(&data);
if let Ok(messages) = parsed_data {
for message in messages {
websocket_handle_message(
app_config.clone(),
broadcast_bus_sender.clone(),
guard.clone(),
message,
)
.await;
}
} else {
error!(
"Unparsed websocket message: {:?}: {}.",
data,
parsed_data.unwrap_err()
);
}
}
Ok(Message::Ping(_)) => {
sink.lock().await.send(Message::Pong(vec![])).await.unwrap();
}
_ => error!("Unknown websocket message: {:?}.", message),
}
});
}
}
#[allow(clippy::significant_drop_tightening)]
async fn websocket_handle_message(
app_config: Arc<Config>,
broadcast_bus_sender: Sender<BroadcastMessage>,
guard: Arc<RwLock<Guard>>,
message: websocket::data::incoming::Message,
) {
match message {
websocket::data::incoming::Message::Subscription(message) => {
let symbols = message.bars.into_iter().collect::<HashSet<_>>();
let mut guard = guard.write().await;
let newly_subscribed_assets = guard
.pending_subscriptions
.extract_if(|symbol, _| symbols.contains(symbol))
.collect::<HashMap<_, _>>();
if !newly_subscribed_assets.is_empty() {
info!(
"Subscribed to {:?}.",
newly_subscribed_assets.keys().collect::<Vec<_>>()
);
broadcast_bus_sender
.send(BroadcastMessage::Asset((
state::asset::BroadcastMessage::Backfill,
newly_subscribed_assets.into_values().collect::<Vec<_>>(),
)))
.unwrap();
}
let newly_unsubscribed_assets = guard
.pending_unsubscriptions
.extract_if(|symbol, _| !symbols.contains(symbol))
.collect::<HashMap<_, _>>();
if !newly_unsubscribed_assets.is_empty() {
info!(
"Unsubscribed from {:?}.",
newly_unsubscribed_assets.keys().collect::<Vec<_>>()
);
broadcast_bus_sender
.send(BroadcastMessage::Asset((
state::asset::BroadcastMessage::Purge,
newly_unsubscribed_assets.into_values().collect::<Vec<_>>(),
)))
.unwrap();
}
}
websocket::data::incoming::Message::Bars(bar_message)
| websocket::data::incoming::Message::UpdatedBars(bar_message) => {
let bar = Bar::from(bar_message);
let guard = guard.read().await;
let symbol_status = guard.symbols.get(&bar.symbol);
if symbol_status.is_none() {
warn!(
"Race condition: received bar for unsubscribed symbol: {:?}.",
bar.symbol
);
return;
}
info!("Received bar for {}: {}.", bar.symbol, bar.time);
database::bars::upsert(&app_config.clickhouse_client, &bar).await;
}
websocket::data::incoming::Message::Success(_) => {}
}
}
pub async fn backfill(app_config: Arc<Config>, class: Class, asset: Asset) {
let latest_backfill = database::backfills::select_latest_where_symbol(
&app_config.clickhouse_client,
&asset.symbol,
)
.await;
let fetch_from = if let Some(backfill) = latest_backfill {
backfill.time + ONE_MINUTE
} else {
OffsetDateTime::UNIX_EPOCH
};
let fetch_until = last_minute();
if fetch_from > fetch_until {
return;
}
if app_config.alpaca_source == Source::Iex {
let task_run_delay = duration_until(fetch_until + FIFTEEN_MINUTES + ONE_MINUTE);
info!(
"Queing backfill for {} in {:?}.",
asset.symbol, task_run_delay
);
sleep(task_run_delay).await;
}
info!("Running backfill for {}.", asset.symbol);
let mut bars = Vec::new();
let mut next_page_token = None;
loop {
let message = retry(ExponentialBackoff::default(), || async {
app_config.alpaca_rate_limit.until_ready().await;
app_config
.alpaca_client
.get(class.get_data_url())
.query(&api::outgoing::bar::Bar::new(
vec![asset.symbol.clone()],
ONE_MINUTE,
fetch_from,
fetch_until,
10000,
next_page_token.clone(),
))
.send()
.await?
.error_for_status()?
.json::<api::incoming::bar::Message>()
.await
.map_err(backoff::Error::Permanent)
})
.await;
let message = match message {
Ok(message) => message,
Err(e) => {
error!("Failed to backfill data for {}: {}.", asset.symbol, e);
return;
}
};
message.bars.into_iter().for_each(|(symbol, bar_vec)| {
bar_vec.unwrap_or_default().into_iter().for_each(|bar| {
bars.push(Bar::from((bar, symbol.clone())));
});
});
if message.next_page_token.is_none() {
break;
}
next_page_token = message.next_page_token;
}
database::bars::upsert_batch(&app_config.clickhouse_client, bars).await;
database::backfills::upsert(
&app_config.clickhouse_client,
&Backfill::new(asset.symbol.clone(), fetch_until),
)
.await;
info!("Backfilled data for {}.", asset.symbol);
}

View File

@@ -1,53 +0,0 @@
pub mod clock;
pub mod market;
use crate::{config::Config, types::alpaca::websocket};
use core::panic;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use serde_json::{from_str, to_string};
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
async fn authenticate_websocket(
app_config: &Arc<Config>,
stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<websocket::data::incoming::Message>>(&data)
.unwrap()
.first()
== Some(&websocket::data::incoming::Message::Success(
websocket::data::incoming::success::Message::Connected,
)) => {}
_ => panic!("Failed to connect to Alpaca websocket."),
}
sink.send(Message::Text(
to_string(&websocket::data::outgoing::Message::Auth(
websocket::data::outgoing::auth::Message::new(
app_config.alpaca_api_key.clone(),
app_config.alpaca_api_secret.clone(),
),
))
.unwrap(),
))
.await
.unwrap();
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<websocket::data::incoming::Message>>(&data)
.unwrap()
.first()
== Some(&websocket::data::incoming::Message::Success(
websocket::data::incoming::success::Message::Authenticated,
)) => {}
_ => panic!("Failed to authenticate with Alpaca websocket."),
};
}

View File

@@ -1,4 +1,4 @@
use crate::types::{Asset, Class};
use crate::types::Asset;
use clickhouse::Client;
use serde::Serialize;
@@ -10,21 +10,13 @@ pub async fn select(clickhouse_client: &Client) -> Vec<Asset> {
.unwrap()
}
pub async fn select_where_class(clickhouse_client: &Client, class: &Class) -> Vec<Asset> {
clickhouse_client
.query("SELECT ?fields FROM assets FINAL WHERE class = ?")
.bind(class)
.fetch_all::<Asset>()
.await
.unwrap()
}
pub async fn select_where_symbol<T>(clickhouse_client: &Client, symbol: &T) -> Option<Asset>
where
T: AsRef<str> + Serialize + Send + Sync,
{
clickhouse_client
.query("SELECT ?fields FROM assets FINAL WHERE symbol = ?")
.query("SELECT ?fields FROM assets FINAL WHERE symbol = ? OR abbreviation = ?")
.bind(symbol)
.bind(symbol)
.fetch_optional::<Asset>()
.await
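
The widened WHERE clause lets callers resolve an asset by either its canonical symbol or its slash-free abbreviation (the form the news feed uses); the same value is bound twice because the clickhouse crate binds parameters positionally. A hypothetical lookup:

// Both spellings resolve to the same crypto asset, since `abbreviation`
// is the symbol with '/' stripped (see the types::Asset change below).
let by_symbol = select_where_symbol(&clickhouse_client, &"BTC/USD").await;
let by_abbreviation = select_where_symbol(&clickhouse_client, &"BTCUSD").await;
assert_eq!(by_symbol, by_abbreviation);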

View File

@@ -1,48 +1,93 @@
use crate::types::Backfill;
use crate::{database::assets, threads::data::ThreadType, types::Backfill};
use clickhouse::Client;
use serde::Serialize;
use tokio::join;
pub async fn select_latest_where_symbol<T>(
clickhouse_client: &Client,
thread_type: &ThreadType,
symbol: &T,
) -> Option<Backfill>
where
T: AsRef<str> + Serialize + Send + Sync,
{
clickhouse_client
.query("SELECT ?fields FROM backfills FINAL WHERE symbol = ? ORDER BY time DESC LIMIT 1")
.query(&format!(
"SELECT ?fields FROM {} FINAL WHERE symbol = ? ORDER BY time DESC LIMIT 1",
match thread_type {
ThreadType::Bars(_) => "backfills_bars",
ThreadType::News => "backfills_news",
}
))
.bind(symbol)
.fetch_optional::<Backfill>()
.await
.unwrap()
}
pub async fn upsert(clickhouse_client: &Client, backfill: &Backfill) {
let mut insert = clickhouse_client.insert("backfills").unwrap();
pub async fn upsert(clickhouse_client: &Client, thread_type: &ThreadType, backfill: &Backfill) {
let mut insert = clickhouse_client
.insert(match thread_type {
ThreadType::Bars(_) => "backfills_bars",
ThreadType::News => "backfills_news",
})
.unwrap();
insert.write(backfill).await.unwrap();
insert.end().await.unwrap();
}
pub async fn delete_where_symbols<T>(clickhouse_client: &Client, symbols: &[T])
where
pub async fn delete_where_symbols<T>(
clickhouse_client: &Client,
thread_type: &ThreadType,
symbols: &[T],
) where
T: AsRef<str> + Serialize + Send + Sync,
{
clickhouse_client
.query("DELETE FROM backfills WHERE symbol IN ?")
.query(&format!(
"DELETE FROM {} WHERE symbol IN ?",
match thread_type {
ThreadType::Bars(_) => "backfills_bars",
ThreadType::News => "backfills_news",
}
))
.bind(symbols)
.execute()
.await
.unwrap();
}
pub async fn delete_where_not_symbols<T>(clickhouse_client: &Client, symbols: &[T])
where
T: AsRef<str> + Serialize + Send + Sync,
{
clickhouse_client
.query("DELETE FROM backfills WHERE symbol NOT IN ?")
.bind(symbols)
.execute()
.await
.unwrap();
pub async fn cleanup(clickhouse_client: &Client) {
let assets = assets::select(clickhouse_client).await;
let bars_symbols = assets
.clone()
.into_iter()
.map(|asset| asset.symbol)
.collect::<Vec<_>>();
let news_symbols = assets
.into_iter()
.map(|asset| asset.abbreviation)
.collect::<Vec<_>>();
let delete_bars_future = async {
clickhouse_client
.query("DELETE FROM backfills_bars WHERE symbol NOT IN ?")
.bind(bars_symbols)
.execute()
.await
.unwrap();
};
let delete_news_future = async {
clickhouse_client
.query("DELETE FROM backfills_news WHERE symbol NOT IN ?")
.bind(news_symbols)
.execute()
.await
.unwrap();
};
join!(delete_bars_future, delete_news_future);
}
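
The thread-type-to-table match now appears in select, upsert, and delete alike; a small helper would keep the three call sites in sync. A possible refactor, not part of this commit:

const fn backfills_table(thread_type: &ThreadType) -> &'static str {
    match thread_type {
        ThreadType::Bars(_) => "backfills_bars",
        ThreadType::News => "backfills_news",
    }
}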

View File

@@ -1,3 +1,4 @@
use super::assets;
use crate::types::Bar;
use clickhouse::Client;
use serde::Serialize;
@@ -32,10 +33,14 @@ where
.unwrap();
}
pub async fn delete_where_not_symbols<T>(clickhouse_client: &Client, symbols: &[T])
where
T: AsRef<str> + Serialize + Send + Sync,
{
pub async fn cleanup(clickhouse_client: &Client) {
let assets = assets::select(clickhouse_client).await;
let symbols = assets
.into_iter()
.map(|asset| asset.symbol)
.collect::<Vec<_>>();
clickhouse_client
.query("DELETE FROM bars WHERE symbol NOT IN ?")
.bind(symbols)

View File

@@ -1,3 +1,4 @@
pub mod assets;
pub mod backfills;
pub mod bars;
pub mod news;

src/database/news.rs (new file, 50 lines)
View File

@@ -0,0 +1,50 @@
use super::assets;
use crate::types::News;
use clickhouse::Client;
use serde::Serialize;
pub async fn upsert(clickhouse_client: &Client, news: &News) {
let mut insert = clickhouse_client.insert("news").unwrap();
insert.write(news).await.unwrap();
insert.end().await.unwrap();
}
pub async fn upsert_batch<T>(clickhouse_client: &Client, news: T)
where
T: IntoIterator<Item = News> + Send + Sync,
T::IntoIter: Send,
{
let mut insert = clickhouse_client.insert("news").unwrap();
for news in news {
insert.write(&news).await.unwrap();
}
insert.end().await.unwrap();
}
pub async fn delete_where_symbols<T>(clickhouse_client: &Client, symbols: &[T])
where
T: AsRef<str> + Serialize + Send + Sync,
{
clickhouse_client
.query("DELETE FROM news WHERE hasAny(symbols, ?)")
.bind(symbols)
.execute()
.await
.unwrap();
}
pub async fn cleanup(clickhouse_client: &Client) {
let assets = assets::select(clickhouse_client).await;
let symbols = assets
.into_iter()
.map(|asset| asset.abbreviation)
.collect::<Vec<_>>();
clickhouse_client
.query("DELETE FROM news WHERE NOT hasAny(symbols, ?)")
.bind(symbols)
.execute()
.await
.unwrap();
}
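
Unlike bars, a news row is keyed by an array of symbols, so per-symbol deletes use ClickHouse's hasAny(arr1, arr2), which returns 1 when the two arrays share any element. One consequence worth noting: a story tagged with several symbols is removed as soon as any one of them is passed in. A hypothetical call:

// Deletes every story mentioning AAPL, even if it also mentions TSLA,
// because hasAny(symbols, ['AAPL']) matches on any intersection.
database::news::delete_where_symbols(&clickhouse_client, &["AAPL"]).await;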

View File

@@ -3,9 +3,9 @@
#![feature(hash_extract_if)]
mod config;
mod data;
mod database;
mod routes;
mod threads;
mod types;
mod utils;
@@ -13,8 +13,7 @@ use crate::utils::cleanup;
use config::Config;
use dotenv::dotenv;
use log4rs::config::Deserializers;
use tokio::{spawn, sync::broadcast};
use types::{BroadcastMessage, Class};
use tokio::{spawn, sync::mpsc};
#[tokio::main]
async fn main() {
@@ -24,21 +23,27 @@ async fn main() {
cleanup(&app_config.clickhouse_client).await;
let (broadcast_bus, _) = broadcast::channel::<BroadcastMessage>(100);
let (asset_status_sender, asset_status_receiver) =
mpsc::channel::<threads::data::asset_status::Message>(100);
let (clock_sender, clock_receiver) = mpsc::channel::<threads::clock::Message>(1);
spawn(data::market::run(
spawn(threads::data::run(
app_config.clone(),
Class::UsEquity,
broadcast_bus.clone(),
asset_status_receiver,
clock_receiver,
));
spawn(data::market::run(
app_config.clone(),
Class::Crypto,
broadcast_bus.clone(),
));
spawn(threads::clock::run(app_config.clone(), clock_sender));
spawn(data::clock::run(app_config.clone(), broadcast_bus.clone()));
let assets = database::assets::select(&app_config.clickhouse_client).await;
routes::run(app_config, broadcast_bus).await;
let (asset_status_message, asset_status_receiver) =
threads::data::asset_status::Message::new(threads::data::asset_status::Action::Add, assets);
asset_status_sender
.send(asset_status_message)
.await
.unwrap();
asset_status_receiver.await.unwrap();
routes::run(app_config, asset_status_sender).await;
}

View File

@@ -1,9 +1,8 @@
use crate::{
config::{Config, ALPACA_ASSET_API_URL},
database,
database, threads,
types::{
alpaca::api::incoming::{self, asset::Status},
state::{self, BroadcastMessage},
Asset,
},
};
@@ -13,7 +12,7 @@ use core::panic;
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc;
pub async fn get(
Extension(app_config): Extension<Arc<Config>>,
@@ -39,7 +38,7 @@ pub struct AddAssetRequest {
pub async fn add(
Extension(app_config): Extension<Arc<Config>>,
Extension(broadcast_bus_sender): Extension<Sender<BroadcastMessage>>,
Extension(asset_status_sender): Extension<mpsc::Sender<threads::data::asset_status::Message>>,
Json(request): Json<AddAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
if database::assets::select_where_symbol(&app_config.clickhouse_client, &request.symbol)
@@ -77,31 +76,39 @@ pub async fn add(
let asset = Asset::from(asset);
broadcast_bus_sender
.send(BroadcastMessage::Asset((
state::asset::BroadcastMessage::Add,
vec![asset.clone()],
)))
let (asset_status_message, asset_status_response) = threads::data::asset_status::Message::new(
threads::data::asset_status::Action::Add,
vec![asset.clone()],
);
asset_status_sender
.send(asset_status_message)
.await
.unwrap();
asset_status_response.await.unwrap();
Ok((StatusCode::CREATED, Json(asset)))
}
pub async fn delete(
Extension(app_config): Extension<Arc<Config>>,
Extension(broadcast_bus_sender): Extension<Sender<BroadcastMessage>>,
Extension(asset_status_sender): Extension<mpsc::Sender<threads::data::asset_status::Message>>,
Path(symbol): Path<String>,
) -> Result<StatusCode, StatusCode> {
let asset = database::assets::select_where_symbol(&app_config.clickhouse_client, &symbol)
.await
.ok_or(StatusCode::NOT_FOUND)?;
broadcast_bus_sender
.send(BroadcastMessage::Asset((
state::asset::BroadcastMessage::Delete,
vec![asset],
)))
let (asset_status_message, asset_status_response) = threads::data::asset_status::Message::new(
threads::data::asset_status::Action::Remove,
vec![asset],
);
asset_status_sender
.send(asset_status_message)
.await
.unwrap();
asset_status_response.await.unwrap();
Ok(StatusCode::NO_CONTENT)
}

View File

@@ -1,22 +1,25 @@
use crate::{config::Config, types::BroadcastMessage};
pub mod assets;
use crate::{config::Config, threads};
use axum::{
routing::{delete, get, post},
serve, Extension, Router,
};
use log::info;
use std::{net::SocketAddr, sync::Arc};
use tokio::{net::TcpListener, sync::broadcast::Sender};
use tokio::{net::TcpListener, sync::mpsc};
pub mod assets;
pub async fn run(app_config: Arc<Config>, broadcast_sender: Sender<BroadcastMessage>) {
pub async fn run(
app_config: Arc<Config>,
asset_status_sender: mpsc::Sender<threads::data::asset_status::Message>,
) {
let app = Router::new()
.route("/assets", get(assets::get))
.route("/assets/:symbol", get(assets::get_where_symbol))
.route("/assets", post(assets::add))
.route("/assets/:symbol", delete(assets::delete))
.layer(Extension(app_config))
.layer(Extension(broadcast_sender));
.layer(Extension(asset_status_sender));
let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
let listener = TcpListener::bind(addr).await.unwrap();

View File

@@ -1,17 +1,41 @@
use crate::{
config::{Config, ALPACA_CLOCK_API_URL},
types::{
alpaca,
state::{self, BroadcastMessage},
},
types::alpaca,
utils::duration_until,
};
use backoff::{future::retry, ExponentialBackoff};
use log::info;
use std::sync::Arc;
use tokio::{sync::broadcast::Sender, time::sleep};
use time::OffsetDateTime;
use tokio::{sync::mpsc, time::sleep};
pub async fn run(app_config: Arc<Config>, broadcast_bus_sender: Sender<BroadcastMessage>) {
pub enum Status {
Open,
Closed,
}
pub struct Message {
pub status: Status,
pub next_switch: OffsetDateTime,
}
impl From<alpaca::api::incoming::clock::Clock> for Message {
fn from(clock: alpaca::api::incoming::clock::Clock) -> Self {
if clock.is_open {
Self {
status: Status::Open,
next_switch: clock.next_close,
}
} else {
Self {
status: Status::Closed,
next_switch: clock.next_open,
}
}
}
}
pub async fn run(app_config: Arc<Config>, clock_sender: mpsc::Sender<Message>) {
loop {
let clock = retry(ExponentialBackoff::default(), || async {
app_config.alpaca_rate_limit.until_ready().await;
@@ -37,13 +61,6 @@ pub async fn run(app_config: Arc<Config>, broadcast_bus_sender: Sender<Broadcast
});
sleep(sleep_until).await;
broadcast_bus_sender
.send(BroadcastMessage::Clock(if clock.is_open {
state::clock::BroadcastMessage::Open
} else {
state::clock::BroadcastMessage::Close
}))
.unwrap();
clock_sender.send(clock.into()).await.unwrap();
}
}

View File

@@ -0,0 +1,174 @@
use super::{Guard, ThreadType};
use crate::{
config::Config,
database,
types::{alpaca::websocket, Asset},
};
use futures_util::{stream::SplitSink, SinkExt};
use log::info;
use serde_json::to_string;
use std::sync::Arc;
use tokio::{
join,
net::TcpStream,
spawn,
sync::{mpsc, oneshot, Mutex, RwLock},
};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
#[derive(Clone)]
pub enum Action {
Add,
Remove,
}
pub struct Message {
pub action: Action,
pub assets: Vec<Asset>,
pub response: oneshot::Sender<()>,
}
impl Message {
pub fn new(action: Action, assets: Vec<Asset>) -> (Self, oneshot::Receiver<()>) {
let (sender, receiver) = oneshot::channel::<()>();
(
Self {
action,
assets,
response: sender,
},
receiver,
)
}
}
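
Each Message carries a oneshot sender, turning the mpsc command channel into a request/response handshake: the caller can await the receiver to know the action has been fully applied before proceeding. A sketch of a typical call site (this is the shape used by routes::assets and main):

let (message, done) = Message::new(Action::Add, assets);
asset_status_sender.send(message).await.unwrap();
done.await.unwrap(); // completes once handle_asset_status_message sends ()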
pub async fn run(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
mut asset_status_receiver: mpsc::Receiver<Message>,
websocket_sender: Arc<
Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
>,
) {
loop {
let app_config = app_config.clone();
let guard = guard.clone();
let websocket_sender = websocket_sender.clone();
let message = asset_status_receiver.recv().await.unwrap();
spawn(handle_asset_status_message(
app_config,
thread_type,
guard,
websocket_sender,
message,
));
}
}
#[allow(clippy::significant_drop_tightening)]
async fn handle_asset_status_message(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
websocket_sender: Arc<
Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
>,
message: Message,
) {
let symbols = message
.assets
.clone()
.into_iter()
.map(|asset| match thread_type {
ThreadType::Bars(_) => asset.symbol,
ThreadType::News => asset.abbreviation,
})
.collect::<Vec<_>>();
match message.action {
Action::Add => {
let mut guard = guard.write().await;
guard.symbols.extend(symbols.clone());
guard
.pending_subscriptions
.extend(symbols.clone().into_iter().zip(message.assets.clone()));
info!("{:?} - Added {:?}.", thread_type, symbols);
let database_future = async {
if matches!(thread_type, ThreadType::Bars(_)) {
database::assets::upsert_batch(&app_config.clickhouse_client, message.assets)
.await;
}
};
let websocket_future = async move {
websocket_sender
.lock()
.await
.send(tungstenite::Message::Text(
to_string(&websocket::outgoing::Message::Subscribe(
websocket_market_message_factory(thread_type, symbols),
))
.unwrap(),
))
.await
.unwrap();
};
join!(database_future, websocket_future);
}
Action::Remove => {
let mut guard = guard.write().await;
guard.symbols.retain(|symbol| !symbols.contains(symbol));
guard
.pending_unsubscriptions
.extend(symbols.clone().into_iter().zip(message.assets.clone()));
info!("{:?} - Removed {:?}.", thread_type, symbols);
let symbols_clone = symbols.clone();
let database_future = database::assets::delete_where_symbols(
&app_config.clickhouse_client,
&symbols_clone,
);
let websocket_future = async move {
websocket_sender
.lock()
.await
.send(tungstenite::Message::Text(
to_string(&websocket::outgoing::Message::Unsubscribe(
websocket_market_message_factory(thread_type, symbols),
))
.unwrap(),
))
.await
.unwrap();
};
join!(database_future, websocket_future);
}
}
message.response.send(()).unwrap();
}
fn websocket_market_message_factory(
thread_type: ThreadType,
symbols: Vec<String>,
) -> websocket::outgoing::subscribe::Message {
match thread_type {
ThreadType::Bars(_) => websocket::outgoing::subscribe::Message::Market(
websocket::outgoing::subscribe::MarketMessage::new(symbols),
),
ThreadType::News => websocket::outgoing::subscribe::Message::News(
websocket::outgoing::subscribe::NewsMessage::new(symbols),
),
}
}

View File

@@ -0,0 +1,374 @@
use super::{Guard, ThreadType};
use crate::{
config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_NEWS_DATA_URL, ALPACA_STOCK_DATA_URL},
database,
types::{
alpaca::{api, Source},
Asset, Bar, Class, News, Subset,
},
utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE},
};
use backoff::{future::retry, ExponentialBackoff};
use log::{error, info};
use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime;
use tokio::{
join, spawn,
sync::{mpsc, oneshot, Mutex, RwLock},
task::JoinHandle,
time::sleep,
};
pub enum Action {
Backfill,
Purge,
}
pub struct Message {
pub action: Action,
pub assets: Subset<Asset>,
pub response: oneshot::Sender<()>,
}
impl Message {
pub fn new(action: Action, assets: Subset<Asset>) -> (Self, oneshot::Receiver<()>) {
let (sender, receiver) = oneshot::channel::<()>();
(
Self {
action,
assets,
response: sender,
},
receiver,
)
}
}
pub async fn run(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
mut backfill_receiver: mpsc::Receiver<Message>,
) {
let backfill_jobs = Arc::new(Mutex::new(HashMap::new()));
let data_url = match thread_type {
ThreadType::Bars(Class::UsEquity) => ALPACA_STOCK_DATA_URL.to_string(),
ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_URL.to_string(),
ThreadType::News => ALPACA_NEWS_DATA_URL.to_string(),
};
loop {
let app_config = app_config.clone();
let guard = guard.clone();
let backfill_jobs = backfill_jobs.clone();
let data_url = data_url.clone();
let message = backfill_receiver.recv().await.unwrap();
spawn(handle_backfill_message(
app_config,
thread_type,
guard,
data_url,
backfill_jobs,
message,
));
}
}
#[allow(clippy::significant_drop_tightening)]
#[allow(clippy::too_many_lines)]
async fn handle_backfill_message(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
data_url: String,
backfill_jobs: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
message: Message,
) {
let guard = guard.read().await;
let mut backfill_jobs = backfill_jobs.lock().await;
let symbols = match message.assets {
Subset::All => guard.symbols.clone().into_iter().collect::<Vec<_>>(),
Subset::Some(assets) => assets
.into_iter()
.map(|asset| match thread_type {
ThreadType::Bars(_) => asset.symbol,
ThreadType::News => asset.abbreviation,
})
.filter(|symbol| match message.action {
Action::Backfill => guard.symbols.contains(symbol),
Action::Purge => !guard.symbols.contains(symbol),
})
.collect::<Vec<_>>(),
};
match message.action {
Action::Backfill => {
for symbol in symbols {
if let Some(job) = backfill_jobs.remove(&symbol) {
if !job.is_finished() {
job.abort();
}
job.await.unwrap_err();
}
let app_config = app_config.clone();
let data_url = data_url.clone();
backfill_jobs.insert(
symbol.clone(),
spawn(async move {
let (fetch_from, fetch_to) =
queue_backfill(&app_config, thread_type, &symbol).await;
match thread_type {
ThreadType::Bars(_) => {
execute_backfill_bars(
app_config,
thread_type,
data_url,
symbol,
fetch_from,
fetch_to,
)
.await;
}
ThreadType::News => {
execute_backfill_news(
app_config,
thread_type,
data_url,
symbol,
fetch_from,
fetch_to,
)
.await;
}
}
}),
);
}
}
Action::Purge => {
for symbol in &symbols {
if let Some(job) = backfill_jobs.remove(symbol) {
if !job.is_finished() {
job.abort();
}
job.await.unwrap_err();
}
}
let backfills_future = database::backfills::delete_where_symbols(
&app_config.clickhouse_client,
&thread_type,
&symbols,
);
let data_future = async {
match thread_type {
ThreadType::Bars(_) => {
database::bars::delete_where_symbols(
&app_config.clickhouse_client,
&symbols,
)
.await;
}
ThreadType::News => {
database::news::delete_where_symbols(
&app_config.clickhouse_client,
&symbols,
)
.await;
}
}
};
join!(backfills_future, data_future);
}
}
message.response.send(()).unwrap();
}
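
The abort-then-await pattern above leans on Tokio's cancellation semantics: awaiting an aborted JoinHandle resolves to Err with a cancelled JoinError, which is why unwrap_err() is safe after the is_finished() check. A self-contained illustration:

use std::time::Duration;

#[tokio::main]
async fn main() {
    let job = tokio::spawn(tokio::time::sleep(Duration::from_secs(60)));
    job.abort();
    // An aborted task yields Err, and the error reports cancellation.
    assert!(job.await.unwrap_err().is_cancelled());
}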
async fn queue_backfill(
app_config: &Arc<Config>,
thread_type: ThreadType,
symbol: &String,
) -> (OffsetDateTime, OffsetDateTime) {
let latest_backfill = database::backfills::select_latest_where_symbol(
&app_config.clickhouse_client,
&thread_type,
&symbol,
)
.await;
let fetch_from = latest_backfill
.as_ref()
.map_or(OffsetDateTime::UNIX_EPOCH, |backfill| {
backfill.time + ONE_MINUTE
});
let fetch_to = last_minute();
if app_config.alpaca_source == Source::Iex {
let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE);
info!(
"{:?} - Queing backfill for {} in {:?}.",
thread_type, symbol, run_delay
);
sleep(run_delay).await;
}
(fetch_from, fetch_to)
}
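
The IEX branch exists because Alpaca's free IEX feed is served with a 15-minute delay, and the extra minute lets the final bar of the window close before it is requested. Assuming FIFTEEN_MINUTES and ONE_MINUTE are what their names say, the arithmetic works out as:

use time::{macros::datetime, Duration};

let fetch_to = datetime!(2024-01-25 12:00 UTC);
// Wait out the 15-minute IEX delay plus one minute for the 12:00 bar to close:
let run_at = fetch_to + Duration::minutes(15) + Duration::minutes(1);
assert_eq!(run_at, datetime!(2024-01-25 12:16 UTC));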
async fn execute_backfill_bars(
app_config: Arc<Config>,
thread_type: ThreadType,
data_url: String,
symbol: String,
fetch_from: OffsetDateTime,
fetch_to: OffsetDateTime,
) {
if fetch_from > fetch_to {
return;
}
info!("{:?} - Backfilling data for {}.", thread_type, symbol);
let mut bars = Vec::new();
let mut next_page_token = None;
loop {
let message = retry(ExponentialBackoff::default(), || async {
app_config.alpaca_rate_limit.until_ready().await;
app_config
.alpaca_client
.get(&data_url)
.query(&api::outgoing::bar::Bar::new(
vec![symbol.clone()],
ONE_MINUTE,
fetch_from,
fetch_to,
10000,
next_page_token.clone(),
))
.send()
.await?
.error_for_status()?
.json::<api::incoming::bar::Message>()
.await
.map_err(backoff::Error::Permanent)
})
.await;
let message = match message {
Ok(message) => message,
Err(e) => {
error!(
"{:?} - Failed to backfill data for {}: {}.",
thread_type, symbol, e
);
return;
}
};
message.bars.into_iter().for_each(|(symbol, bar_vec)| {
for bar in bar_vec {
bars.push(Bar::from((bar, symbol.clone())));
}
});
if message.next_page_token.is_none() {
break;
}
next_page_token = message.next_page_token;
}
if bars.is_empty() {
return;
}
let backfill = bars.last().unwrap().clone().into();
database::bars::upsert_batch(&app_config.clickhouse_client, bars).await;
database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await;
info!("{:?} - Backfilled data for {}.", thread_type, symbol);
}
async fn execute_backfill_news(
app_config: Arc<Config>,
thread_type: ThreadType,
data_url: String,
symbol: String,
fetch_from: OffsetDateTime,
fetch_to: OffsetDateTime,
) {
if fetch_from > fetch_to {
return;
}
info!("{:?} - Backfilling data for {}.", thread_type, symbol);
let mut news = Vec::new();
let mut next_page_token = None;
loop {
let message = retry(ExponentialBackoff::default(), || async {
app_config.alpaca_rate_limit.until_ready().await;
app_config
.alpaca_client
.get(&data_url)
.query(&api::outgoing::news::News::new(
vec![symbol.clone()],
fetch_from,
fetch_to,
50,
true,
false,
next_page_token.clone(),
))
.send()
.await?
.error_for_status()?
.json::<api::incoming::news::Message>()
.await
.map_err(backoff::Error::Permanent)
})
.await;
let message = match message {
Ok(message) => message,
Err(e) => {
error!(
"{:?} - Failed to backfill data for {}: {}.",
thread_type, symbol, e
);
return;
}
};
message.news.into_iter().for_each(|news_item| {
news.push(News::from(news_item));
});
if message.next_page_token.is_none() {
break;
}
next_page_token = message.next_page_token;
}
if news.is_empty() {
return;
}
let backfill = (news.last().unwrap().clone(), symbol.clone()).into();
database::news::upsert_batch(&app_config.clickhouse_client, news).await;
database::backfills::upsert(&app_config.clickhouse_client, &thread_type, &backfill).await;
info!("{:?} - Backfilled data for {}.", thread_type, symbol);
}

src/threads/data/mod.rs (new file, 233 lines)
View File

@@ -0,0 +1,233 @@
pub mod asset_status;
pub mod backfill;
pub mod websocket;
use super::clock;
use crate::{
config::{
Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_NEWS_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL,
},
types::{Asset, Class, Subset},
utils::authenticate,
};
use futures_util::StreamExt;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::{
join, select, spawn,
sync::{mpsc, Mutex, RwLock},
};
use tokio_tungstenite::connect_async;
pub struct Guard {
pub symbols: HashSet<String>,
pub pending_subscriptions: HashMap<String, Asset>,
pub pending_unsubscriptions: HashMap<String, Asset>,
}
impl Guard {
pub fn new() -> Self {
Self {
symbols: HashSet::new(),
pending_subscriptions: HashMap::new(),
pending_unsubscriptions: HashMap::new(),
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum ThreadType {
Bars(Class),
News,
}
pub async fn run(
app_config: Arc<Config>,
mut asset_receiver: mpsc::Receiver<asset_status::Message>,
mut clock_receiver: mpsc::Receiver<clock::Message>,
) {
let (bars_us_equity_asset_status_sender, bars_us_equity_backfill_sender) =
init_thread(app_config.clone(), ThreadType::Bars(Class::UsEquity)).await;
let (bars_crypto_asset_status_sender, bars_crypto_backfill_sender) =
init_thread(app_config.clone(), ThreadType::Bars(Class::Crypto)).await;
let (news_asset_status_sender, news_backfill_sender) =
init_thread(app_config.clone(), ThreadType::News).await;
loop {
select! {
Some(asset_message) = asset_receiver.recv() => {
let bars_us_equity_asset_status_sender = bars_us_equity_asset_status_sender.clone();
let bars_crypto_asset_status_sender = bars_crypto_asset_status_sender.clone();
let news_asset_status_sender = news_asset_status_sender.clone();
spawn(handle_asset_message(
bars_us_equity_asset_status_sender,
bars_crypto_asset_status_sender,
news_asset_status_sender,
asset_message,
));
}
Some(_) = clock_receiver.recv() => {
let bars_us_equity_backfill_sender = bars_us_equity_backfill_sender.clone();
let bars_crypto_backfill_sender = bars_crypto_backfill_sender.clone();
let news_backfill_sender = news_backfill_sender.clone();
spawn(handle_clock_message(
bars_us_equity_backfill_sender,
bars_crypto_backfill_sender,
news_backfill_sender,
));
}
else => {
panic!("Communication channel unexpectedly closed.")
}
}
}
}
async fn init_thread(
app_config: Arc<Config>,
thread_type: ThreadType,
) -> (
mpsc::Sender<asset_status::Message>,
mpsc::Sender<backfill::Message>,
) {
let guard = Arc::new(RwLock::new(Guard::new()));
let websocket_url = match thread_type {
ThreadType::Bars(Class::UsEquity) => format!(
"{}/{}",
ALPACA_STOCK_WEBSOCKET_URL, &app_config.alpaca_source
),
ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_WEBSOCKET_URL.into(),
ThreadType::News => ALPACA_NEWS_WEBSOCKET_URL.into(),
};
let (websocket, _) = connect_async(websocket_url).await.unwrap();
let (mut websocket_sender, mut websocket_receiver) = websocket.split();
authenticate(&app_config, &mut websocket_sender, &mut websocket_receiver).await;
let websocket_sender = Arc::new(Mutex::new(websocket_sender));
let (asset_status_sender, asset_status_receiver) = mpsc::channel(100);
spawn(asset_status::run(
app_config.clone(),
thread_type,
guard.clone(),
asset_status_receiver,
websocket_sender.clone(),
));
let (backfill_sender, backfill_receiver) = mpsc::channel(100);
spawn(backfill::run(
app_config.clone(),
thread_type,
guard.clone(),
backfill_receiver,
));
spawn(websocket::run(
app_config.clone(),
thread_type,
guard.clone(),
websocket_sender,
websocket_receiver,
backfill_sender.clone(),
));
(asset_status_sender, backfill_sender)
}
async fn handle_asset_message(
bars_us_equity_asset_status_sender: mpsc::Sender<asset_status::Message>,
bars_crypto_asset_status_sender: mpsc::Sender<asset_status::Message>,
news_asset_status_sender: mpsc::Sender<asset_status::Message>,
asset_status_message: asset_status::Message,
) {
let (us_equity_assets, crypto_assets): (Vec<_>, Vec<_>) = asset_status_message
.assets
.clone()
.into_iter()
.partition(|asset| asset.class == Class::UsEquity);
let bars_us_equity_future = async {
if !us_equity_assets.is_empty() {
let (bars_us_equity_asset_status_message, bars_us_equity_asset_status_receiver) =
asset_status::Message::new(asset_status_message.action.clone(), us_equity_assets);
bars_us_equity_asset_status_sender
.send(bars_us_equity_asset_status_message)
.await
.unwrap();
bars_us_equity_asset_status_receiver.await.unwrap();
}
};
let bars_crypto_future = async {
if !crypto_assets.is_empty() {
let (crypto_asset_status_message, crypto_asset_status_receiver) =
asset_status::Message::new(asset_status_message.action.clone(), crypto_assets);
bars_crypto_asset_status_sender
.send(crypto_asset_status_message)
.await
.unwrap();
crypto_asset_status_receiver.await.unwrap();
}
};
let news_future = async {
if !asset_status_message.assets.is_empty() {
let (news_asset_status_message, news_asset_status_receiver) =
asset_status::Message::new(
asset_status_message.action.clone(),
asset_status_message.assets,
);
news_asset_status_sender
.send(news_asset_status_message)
.await
.unwrap();
news_asset_status_receiver.await.unwrap();
}
};
join!(bars_us_equity_future, bars_crypto_future, news_future);
asset_status_message.response.send(()).unwrap();
}
async fn handle_clock_message(
bars_us_equity_backfill_sender: mpsc::Sender<backfill::Message>,
bars_crypto_backfill_sender: mpsc::Sender<backfill::Message>,
news_backfill_sender: mpsc::Sender<backfill::Message>,
) {
let bars_us_equity_future = async {
let (bars_us_equity_backfill_message, bars_us_equity_backfill_receiver) =
backfill::Message::new(backfill::Action::Backfill, Subset::All);
bars_us_equity_backfill_sender
.send(bars_us_equity_backfill_message)
.await
.unwrap();
bars_us_equity_backfill_receiver.await.unwrap();
};
let bars_crypto_future = async {
let (bars_crypto_backfill_message, bars_crypto_backfill_receiver) =
backfill::Message::new(backfill::Action::Backfill, Subset::All);
bars_crypto_backfill_sender
.send(bars_crypto_backfill_message)
.await
.unwrap();
bars_crypto_backfill_receiver.await.unwrap();
};
let news_future = async {
let (news_backfill_message, news_backfill_receiver) =
backfill::Message::new(backfill::Action::Backfill, Subset::All);
news_backfill_sender
.send(news_backfill_message)
.await
.unwrap();
news_backfill_receiver.await.unwrap();
};
join!(bars_us_equity_future, bars_crypto_future, news_future);
}

View File

@@ -0,0 +1,217 @@
use super::{backfill, Guard, ThreadType};
use crate::{
config::Config,
database,
types::{alpaca::websocket, Bar, News, Subset},
};
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{error, info, warn};
use serde_json::from_str;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::{
join,
net::TcpStream,
spawn,
sync::{mpsc, Mutex, RwLock},
};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
pub async fn run(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
websocket_sender: Arc<
Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
>,
mut websocket_receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
backfill_sender: mpsc::Sender<backfill::Message>,
) {
loop {
let app_config = app_config.clone();
let guard = guard.clone();
let websocket_sender = websocket_sender.clone();
let backfill_sender = backfill_sender.clone();
let message = websocket_receiver.next().await.unwrap().unwrap();
spawn(handle_websocket_message(
app_config,
thread_type,
guard,
websocket_sender,
backfill_sender,
message,
));
}
}
async fn handle_websocket_message(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
websocket_sender: Arc<
Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
>,
backfill_sender: mpsc::Sender<backfill::Message>,
message: tungstenite::Message,
) {
match message {
tungstenite::Message::Text(message) => {
let message = from_str::<Vec<websocket::incoming::Message>>(&message);
if let Ok(message) = message {
for message in message {
let app_config = app_config.clone();
let guard = guard.clone();
let backfill_sender = backfill_sender.clone();
spawn(handle_parsed_websocket_message(
app_config,
thread_type,
guard,
backfill_sender,
message,
));
}
} else {
error!(
"{:?} - Failed to deserialize websocket message: {:?}",
thread_type, message
);
}
}
tungstenite::Message::Ping(_) => {
websocket_sender
.lock()
.await
.send(tungstenite::Message::Pong(vec![]))
.await
.unwrap();
}
_ => error!(
"{:?} - Unexpected websocket message: {:?}",
thread_type, message
),
}
}
#[allow(clippy::significant_drop_tightening)]
async fn handle_parsed_websocket_message(
app_config: Arc<Config>,
thread_type: ThreadType,
guard: Arc<RwLock<Guard>>,
backfill_sender: mpsc::Sender<backfill::Message>,
message: websocket::incoming::Message,
) {
match message {
websocket::incoming::Message::Subscription(message) => {
let symbols = match message {
websocket::incoming::subscription::Message::Market(message) => message.bars,
websocket::incoming::subscription::Message::News(message) => message.news,
};
let mut guard = guard.write().await;
let newly_subscribed = guard
.pending_subscriptions
.extract_if(|symbol, _| symbols.contains(symbol))
.collect::<HashMap<_, _>>();
let newly_unsubscribed = guard
.pending_unsubscriptions
.extract_if(|symbol, _| !symbols.contains(symbol))
.collect::<HashMap<_, _>>();
drop(guard);
let newly_subscribed_future = async {
if !newly_subscribed.is_empty() {
info!(
"{:?} - Subscribed to {:?}.",
thread_type,
newly_subscribed.keys().collect::<Vec<_>>()
);
let (backfill_message, backfill_receiver) = backfill::Message::new(
backfill::Action::Backfill,
Subset::Some(newly_subscribed.into_values().collect::<Vec<_>>()),
);
backfill_sender.send(backfill_message).await.unwrap();
backfill_receiver.await.unwrap();
}
};
let newly_unsubscribed_future = async {
if !newly_unsubscribed.is_empty() {
info!(
"{:?} - Unsubscribed from {:?}.",
thread_type,
newly_unsubscribed.keys().collect::<Vec<_>>()
);
let (purge_message, purge_receiver) = backfill::Message::new(
backfill::Action::Purge,
Subset::Some(newly_unsubscribed.into_values().collect::<Vec<_>>()),
);
backfill_sender.send(purge_message).await.unwrap();
purge_receiver.await.unwrap();
}
};
join!(newly_subscribed_future, newly_unsubscribed_future);
}
websocket::incoming::Message::Bar(message)
| websocket::incoming::Message::UpdatedBar(message) => {
let bar = Bar::from(message);
let guard = guard.read().await;
if !guard.symbols.contains(&bar.symbol) {
warn!(
"{:?} - Race condition: received bar for unsubscribed symbol: {:?}.",
thread_type, bar.symbol
);
return;
}
info!(
"{:?} - Received bar for {}: {}.",
thread_type, bar.symbol, bar.time
);
database::bars::upsert(&app_config.clickhouse_client, &bar).await;
}
websocket::incoming::Message::News(message) => {
let news = News::from(message);
let symbols = news.symbols.clone().into_iter().collect::<HashSet<_>>();
let guard = guard.read().await;
if !guard.symbols.iter().any(|symbol| symbols.contains(symbol)) {
warn!(
"{:?} - Race condition: received news for unsubscribed symbols: {:?}.",
thread_type, news.symbols
);
return;
}
info!(
"{:?} - Received news for {:?}: {}.",
thread_type, news.symbols, news.time_created
);
database::news::upsert(&app_config.clickhouse_client, &news).await;
}
websocket::incoming::Message::Success(_) => {}
websocket::incoming::Message::Error(message) => {
error!(
"{:?} - Received error message: {}.",
thread_type, message.message
);
}
}
}

src/threads/mod.rs (new file, 2 lines)
View File

@@ -0,0 +1,2 @@
pub mod clock;
pub mod data;

View File

@@ -0,0 +1,3 @@
pub mod subset;
pub use subset::Subset;

View File

@@ -0,0 +1,5 @@
#[derive(Clone, Debug)]
pub enum Subset<T> {
Some(Vec<T>),
All,
}
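
Subset<T> makes "all currently tracked symbols" a first-class request instead of a sentinel like an empty Vec. A sketch of consuming it, mirroring the filtering in handle_backfill_message:

use std::collections::HashSet;

// Sketch: resolve a request against the set of subscribed symbols.
fn resolve(assets: Subset<String>, subscribed: &HashSet<String>) -> Vec<String> {
    match assets {
        Subset::All => subscribed.iter().cloned().collect(),
        Subset::Some(list) => list.into_iter().filter(|s| subscribed.contains(s)).collect(),
    }
}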

View File

@@ -1,4 +1,4 @@
use crate::types::alpaca::api::impl_from_enum;
use crate::types::{self, alpaca::api::impl_from_enum};
use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -8,7 +8,7 @@ pub enum Class {
Crypto,
}
impl_from_enum!(crate::types::Class, Class, UsEquity, Crypto);
impl_from_enum!(types::Class, Class, UsEquity, Crypto);
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
@@ -24,7 +24,7 @@ pub enum Exchange {
}
impl_from_enum!(
crate::types::Exchange,
types::Exchange,
Exchange,
Amex,
Arca,
@@ -61,10 +61,11 @@ pub struct Asset {
pub attributes: Option<Vec<String>>,
}
impl From<Asset> for crate::types::Asset {
impl From<Asset> for types::Asset {
fn from(item: Asset) -> Self {
Self {
symbol: item.symbol,
symbol: item.symbol.clone(),
abbreviation: item.symbol.replace('/', ""),
class: item.class.into(),
exchange: item.exchange.into(),
time_added: time::OffsetDateTime::now_utc(),

View File

@@ -1,3 +1,4 @@
use crate::types;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -23,7 +24,7 @@ pub struct Bar {
pub vwap: f64,
}
impl From<(Bar, String)> for crate::types::Bar {
impl From<(Bar, String)> for types::Bar {
fn from((bar, symbol): (Bar, String)) -> Self {
Self {
time: bar.time,
@@ -41,6 +42,6 @@ impl From<(Bar, String)> for crate::types::Bar {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Message {
pub bars: HashMap<String, Option<Vec<Bar>>>,
pub bars: HashMap<String, Vec<Bar>>,
pub next_page_token: Option<String>,
}

View File

@@ -1,3 +1,4 @@
pub mod asset;
pub mod bar;
pub mod clock;
pub mod news;

View File

@@ -0,0 +1,64 @@
use crate::types;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ImageSize {
Thumb,
Small,
Large,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Image {
pub size: ImageSize,
pub url: String,
}
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct News {
pub id: i64,
#[serde(with = "time::serde::rfc3339")]
#[serde(rename = "created_at")]
pub time_created: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
#[serde(rename = "updated_at")]
pub time_updated: OffsetDateTime,
pub symbols: Vec<String>,
pub headline: String,
pub author: String,
#[serde_as(as = "NoneAsEmptyString")]
pub source: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub summary: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub content: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub url: Option<String>,
pub images: Vec<Image>,
}
impl From<News> for types::News {
fn from(news: News) -> Self {
Self {
id: news.id,
time_created: news.time_created,
time_updated: news.time_updated,
symbols: news.symbols,
headline: news.headline,
author: news.author,
source: news.source,
summary: news.summary,
url: news.url,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Message {
pub news: Vec<News>,
pub next_page_token: Option<String>,
}

View File

@@ -1,15 +1,8 @@
use serde::{Serialize, Serializer};
use super::serialize_symbols;
use serde::Serialize;
use std::time::Duration;
use time::OffsetDateTime;
fn serialize_symbols<S>(symbols: &[String], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string = symbols.join(",");
serializer.serialize_str(&string)
}
fn serialize_timeframe<S>(timeframe: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,

View File

@@ -1 +1,12 @@
pub mod bar;
pub mod news;
use serde::Serializer;
fn serialize_symbols<S>(symbols: &[String], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string = symbols.join(",");
serializer.serialize_str(&string)
}
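
serialize_symbols collapses the vector into one comma-separated value, which is the format Alpaca's data API expects for its symbols parameter; reqwest's .query(...) then percent-encodes the commas on the wire. A hypothetical check of the encoded form:

use serde::Serialize;

#[derive(Serialize)]
struct Query {
    #[serde(serialize_with = "serialize_symbols")]
    symbols: Vec<String>,
}

let q = Query { symbols: vec!["AAPL".into(), "TSLA".into()] };
// reqwest uses serde_urlencoded under the hood; the comma encodes as %2C.
assert_eq!(serde_urlencoded::to_string(&q).unwrap(), "symbols=AAPL%2CTSLA");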

View File

@@ -0,0 +1,40 @@
use super::serialize_symbols;
use serde::Serialize;
use time::OffsetDateTime;
#[derive(Serialize)]
pub struct News {
#[serde(serialize_with = "serialize_symbols")]
pub symbols: Vec<String>,
#[serde(with = "time::serde::rfc3339")]
pub start: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
pub end: OffsetDateTime,
pub limit: i64,
pub include_content: bool,
pub exclude_contentless: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub page_token: Option<String>,
}
impl News {
pub const fn new(
symbols: Vec<String>,
start: OffsetDateTime,
end: OffsetDateTime,
limit: i64,
include_content: bool,
exclude_contentless: bool,
page_token: Option<String>,
) -> Self {
Self {
symbols,
start,
end,
limit,
include_content,
exclude_contentless,
page_token,
}
}
}

View File

@@ -1,2 +0,0 @@
pub mod incoming;
pub mod outgoing;

View File

@@ -1,17 +0,0 @@
use serde::Serialize;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Message {
bars: Vec<String>,
updated_bars: Vec<String>,
}
impl Message {
pub fn new(symbols: Vec<String>) -> Self {
Self {
bars: symbols.clone(),
updated_bars: symbols,
}
}
}

View File

@@ -1,3 +1,4 @@
use crate::types;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
@@ -24,7 +25,7 @@ pub struct Message {
pub vwap: f64,
}
impl From<Message> for crate::types::Bar {
impl From<Message> for types::Bar {
fn from(bar: Message) -> Self {
Self {
time: bar.time,

View File

@@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Message {
pub code: u16,
#[serde(rename = "msg")]
pub message: String,
}

View File

@@ -1,4 +1,6 @@
pub mod bar;
pub mod error;
pub mod news;
pub mod subscription;
pub mod success;
@@ -12,7 +14,11 @@ pub enum Message {
#[serde(rename = "subscription")]
Subscription(subscription::Message),
#[serde(rename = "b")]
Bars(bar::Message),
Bar(bar::Message),
#[serde(rename = "u")]
UpdatedBars(bar::Message),
UpdatedBar(bar::Message),
#[serde(rename = "n")]
News(news::Message),
#[serde(rename = "error")]
Error(error::Message),
}

View File

@@ -0,0 +1,43 @@
use crate::types;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use time::OffsetDateTime;
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Message {
pub id: i64,
#[serde(with = "time::serde::rfc3339")]
#[serde(rename = "created_at")]
pub time_created: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
#[serde(rename = "updated_at")]
pub time_updated: OffsetDateTime,
pub symbols: Vec<String>,
pub headline: String,
pub author: String,
#[serde_as(as = "NoneAsEmptyString")]
pub source: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub summary: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub content: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub url: Option<String>,
}
impl From<Message> for types::News {
fn from(news: Message) -> Self {
Self {
id: news.id,
time_created: news.time_created,
time_updated: news.time_updated,
symbols: news.symbols,
headline: news.headline,
author: news.author,
source: news.source,
summary: news.summary,
url: news.url,
}
}
}

View File

@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Message {
pub struct MarketMessage {
pub trades: Vec<String>,
pub quotes: Vec<String>,
pub bars: Vec<String>,
@@ -13,3 +13,16 @@ pub struct Message {
pub lulds: Option<Vec<String>>,
pub cancel_errors: Option<Vec<String>>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NewsMessage {
pub news: Vec<String>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Message {
Market(MarketMessage),
News(NewsMessage),
}

View File

@@ -1 +1,2 @@
pub mod data;
pub mod incoming;
pub mod outgoing;

View File

@@ -0,0 +1,36 @@
use serde::Serialize;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct MarketMessage {
bars: Vec<String>,
updated_bars: Vec<String>,
}
impl MarketMessage {
pub fn new(symbols: Vec<String>) -> Self {
Self {
bars: symbols.clone(),
updated_bars: symbols,
}
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct NewsMessage {
news: Vec<String>,
}
impl NewsMessage {
pub fn new(symbols: Vec<String>) -> Self {
Self { news: symbols }
}
}
#[derive(Serialize)]
#[serde(untagged)]
pub enum Message {
Market(MarketMessage),
News(NewsMessage),
}
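
Since the enum is #[serde(untagged)], each variant serializes as its bare payload, so one subscribe type covers both stream dialects. Under these definitions the JSON should come out as:

let market = Message::Market(MarketMessage::new(vec!["AAPL".into()]));
assert_eq!(
    serde_json::to_string(&market).unwrap(),
    r#"{"bars":["AAPL"],"updatedBars":["AAPL"]}"#
);

let news = Message::News(NewsMessage::new(vec!["AAPL".into()]));
assert_eq!(serde_json::to_string(&news).unwrap(), r#"{"news":["AAPL"]}"#);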

View File

@@ -1,4 +1,3 @@
use crate::config::{ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL};
use clickhouse::Row;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
@@ -11,15 +10,6 @@ pub enum Class {
Crypto = 2,
}
impl Class {
pub const fn get_data_url(self) -> &'static str {
match self {
Self::UsEquity => ALPACA_STOCK_DATA_URL,
Self::Crypto => ALPACA_CRYPTO_DATA_URL,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
pub enum Exchange {
@@ -36,6 +26,7 @@ pub enum Exchange {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)]
pub struct Asset {
pub symbol: String,
pub abbreviation: String,
pub class: Class,
pub exchange: Exchange,
#[serde(with = "clickhouse::serde::time::datetime")]

View File

@@ -1,4 +1,4 @@
use super::Bar;
use super::{Bar, News};
use clickhouse::Row;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
@@ -21,3 +21,9 @@ impl From<Bar> for Backfill {
Self::new(bar.symbol, bar.time)
}
}
impl From<(News, String)> for Backfill {
fn from((news, symbol): (News, String)) -> Self {
Self::new(symbol, news.time_created)
}
}

View File

@@ -1,10 +1,12 @@
pub mod algebraic;
pub mod alpaca;
pub mod asset;
pub mod backfill;
pub mod bar;
pub mod state;
pub mod news;
pub use algebraic::Subset;
pub use asset::{Asset, Class, Exchange};
pub use backfill::Backfill;
pub use bar::Bar;
pub use state::BroadcastMessage;
pub use news::News;

src/types/news.rs (new file, 23 lines)
View File

@@ -0,0 +1,23 @@
use clickhouse::Row;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use time::OffsetDateTime;
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)]
pub struct News {
pub id: i64,
#[serde(with = "clickhouse::serde::time::datetime")]
pub time_created: OffsetDateTime,
#[serde(with = "clickhouse::serde::time::datetime")]
pub time_updated: OffsetDateTime,
pub symbols: Vec<String>,
pub headline: String,
pub author: String,
#[serde_as(as = "NoneAsEmptyString")]
pub source: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub summary: Option<String>,
#[serde_as(as = "NoneAsEmptyString")]
pub url: Option<String>,
}

View File

@@ -1,7 +0,0 @@
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BroadcastMessage {
Add,
Backfill,
Delete,
Purge,
}

View File

@@ -1,5 +0,0 @@
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BroadcastMessage {
Open,
Close,
}

View File

@@ -1,10 +0,0 @@
use crate::types::Asset;
pub mod asset;
pub mod clock;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BroadcastMessage {
Asset((asset::BroadcastMessage, Vec<Asset>)),
Clock(clock::BroadcastMessage),
}

View File

@@ -1,13 +1,11 @@
use crate::database;
use clickhouse::Client;
use tokio::join;
pub async fn cleanup(clickhouse_client: &Client) {
let assets = database::assets::select(clickhouse_client).await;
let symbols = assets
.iter()
.map(|asset| asset.symbol.clone())
.collect::<Vec<_>>();
let bars_future = database::bars::cleanup(clickhouse_client);
let news_future = database::news::cleanup(clickhouse_client);
let backfills_future = database::backfills::cleanup(clickhouse_client);
database::bars::delete_where_not_symbols(clickhouse_client, &symbols).await;
database::backfills::delete_where_not_symbols(clickhouse_client, &symbols).await;
join!(bars_future, news_future, backfills_future);
}

View File

@@ -1,5 +1,7 @@
pub mod cleanup;
pub mod time;
pub mod websocket;
pub use cleanup::cleanup;
pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE};
pub use websocket::authenticate;

src/utils/websocket.rs (new file, 51 lines)
View File

@@ -0,0 +1,51 @@
use crate::{config::Config, types::alpaca::websocket};
use core::panic;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use serde_json::{from_str, to_string};
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
pub async fn authenticate(
app_config: &Arc<Config>,
sender: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
receiver: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) {
match receiver.next().await.unwrap().unwrap() {
Message::Text(data)
if from_str::<Vec<websocket::incoming::Message>>(&data)
.unwrap()
.first()
== Some(&websocket::incoming::Message::Success(
websocket::incoming::success::Message::Connected,
)) => {}
_ => panic!("Failed to connect to Alpaca websocket."),
}
sender
.send(Message::Text(
to_string(&websocket::outgoing::Message::Auth(
websocket::outgoing::auth::Message::new(
app_config.alpaca_api_key.clone(),
app_config.alpaca_api_secret.clone(),
),
))
.unwrap(),
))
.await
.unwrap();
match receiver.next().await.unwrap().unwrap() {
Message::Text(data)
if from_str::<Vec<websocket::incoming::Message>>(&data)
.unwrap()
.first()
== Some(&websocket::incoming::Message::Success(
websocket::incoming::success::Message::Authenticated,
)) => {}
_ => panic!("Failed to authenticate with Alpaca websocket."),
};
}