Merge live & historical handlers

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-09-10 16:56:27 +03:00
parent 8a88d58192
commit 687fbb909f
20 changed files with 231 additions and 217 deletions

476
src/data/market.rs Normal file
View File

@@ -0,0 +1,476 @@
use crate::{
config::{
Config, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_DATA_URL,
ALPACA_STOCK_WEBSOCKET_URL, ALPACA_TIMESTAMP_FORMAT,
},
database,
time::{duration_until, last_minute, next_30s, next_minute, ONE_MINUTE, THIRTY_SECONDS},
types::{
api,
asset::{self, Asset},
websocket, Bar, BroadcastMessage, Class,
},
};
use core::panic;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use http::StatusCode;
use indexmap::IndexMap;
use log::{error, info, warn};
use serde_json::{from_str, to_string};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use time::OffsetDateTime;
use tokio::{
net::TcpStream,
spawn,
sync::{
broadcast::{Receiver, Sender},
RwLock,
},
task::spawn_blocking,
time::{interval_at, sleep},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
/// Spawns the live market-data pipeline for one asset `class`.
///
/// Connects and authenticates the Alpaca websocket, starts the
/// subscription broadcast handler and the null-bar filler, re-announces
/// every known asset of this class so it gets subscribed, then loops
/// forever in the message handler.
///
/// # Panics
/// Panics on connection/authentication failure or if any channel or
/// database call fails; this task is expected to run for the process
/// lifetime.
pub async fn run(
    app_config: Arc<Config>,
    class: Class,
    asset_broadcast_sender: Sender<BroadcastMessage>,
) {
    info!("Running live data threads for {:?}.", class);
    // Stocks and crypto use different endpoints; the stock URL also
    // encodes the configured data source.
    let websocket_url = match class {
        Class::UsEquity => format!(
            "{}/{}",
            ALPACA_STOCK_WEBSOCKET_URL, app_config.alpaca_source
        ),
        Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL.to_string(),
    };
    let (stream, _) = connect_async(websocket_url).await.unwrap();
    let (mut sink, mut stream) = stream.split();
    authenticate_websocket(&app_config, &mut stream, &mut sink).await;
    // The sink is shared between the broadcast handler (subscribes /
    // unsubscribes) and the message handler (pong replies).
    let sink = Arc::new(RwLock::new(sink));
    // symbol -> whether its historical backfill has completed.
    let backfilled = Arc::new(RwLock::new(HashMap::new()));
    spawn(websocket_broadcast_handler(
        class,
        sink.clone(),
        asset_broadcast_sender.subscribe(),
    ));
    // Replay existing assets as "Added" so the broadcast handler
    // subscribes to them on startup.
    database::assets::select_where_class(&app_config.postgres_pool, class)
        .await
        .into_iter()
        .for_each(|asset| {
            asset_broadcast_sender
                .send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
                    asset,
                )))
                .unwrap();
        });
    spawn(null_handler(app_config.clone(), backfilled.clone()));
    websocket_message_handler(app_config, class, stream, sink, backfilled).await;
    unreachable!()
}
/// Performs the Alpaca websocket handshake: waits for the "connected"
/// greeting, sends the API credentials, then waits for the
/// "authenticated" acknowledgement.
///
/// # Panics
/// Panics if the stream closes, a frame fails to parse, or either
/// expected success message is not the first message received.
async fn authenticate_websocket(
    app_config: &Arc<Config>,
    stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    // Expect the server's initial "connected" success frame.
    match stream.next().await {
        Some(Ok(Message::Text(data)))
            if from_str::<Vec<websocket::incoming::Message>>(&data)
                .unwrap()
                .first()
                == Some(&websocket::incoming::Message::Success(
                    websocket::incoming::success::Message {
                        msg: websocket::incoming::success::MessageType::Connected,
                    },
                )) => {}
        _ => panic!("Websocket handshake failed: expected \"connected\" message."),
    }
    // Send credentials.
    sink.send(Message::Text(
        to_string(&websocket::outgoing::Message::Auth(
            websocket::outgoing::auth::Message::new(
                app_config.alpaca_api_key.clone(),
                app_config.alpaca_api_secret.clone(),
            ),
        ))
        .unwrap(),
    ))
    .await
    .unwrap();
    // Expect the "authenticated" acknowledgement.
    match stream.next().await {
        Some(Ok(Message::Text(data)))
            if from_str::<Vec<websocket::incoming::Message>>(&data)
                .unwrap()
                .first()
                == Some(&websocket::incoming::Message::Success(
                    websocket::incoming::success::Message {
                        msg: websocket::incoming::success::MessageType::Authenticated,
                    },
                )) => {}
        _ => panic!("Websocket handshake failed: expected \"authenticated\" message."),
    };
}
async fn websocket_broadcast_handler(
class: Class,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
mut asset_broadcast_receiver: Receiver<BroadcastMessage>,
) {
loop {
match asset_broadcast_receiver.recv().await.unwrap() {
BroadcastMessage::Asset(asset::BroadcastMessage::Added(asset))
if asset.class == class =>
{
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&websocket::outgoing::Message::Subscribe(
websocket::outgoing::subscribe::Message::new(asset.clone().symbol),
))
.unwrap(),
))
.await
.unwrap();
}
BroadcastMessage::Asset(asset::BroadcastMessage::Deleted(asset))
if asset.class == class =>
{
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&websocket::outgoing::Message::Unsubscribe(
websocket::outgoing::subscribe::Message::new(asset.clone().symbol),
))
.unwrap(),
))
.await
.unwrap();
}
BroadcastMessage::Asset(_) => {}
}
}
}
/// Pumps the websocket stream forever: parses text frames into incoming
/// messages and dispatches each one, answers pings with pongs, logs
/// anything unexpected, and panics when the stream ends.
async fn websocket_message_handler(
    app_config: Arc<Config>,
    class: Class,
    mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
    backfilled: Arc<RwLock<HashMap<String, bool>>>,
) {
    loop {
        match stream.next().await {
            Some(Ok(Message::Text(data))) => {
                // Unparsable frames are logged and skipped, never fatal.
                match from_str::<Vec<websocket::incoming::Message>>(&data) {
                    Ok(messages) => {
                        for message in messages {
                            websocket_handle_text_message(&app_config, class, message, &backfilled)
                                .await;
                        }
                    }
                    Err(e) => warn!("Unparsed websocket::incoming message: {:?}: {}", data, e),
                }
            }
            Some(Ok(Message::Ping(_))) => sink
                .write()
                .await
                .send(Message::Pong(vec![]))
                .await
                .unwrap(),
            Some(unknown) => error!("Unknown websocket::incoming message: {:?}", unknown),
            None => panic!(),
        }
    }
}
/// Handles a single parsed incoming websocket message.
///
/// - `Subscription`: diffs the confirmed bar-subscription symbols
///   against the tracked set; spawns a historical backfill for each new
///   symbol and stops tracking removed ones.
/// - `Bars` / `UpdatedBars`: upserts the bar, flagged with the asset's
///   current backfill state; `UpdatedBars` of an already-backfilled
///   asset also advances the asset's last timestamp.
/// - `Success`: ignored (handshake acks are consumed during auth).
async fn websocket_handle_text_message(
    app_config: &Arc<Config>,
    class: Class,
    message: websocket::incoming::Message,
    backfilled: &Arc<RwLock<HashMap<String, bool>>>,
) {
    match message {
        websocket::incoming::Message::Subscription(subscription_message) => {
            // Snapshot of symbols currently tracked.
            let old_assets = backfilled
                .read()
                .await
                .keys()
                .cloned()
                .collect::<HashSet<_>>();
            // Symbols the server just confirmed bar subscriptions for.
            let new_assets = subscription_message
                .bars
                .into_iter()
                .collect::<HashSet<_>>();
            let added_assets = new_assets.difference(&old_assets).collect::<HashSet<_>>();
            let deleted_assets = old_assets.difference(&new_assets).collect::<HashSet<_>>();
            for asset_symbol in &added_assets {
                let asset =
                    database::assets::select_where_symbol(&app_config.postgres_pool, asset_symbol)
                        .await
                        .unwrap();
                // Mark as not-yet-backfilled before the backfill task
                // starts so live bars are stored unflagged meanwhile.
                backfilled.write().await.insert(asset.symbol.clone(), false);
                spawn(backfill(
                    app_config.clone(),
                    asset.clone(),
                    backfilled.clone(),
                ));
            }
            for asset_symbol in &deleted_assets {
                backfilled.write().await.remove(*asset_symbol);
            }
            info!(
                "Subscription update for {:?}: {:?} added, {:?} deleted.",
                class, added_assets, deleted_assets
            );
        }
        websocket::incoming::Message::Bars(bar_message) => {
            let bar = Bar::from(bar_message);
            info!(
                "websocket::Incoming bar for {}: {}",
                bar.asset_symbol, bar.timestamp
            );
            // NOTE(review): indexing the map panics if a bar arrives for
            // a symbol that is not tracked yet (e.g. before its
            // subscription confirmation was processed) — confirm the
            // server's message ordering guarantees this cannot happen.
            database::bars::upsert(
                &app_config.postgres_pool,
                &bar,
                backfilled.read().await[&bar.asset_symbol],
            )
            .await;
        }
        websocket::incoming::Message::UpdatedBars(bar_message) => {
            let bar = Bar::from(bar_message);
            info!(
                "websocket::Incoming bar for {}: {}",
                bar.asset_symbol, bar.timestamp
            );
            // NOTE(review): a transaction is begun here, but the
            // statements below run on the pool directly, so they are NOT
            // inside it — they likely should execute on `transaction`.
            let transaction = app_config.postgres_pool.begin().await.unwrap();
            // Same potential panic as in the Bars branch — see above.
            let backfilled_asset_symbol = backfilled.read().await[&bar.asset_symbol];
            database::bars::upsert(&app_config.postgres_pool, &bar, backfilled_asset_symbol).await;
            if backfilled_asset_symbol {
                database::assets::update_timestamp_last_where_symbol(
                    &app_config.postgres_pool,
                    &bar.asset_symbol,
                    &bar.timestamp,
                )
                .await;
            }
            transaction.commit().await.unwrap();
        }
        websocket::incoming::Message::Success(_) => {}
    }
}
/// Periodically inserts empty ("null") bars so that minutes with no
/// trade activity still get a row for every tracked asset.
///
/// Ticks every 30 seconds, alternating between two phases mirroring the
/// two live message kinds: in the `Bars` phase the last timestamp of
/// each backfilled asset is also advanced; the `UpdatedBars` phase only
/// inserts the placeholder rows.
#[allow(clippy::significant_drop_in_scrutinee)]
async fn null_handler(app_config: Arc<Config>, backfilled: Arc<RwLock<HashMap<String, bool>>>) {
    // Which of the two 30-second phases the current tick belongs to.
    #[derive(PartialEq)]
    enum NullHandlerState {
        Bars,
        UpdatedBars,
    }
    let next_30s = next_30s();
    // NOTE(review): if `next_30s()` returns the next 30-second boundary,
    // `unix_timestamp() % 30 == 0` is always true and the seed phase is
    // always `Bars` — should this be `% 60 == 0` to detect the minute
    // boundary? TODO confirm.
    let mut state = if next_30s.unix_timestamp() % 30 == 0 {
        NullHandlerState::Bars
    } else {
        NullHandlerState::UpdatedBars
    };
    let mut interval = interval_at(
        (Instant::now() + duration_until(next_30s)).into(),
        THIRTY_SECONDS,
    );
    loop {
        interval.tick().await;
        // Target the minute before the last one — presumably to stay
        // clear of bars that may still arrive live; TODO confirm.
        let timestamp = last_minute() - ONE_MINUTE;
        // The read guard is held across the whole pass (including the
        // awaits below); tokio's RwLock permits this, but writers block
        // until the pass finishes.
        let backfilled = backfilled.read().await;
        for asset_symbol in backfilled.keys().cloned() {
            let bar = Bar::empty(timestamp, asset_symbol);
            // NOTE(review): this transaction is begun but the statements
            // below run on the pool directly, so they are not actually
            // part of it — consider executing on `transaction`.
            let transaction = app_config.postgres_pool.begin().await.unwrap();
            database::bars::insert_or_skip(
                &app_config.postgres_pool,
                &bar,
                backfilled[&bar.asset_symbol],
            )
            .await;
            if backfilled[&bar.asset_symbol] && state == NullHandlerState::Bars {
                database::assets::update_timestamp_last_where_symbol(
                    &app_config.postgres_pool,
                    &bar.asset_symbol,
                    &bar.timestamp,
                )
                .await;
            }
            transaction.commit().await.unwrap();
        }
        // Flip the phase for the next tick.
        state = match state {
            NullHandlerState::Bars => NullHandlerState::UpdatedBars,
            NullHandlerState::UpdatedBars => NullHandlerState::Bars,
        };
    }
}
/// Fetches the historical bars missing between the asset's last stored
/// timestamp and the most recent completed minute, writes them, then
/// flags the asset as backfilled (via `derive_recent_nulls`).
pub async fn backfill(
    app_config: Arc<Config>,
    asset: Asset,
    backfilled: Arc<RwLock<HashMap<String, bool>>>,
) {
    info!("Backfilling historical data for {}...", asset.symbol);
    // Delay the fetch past the next minute boundary by the configured
    // offset — presumably to let the historical API catch up with live
    // data; TODO confirm.
    let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset;
    let fetch_from = asset.timestamp_last + ONE_MINUTE;
    // Simplifies to next_minute() - ONE_MINUTE: the last fully
    // completed minute.
    let fetch_until = task_run_offsetdatetime - app_config.alpaca_historical_offset - ONE_MINUTE;
    // NOTE(review): this early return never marks the asset as
    // backfilled, so an already-up-to-date asset keeps `false` in the
    // map and its live bars stay flagged unbackfilled — intended?
    if fetch_from > fetch_until {
        return;
    }
    let wait_duration = task_run_offsetdatetime - OffsetDateTime::now_utc();
    if wait_duration.is_positive() {
        sleep(wait_duration.unsigned_abs()).await;
    }
    let bars = backfill_bars_from_api(&app_config, &asset, fetch_from, fetch_until).await;
    // NOTE(review): the transaction is begun but every statement below
    // runs on the pool directly, so none of this is atomic — consider
    // executing on `transaction` instead.
    let transaction = app_config.postgres_pool.begin().await.unwrap();
    database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await;
    database::assets::update_timestamp_last_where_symbol(
        &app_config.postgres_pool,
        &asset.symbol,
        &fetch_until,
    )
    .await;
    derive_recent_nulls(&app_config, &asset, &fetch_until, &backfilled).await;
    transaction.commit().await.unwrap();
    info!("Backfilled historical data for {}.", asset.symbol);
}
/// Builds an ordered map of empty one-minute bars for `asset`, one per
/// minute from `from` through `until` inclusive. Returns an empty map
/// when `from > until`.
fn generate_per_minute_bars(
    from: OffsetDateTime,
    until: OffsetDateTime,
    asset: &Asset,
) -> IndexMap<OffsetDateTime, Bar> {
    // Walk the minute grid lazily; the seed is None when the range is
    // empty so nothing is yielded at all.
    std::iter::successors((from <= until).then_some(from), |&timestamp| {
        let next = timestamp + ONE_MINUTE;
        (next <= until).then_some(next)
    })
    .map(|timestamp| (timestamp, Bar::empty(timestamp, asset.symbol.clone())))
    .collect()
}
/// Downloads one-minute bars for `asset` in `[from, until]` from the
/// Alpaca data API, paginating via `next_page_token`, and overlays them
/// onto a pre-generated grid of empty bars so every minute in the range
/// yields exactly one bar (empty when the API returned nothing for it).
async fn backfill_bars_from_api(
    app_config: &Arc<Config>,
    asset: &Asset,
    from: OffsetDateTime,
    until: OffsetDateTime,
) -> Vec<Bar> {
    let asset_clone = asset.clone();
    // Grid generation is pure CPU work — presumably moved off the async
    // runtime because long ranges can take a while; TODO confirm.
    let mut bars = spawn_blocking(move || generate_per_minute_bars(from, until, &asset_clone))
        .await
        .unwrap();
    let mut next_page_token = None;
    loop {
        let request = app_config
            .alpaca_client
            .get(match asset.class {
                Class::UsEquity => ALPACA_STOCK_DATA_URL,
                Class::Crypto => ALPACA_CRYPTO_DATA_URL,
            })
            .query(&[
                ("symbols", &asset.symbol),
                ("timeframe", &String::from("1Min")),
                (
                    "start",
                    &from.format(ALPACA_TIMESTAMP_FORMAT).unwrap().to_string(),
                ),
                (
                    "end",
                    &until.format(ALPACA_TIMESTAMP_FORMAT).unwrap().to_string(),
                ),
                ("limit", &String::from("10000")),
                // Empty string on the first request.
                ("page_token", &next_page_token.clone().unwrap_or_default()),
            ]);
        // Throttle before every page to respect the API rate limit.
        app_config.alpaca_rate_limit.until_ready().await;
        let response = request.send().await.unwrap();
        let mut response = if response.status() == StatusCode::OK {
            response
                .json::<api::incoming::bar::Message>()
                .await
                .unwrap()
        } else {
            // NOTE(review): on failure the remaining window is still
            // returned as empty bars and the caller records the asset as
            // backfilled through `until` — real data is silently lost.
            error!(
                "Failed to backfill historical data for {} from {} to {}: {}",
                asset.symbol,
                from,
                until,
                response.text().await.unwrap()
            );
            break;
        };
        // Overwrite the empty placeholders with the real bars returned
        // for this symbol (the payload maps symbol -> optional bar list).
        for bar in response
            .bars
            .remove(&asset.symbol)
            .unwrap_or_default()
            .unwrap_or_default()
        {
            bars.insert(bar.timestamp, Bar::from((bar, asset.symbol.clone())));
        }
        if response.next_page_token.is_none() {
            break;
        }
        next_page_token = response.next_page_token;
    }
    bars.into_values().collect::<Vec<Bar>>()
}
/// Re-reads the bars written after `from` (the null bars inserted by
/// the live handlers while the historical backfill was running),
/// re-upserts them flagged as backfilled, advances the asset's last
/// timestamp to the newest one, and finally marks the asset as
/// backfilled.
///
/// Holds the `backfilled` write lock for the whole operation so the
/// live handlers cannot interleave while the flag flips.
#[allow(clippy::significant_drop_tightening)]
async fn derive_recent_nulls(
    app_config: &Arc<Config>,
    asset: &Asset,
    from: &OffsetDateTime,
    backfilled: &Arc<RwLock<HashMap<String, bool>>>,
) {
    let mut backfilled = backfilled.write().await;
    let bars = database::bars::select_where_symbol_where_timestamp_larger_than(
        &app_config.postgres_pool,
        &asset.symbol,
        from,
    )
    .await;
    // Guard against an empty result set: the original unconditionally
    // called `bars.last().unwrap()`, which panics when no bars exist
    // after `from` (e.g. nothing was inserted during the backfill).
    if let Some(last_bar) = bars.last() {
        database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await;
        database::assets::update_timestamp_last_where_symbol(
            &app_config.postgres_pool,
            &asset.symbol,
            &last_bar.timestamp,
        )
        .await;
    }
    backfilled.insert(asset.symbol.clone(), true);
}