Add asynchronous handling

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-01-16 21:02:10 +00:00
parent 3ee72a0e1b
commit 36ee6030ce
6 changed files with 150 additions and 96 deletions

View File

@@ -6,10 +6,7 @@ use crate::{
data::authenticate_websocket, data::authenticate_websocket,
database, database,
types::{ types::{
alpaca::{ alpaca::{api, websocket, Source},
api::{incoming, outgoing},
websocket, Source,
},
asset::{self, Asset}, asset::{self, Asset},
Bar, BarValidity, BroadcastMessage, Class, Bar, BarValidity, BroadcastMessage, Class,
}, },
@@ -63,16 +60,13 @@ pub async fn run(
broadcast_sender.subscribe(), broadcast_sender.subscribe(),
)); ));
database::assets::select_where_class(&app_config.clickhouse_client, class) let assets = database::assets::select_where_class(&app_config.clickhouse_client, class).await;
.await
.into_iter()
.for_each(|asset| {
broadcast_sender broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Added( .send(BroadcastMessage::Asset((
asset, asset::BroadcastMessage::Added,
assets,
))) )))
.unwrap(); .unwrap();
});
websocket_handler(app_config, class, stream, sink).await; websocket_handler(app_config, class, stream, sink).await;
@@ -86,39 +80,41 @@ async fn broadcast_handler(
) { ) {
loop { loop {
match broadcast_receiver.recv().await.unwrap() { match broadcast_receiver.recv().await.unwrap() {
BroadcastMessage::Asset(asset::BroadcastMessage::Added(asset)) BroadcastMessage::Asset((action, assets)) => {
if asset.class == class => let symbols = assets
{ .into_iter()
.filter(|asset| asset.class == class)
.map(|asset| asset.symbol)
.collect::<Vec<_>>();
if symbols.is_empty() {
continue;
}
sink.write() sink.write()
.await .await
.send(Message::Text( .send(Message::Text(
serde_json::to_string(&websocket::data::outgoing::Message::Subscribe( serde_json::to_string(&match action {
asset::BroadcastMessage::Added => {
websocket::data::outgoing::Message::Subscribe(
websocket::data::outgoing::subscribe::Message::new( websocket::data::outgoing::subscribe::Message::new(
asset.clone().symbol, symbols.clone(),
), ),
)) )
}
asset::BroadcastMessage::Deleted => {
websocket::data::outgoing::Message::Unsubscribe(
websocket::data::outgoing::subscribe::Message::new(
symbols.clone(),
),
)
}
})
.unwrap(), .unwrap(),
)) ))
.await .await
.unwrap(); .unwrap();
} }
BroadcastMessage::Asset(asset::BroadcastMessage::Deleted(asset))
if asset.class == class =>
{
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&websocket::data::outgoing::Message::Unsubscribe(
websocket::data::outgoing::subscribe::Message::new(
asset.clone().symbol,
),
))
.unwrap(),
))
.await
.unwrap();
}
BroadcastMessage::Asset(_) => {}
} }
} }
} }
@@ -143,7 +139,12 @@ async fn websocket_handler(
} }
for message in parsed_data.unwrap_or_default() { for message in parsed_data.unwrap_or_default() {
websocket_handle_message(&app_config, class, &backfilled, message).await; spawn(websocket_handle_message(
app_config.clone(),
class,
backfilled.clone(),
message,
));
} }
} }
Some(Ok(Message::Ping(_))) => sink Some(Ok(Message::Ping(_))) => sink
@@ -159,68 +160,75 @@ async fn websocket_handler(
} }
async fn websocket_handle_message( async fn websocket_handle_message(
app_config: &Arc<Config>, app_config: Arc<Config>,
class: Class, class: Class,
backfilled: &Arc<RwLock<HashMap<String, bool>>>, backfilled: Arc<RwLock<HashMap<String, bool>>>,
message: websocket::data::incoming::Message, message: websocket::data::incoming::Message,
) { ) {
match message { match message {
websocket::data::incoming::Message::Subscription(subscription_message) => { websocket::data::incoming::Message::Subscription(message) => {
let old_assets = backfilled let added_asset_symbols;
.read() let deleted_asset_symbols;
.await
.keys() {
let mut backfilled = backfilled.write().await;
let old_asset_sybols = backfilled.keys().cloned().collect::<HashSet<_>>();
let new_asset_symbols = message.bars.into_iter().collect::<HashSet<_>>();
added_asset_symbols = new_asset_symbols
.difference(&old_asset_sybols)
.cloned() .cloned()
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
let new_assets = subscription_message
.bars for asset_symbol in &added_asset_symbols {
.into_iter() backfilled.insert(asset_symbol.clone(), false);
}
deleted_asset_symbols = old_asset_sybols
.difference(&new_asset_symbols)
.cloned()
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
let added_assets = new_assets.difference(&old_assets).collect::<HashSet<_>>(); for asset_symbol in &deleted_asset_symbols {
let deleted_assets = old_assets.difference(&new_assets).collect::<HashSet<_>>(); backfilled.remove(asset_symbol);
}
for asset_symbol in &added_assets { drop(backfilled);
info!(
"Subscription update for {:?}: {:?} added, {:?} deleted.",
class, added_asset_symbols, deleted_asset_symbols
);
}
for asset_symbol in added_asset_symbols {
let asset = database::assets::select_where_symbol( let asset = database::assets::select_where_symbol(
&app_config.clickhouse_client, &app_config.clickhouse_client,
asset_symbol, &asset_symbol,
) )
.await .await
.unwrap(); .unwrap();
backfilled.write().await.insert(asset.symbol.clone(), false);
let bar_validity = BarValidity::none(asset.symbol.clone());
database::bars::insert_validity_if_not_exists( database::bars::insert_validity_if_not_exists(
&app_config.clickhouse_client, &app_config.clickhouse_client,
&bar_validity, &BarValidity::none(asset.symbol.clone()),
) )
.await; .await;
spawn(backfill( spawn(backfill(app_config.clone(), backfilled.clone(), asset));
app_config.clone(),
backfilled.clone(),
asset.clone(),
));
} }
for asset_symbol in &deleted_assets { for asset_symbol in deleted_asset_symbols {
database::bars::delete_validity_where_symbol( database::bars::delete_validity_where_symbol(
&app_config.clickhouse_client, &app_config.clickhouse_client,
asset_symbol, &asset_symbol,
) )
.await; .await;
database::bars::delete_where_symbol(&app_config.clickhouse_client, asset_symbol) database::bars::delete_where_symbol(&app_config.clickhouse_client, &asset_symbol)
.await; .await;
backfilled.write().await.remove(*asset_symbol);
} }
info!(
"Subscription update for {:?}: {:?} added, {:?} deleted.",
class, added_assets, deleted_assets
);
} }
websocket::data::incoming::Message::Bars(bar_message) websocket::data::incoming::Message::Bars(bar_message)
| websocket::data::incoming::Message::UpdatedBars(bar_message) => { | websocket::data::incoming::Message::UpdatedBars(bar_message) => {
@@ -228,7 +236,7 @@ async fn websocket_handle_message(
info!("websocket::Incoming bar for {}: {}", bar.symbol, bar.time); info!("websocket::Incoming bar for {}: {}", bar.symbol, bar.time);
database::bars::upsert(&app_config.clickhouse_client, &bar).await; database::bars::upsert(&app_config.clickhouse_client, &bar).await;
if backfilled.read().await[&bar.symbol] { if *backfilled.read().await.get(&bar.symbol).unwrap() {
database::bars::upsert_validity(&app_config.clickhouse_client, &bar.into()).await; database::bars::upsert_validity(&app_config.clickhouse_client, &bar.into()).await;
} }
} }
@@ -255,7 +263,7 @@ pub async fn backfill(
.send() .send()
.await .await
.unwrap() .unwrap()
.json::<incoming::clock::Clock>() .json::<api::incoming::clock::Clock>()
.await .await
.unwrap(); .unwrap();
@@ -294,9 +302,9 @@ pub async fn backfill(
Class::UsEquity => ALPACA_STOCK_DATA_URL, Class::UsEquity => ALPACA_STOCK_DATA_URL,
Class::Crypto => ALPACA_CRYPTO_DATA_URL, Class::Crypto => ALPACA_CRYPTO_DATA_URL,
}) })
.query(&outgoing::bar::Bar::new( .query(&api::outgoing::bar::Bar::new(
vec![asset.symbol.clone()], vec![asset.symbol.clone()],
String::from("1Min"), ONE_MINUTE,
fetch_from, fetch_from,
fetch_until, fetch_until,
10000, 10000,
@@ -305,7 +313,7 @@ pub async fn backfill(
.send() .send()
.await .await
.unwrap() .unwrap()
.json::<incoming::bar::Message>() .json::<api::incoming::bar::Message>()
.await .await
.unwrap(); .unwrap();

View File

@@ -69,8 +69,9 @@ pub async fn add(
database::assets::upsert(&app_config.clickhouse_client, &asset).await; database::assets::upsert(&app_config.clickhouse_client, &asset).await;
broadcast_sender broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Added( .send(BroadcastMessage::Asset((
asset.clone(), asset::BroadcastMessage::Added,
vec![asset.clone()],
))) )))
.unwrap(); .unwrap();
@@ -89,8 +90,9 @@ pub async fn delete(
.unwrap(); .unwrap();
broadcast_sender broadcast_sender
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Deleted( .send(BroadcastMessage::Asset((
asset, asset::BroadcastMessage::Deleted,
vec![asset.clone()],
))) )))
.unwrap(); .unwrap();

View File

@@ -1,10 +1,54 @@
use serde::{Deserialize, Serialize}; use std::time::Duration;
use serde::{Serialize, Serializer};
use time::OffsetDateTime; use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] fn serialize_symbols<S>(symbols: &[String], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let string = symbols.join(",");
serializer.serialize_str(&string)
}
fn serialize_timeframe<S>(timeframe: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mins = timeframe.as_secs() / 60;
if mins < 60 {
return serializer.serialize_str(&format!("{mins}Min"));
}
let hours = mins / 60;
if hours < 24 {
return serializer.serialize_str(&format!("{hours}Hour"));
}
let days = hours / 24;
if days == 1 {
return serializer.serialize_str("1Day");
}
let weeks = days / 7;
if weeks == 1 {
return serializer.serialize_str("1Week");
}
let months = days / 30;
if [1, 2, 3, 4, 6, 12].contains(&months) {
return serializer.serialize_str(&format!("{months}Month"));
};
Err(serde::ser::Error::custom("Invalid timeframe duration"))
}
#[derive(Serialize)]
pub struct Bar { pub struct Bar {
#[serde(serialize_with = "serialize_symbols")]
pub symbols: Vec<String>, pub symbols: Vec<String>,
pub timeframe: String, #[serde(serialize_with = "serialize_timeframe")]
pub timeframe: Duration,
#[serde(with = "time::serde::rfc3339")] #[serde(with = "time::serde::rfc3339")]
pub start: OffsetDateTime, pub start: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")] #[serde(with = "time::serde::rfc3339")]
@@ -15,9 +59,9 @@ pub struct Bar {
} }
impl Bar { impl Bar {
pub fn new( pub const fn new(
symbols: Vec<String>, symbols: Vec<String>,
timeframe: String, timeframe: Duration,
start: OffsetDateTime, start: OffsetDateTime,
end: OffsetDateTime, end: OffsetDateTime,
limit: i64, limit: i64,

View File

@@ -8,10 +8,10 @@ pub struct Message {
} }
impl Message { impl Message {
pub fn new(symbol: String) -> Self { pub fn new(symbols: Vec<String>) -> Self {
Self { Self {
bars: vec![symbol.clone()], bars: symbols.clone(),
updated_bars: vec![symbol], updated_bars: symbols,
} }
} }
} }

View File

@@ -34,6 +34,6 @@ pub struct Asset {
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum BroadcastMessage { pub enum BroadcastMessage {
Added(Asset), Added,
Deleted(Asset), Deleted,
} }

View File

@@ -7,5 +7,5 @@ pub use bar::{Bar, BarValidity};
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum BroadcastMessage { pub enum BroadcastMessage {
Asset(asset::BroadcastMessage), Asset((asset::BroadcastMessage, Vec<Asset>)),
} }