Fix the Sin of Man

- Migrate to ClickHouse
- Simplify serde renaming
- Simplify backfill logic
- Compartmentalize database columns

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-01-15 23:51:53 +00:00
parent 63a9ca950f
commit de3989ec35
45 changed files with 1120 additions and 2718 deletions
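The heart of the migration: every `postgres_pool` call in the diff below becomes a `clickhouse_client` call, and the sqlx transactions disappear, since ClickHouse inserts are append-only rather than transactional. A minimal sketch of the pattern the new `database::bars::upsert` presumably wraps, assuming the `clickhouse` crate; the table name and row layout here are guesses, not taken from this diff:

```rust
use clickhouse::{Client, Row};
use serde::Serialize;
use time::OffsetDateTime;

// Hypothetical row mirroring the renamed Bar fields (`symbol`, `time`).
#[derive(Row, Serialize)]
struct BarRow {
    symbol: String,
    #[serde(with = "clickhouse::serde::time::datetime")]
    time: OffsetDateTime,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}

// ClickHouse has no ON CONFLICT clause; an "upsert" is a plain INSERT into
// a ReplacingMergeTree table, which deduplicates by sorting key at merge time.
async fn upsert(client: &Client, bar: &BarRow) -> clickhouse::error::Result<()> {
    let mut insert = client.insert("bars")?;
    insert.write(bar).await?;
    insert.end().await?;
    Ok(())
}
```

The field renames visible throughout the diff (`asset_symbol` to `symbol`, `timestamp` to `time`) also make the struct fields match the stored column names directly, which is presumably the "Simplify serde renaming" item: per-field `#[serde(rename = ...)]` attributes can be dropped once the names line up.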


@@ -5,11 +5,11 @@ use crate::{
     },
     data::authenticate_websocket,
     database,
-    time::{duration_until, last_minute, next_30s, next_minute, ONE_MINUTE, THIRTY_SECONDS},
+    time::{duration_until, last_minute, next_minute, ONE_MINUTE},
     types::{
-        api,
+        api::incoming,
         asset::{self, Asset},
-        websocket, Bar, BroadcastMessage, Class,
+        websocket, Bar, BarValidity, BroadcastMessage, Class,
     },
 };
 use core::panic;
@@ -17,16 +17,12 @@ use futures_util::{
     stream::{SplitSink, SplitStream},
     SinkExt, StreamExt,
 };
-use http::StatusCode;
-use indexmap::IndexMap;
 use log::{error, info, warn};
 use serde_json::from_str;
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
-    time::Instant,
 };
-use time::OffsetDateTime;
 use tokio::{
     net::TcpStream,
     spawn,
@@ -34,15 +30,14 @@ use tokio::{
         broadcast::{Receiver, Sender},
         RwLock,
     },
-    task::spawn_blocking,
-    time::{interval_at, sleep},
+    time::sleep,
 };
 use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

 pub async fn run(
     app_config: Arc<Config>,
     class: Class,
-    asset_broadcast_sender: Sender<BroadcastMessage>,
+    broadcast_sender: Sender<BroadcastMessage>,
 ) {
     info!("Running live data threads for {:?}.", class);
@@ -59,38 +54,35 @@ pub async fn run(
     authenticate_websocket(&app_config, &mut stream, &mut sink).await;
     let sink = Arc::new(RwLock::new(sink));

-    let backfilled = Arc::new(RwLock::new(HashMap::new()));
-
-    spawn(websocket_broadcast_handler(
+    spawn(broadcast_handler(
         class,
         sink.clone(),
-        asset_broadcast_sender.subscribe(),
+        broadcast_sender.subscribe(),
     ));

-    database::assets::select_where_class(&app_config.postgres_pool, class)
+    database::assets::select_where_class(&app_config.clickhouse_client, class)
         .await
         .into_iter()
         .for_each(|asset| {
-            asset_broadcast_sender
+            broadcast_sender
                 .send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
                     asset,
                 )))
                 .unwrap();
         });

-    spawn(null_handler(app_config.clone(), backfilled.clone()));
-
-    websocket_message_handler(app_config, class, stream, sink, backfilled).await;
+    websocket_handler(app_config, class, stream, sink).await;
     unreachable!()
 }

-async fn websocket_broadcast_handler(
+async fn broadcast_handler(
     class: Class,
     sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
-    mut asset_broadcast_receiver: Receiver<BroadcastMessage>,
+    mut broadcast_receiver: Receiver<BroadcastMessage>,
 ) {
     loop {
-        match asset_broadcast_receiver.recv().await.unwrap() {
+        match broadcast_receiver.recv().await.unwrap() {
             BroadcastMessage::Asset(asset::BroadcastMessage::Added(asset))
                 if asset.class == class =>
             {
@@ -128,13 +120,14 @@ async fn websocket_broadcast_handler(
     }
 }

-async fn websocket_message_handler(
+async fn websocket_handler(
     app_config: Arc<Config>,
     class: Class,
     mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
     sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
-    backfilled: Arc<RwLock<HashMap<String, bool>>>,
 ) {
+    let backfilled = Arc::new(RwLock::new(HashMap::new()));
+
     loop {
         match stream.next().await {
             Some(Ok(Message::Text(data))) => {
@@ -147,7 +140,7 @@ async fn websocket_message_handler(
                 }

                 for message in parsed_data.unwrap_or_default() {
-                    websocket_handle_text_message(&app_config, class, message, &backfilled).await;
+                    websocket_handle_message(&app_config, class, &backfilled, message).await;
                 }
             }
             Some(Ok(Message::Ping(_))) => sink
@@ -162,11 +155,11 @@ async fn websocket_message_handler(
     }
 }

-async fn websocket_handle_text_message(
+async fn websocket_handle_message(
     app_config: &Arc<Config>,
     class: Class,
-    message: websocket::data::incoming::Message,
     backfilled: &Arc<RwLock<HashMap<String, bool>>>,
+    message: websocket::data::incoming::Message,
 ) {
     match message {
         websocket::data::incoming::Message::Subscription(subscription_message) => {
@@ -185,20 +178,35 @@ async fn websocket_handle_text_message(
             let deleted_assets = old_assets.difference(&new_assets).collect::<HashSet<_>>();

             for asset_symbol in &added_assets {
-                let asset =
-                    database::assets::select_where_symbol(&app_config.postgres_pool, asset_symbol)
-                        .await
-                        .unwrap();
+                let asset = database::assets::select_where_symbol(
+                    &app_config.clickhouse_client,
+                    asset_symbol,
+                )
+                .await
+                .unwrap();

                 backfilled.write().await.insert(asset.symbol.clone(), false);
+
+                let bar_validity = BarValidity::none(asset.symbol.clone());
+                database::bars::upsert_validity(&app_config.clickhouse_client, &bar_validity).await;
+
                 spawn(backfill(
                     app_config.clone(),
-                    asset.clone(),
                     backfilled.clone(),
+                    asset.clone(),
                 ));
             }

             for asset_symbol in &deleted_assets {
+                database::bars::delete_validity_where_symbol(
+                    &app_config.clickhouse_client,
+                    asset_symbol,
+                )
+                .await;
+                database::bars::delete_where_symbol(&app_config.clickhouse_client, asset_symbol)
+                    .await;
                 backfilled.write().await.remove(*asset_symbol);
             }
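This hunk introduces the watermark bookkeeping that replaces the old `assets.timestamp_last` column: each subscribed symbol gets a `BarValidity` row seeded with `BarValidity::none`, deleted again on unsubscribe. Inferring from its uses in this diff (`bar_validity.time_last` and the `&bar.into()` conversion further down), the type plausibly looks like the sketch below; the exact fields and the `UNIX_EPOCH` seed are guesses:

```rust
use time::OffsetDateTime;

// Stand-in for the crate's Bar type, reduced to the fields used here.
pub struct Bar {
    pub symbol: String,
    pub time: OffsetDateTime,
}

// Watermark: the latest minute for which bars are known to be complete.
pub struct BarValidity {
    pub symbol: String,
    pub time_last: OffsetDateTime,
}

impl BarValidity {
    // Seed row for a freshly subscribed asset: nothing validated yet.
    pub fn none(symbol: String) -> Self {
        Self {
            symbol,
            time_last: OffsetDateTime::UNIX_EPOCH,
        }
    }
}

// Enables `upsert_validity(&client, &bar.into())`: an incoming bar
// advances the watermark to its own timestamp.
impl From<Bar> for BarValidity {
    fn from(bar: Bar) -> Self {
        Self {
            symbol: bar.symbol,
            time_last: bar.time,
        }
    }
}
```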
@@ -207,156 +215,45 @@ async fn websocket_handle_text_message(
                 class, added_assets, deleted_assets
             );
         }
-        websocket::data::incoming::Message::Bars(bar_message) => {
+        websocket::data::incoming::Message::Bars(bar_message)
+        | websocket::data::incoming::Message::UpdatedBars(bar_message) => {
             let bar = Bar::from(bar_message);
-            info!(
-                "websocket::Incoming bar for {}: {}",
-                bar.asset_symbol, bar.timestamp
-            );
-            database::bars::upsert(
-                &app_config.postgres_pool,
-                &bar,
-                backfilled.read().await[&bar.asset_symbol],
-            )
-            .await;
-        }
-        websocket::data::incoming::Message::UpdatedBars(bar_message) => {
-            let bar = Bar::from(bar_message);
-            info!(
-                "websocket::Incoming bar for {}: {}",
-                bar.asset_symbol, bar.timestamp
-            );
+            info!("websocket::Incoming bar for {}: {}", bar.symbol, bar.time);
-            let transaction = app_config.postgres_pool.begin().await.unwrap();
-            let backfilled_asset_symbol = backfilled.read().await[&bar.asset_symbol];
-            database::bars::upsert(&app_config.postgres_pool, &bar, backfilled_asset_symbol).await;
-            if backfilled_asset_symbol {
-                database::assets::update_timestamp_last_where_symbol(
-                    &app_config.postgres_pool,
-                    &bar.asset_symbol,
-                    &bar.timestamp,
-                )
-                .await;
+            database::bars::upsert(&app_config.clickhouse_client, &bar).await;
+            if backfilled.read().await[&bar.symbol] {
+                database::bars::upsert_validity(&app_config.clickhouse_client, &bar.into()).await;
             }
-            transaction.commit().await.unwrap();
         }
-        _ => {}
-    }
-}
-
-#[allow(clippy::significant_drop_in_scrutinee)]
-async fn null_handler(app_config: Arc<Config>, backfilled: Arc<RwLock<HashMap<String, bool>>>) {
-    #[derive(PartialEq)]
-    enum NullHandlerState {
-        Bars,
-        UpdatedBars,
-    }
-
-    let next_30s = next_30s();
-    let mut state = if next_30s.unix_timestamp() % 30 == 0 {
-        NullHandlerState::Bars
-    } else {
-        NullHandlerState::UpdatedBars
-    };
-    let mut interval = interval_at(
-        (Instant::now() + duration_until(next_30s)).into(),
-        THIRTY_SECONDS,
-    );
-
-    loop {
-        interval.tick().await;
-        let timestamp = last_minute() - ONE_MINUTE;
-        let backfilled = backfilled.read().await;
-
-        for asset_symbol in backfilled.keys().cloned() {
-            let bar = Bar::empty(timestamp, asset_symbol);
-            let transaction = app_config.postgres_pool.begin().await.unwrap();
-            database::bars::insert_or_skip(
-                &app_config.postgres_pool,
-                &bar,
-                backfilled[&bar.asset_symbol],
-            )
-            .await;
-            if backfilled[&bar.asset_symbol] && state == NullHandlerState::Bars {
-                database::assets::update_timestamp_last_where_symbol(
-                    &app_config.postgres_pool,
-                    &bar.asset_symbol,
-                    &bar.timestamp,
-                )
-                .await;
-            }
-            transaction.commit().await.unwrap();
-        }
-
-        state = match state {
-            NullHandlerState::Bars => NullHandlerState::UpdatedBars,
-            NullHandlerState::UpdatedBars => NullHandlerState::Bars,
-        };
+        websocket::data::incoming::Message::Success(_) => {}
     }
 }

 pub async fn backfill(
     app_config: Arc<Config>,
-    asset: Asset,
     backfilled: Arc<RwLock<HashMap<String, bool>>>,
+    asset: Asset,
 ) {
-    info!("Backfilling historical data for {}...", asset.symbol);
+    let bar_validity =
+        database::bars::select_validity_where_symbol(&app_config.clickhouse_client, &asset.symbol)
+            .await
+            .unwrap();

-    let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset;
-    let fetch_from = asset.timestamp_last + ONE_MINUTE;
-    let fetch_until = task_run_offsetdatetime - app_config.alpaca_historical_offset - ONE_MINUTE;
+    let fetch_from = bar_validity.time_last + ONE_MINUTE;
+    let fetch_until = last_minute();
     if fetch_from > fetch_until {
         return;
     }

-    let wait_duration = task_run_offsetdatetime - OffsetDateTime::now_utc();
-    if wait_duration.is_positive() {
-        sleep(wait_duration.unsigned_abs()).await;
-    }
+    info!("Queing historical data backfill for {}...", asset.symbol);
+    let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset;
+    sleep(duration_until(task_run_offsetdatetime)).await;

-    let bars = backfill_bars_from_api(&app_config, &asset, fetch_from, fetch_until).await;
-    let transaction = app_config.postgres_pool.begin().await.unwrap();
-    database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await;
-    database::assets::update_timestamp_last_where_symbol(
-        &app_config.postgres_pool,
-        &asset.symbol,
-        &fetch_until,
-    )
-    .await;
-    derive_recent_nulls(&app_config, &asset, &fetch_until, &backfilled).await;
-    transaction.commit().await.unwrap();
-    info!("Backfilled historical data for {}.", asset.symbol);
-}
-
-fn generate_per_minute_bars(
-    from: OffsetDateTime,
-    until: OffsetDateTime,
-    asset: &Asset,
-) -> IndexMap<OffsetDateTime, Bar> {
-    let mut bars = IndexMap::new();
-    let mut current_time = from;
-    while current_time <= until {
-        bars.insert(current_time, Bar::empty(current_time, asset.symbol.clone()));
-        current_time += ONE_MINUTE;
-    }
-    bars
-}
-
-async fn backfill_bars_from_api(
-    app_config: &Arc<Config>,
-    asset: &Asset,
-    from: OffsetDateTime,
-    until: OffsetDateTime,
-) -> Vec<Bar> {
-    let asset_clone = asset.clone();
-    let mut bars = spawn_blocking(move || generate_per_minute_bars(from, until, &asset_clone))
-        .await
-        .unwrap();
+    info!("Running historical data backfill for {}...", asset.symbol);
+    let mut bars = Vec::new();
     let mut next_page_token = None;
     loop {
         let request = app_config
             .alpaca_client
@@ -369,11 +266,17 @@ async fn backfill_bars_from_api(
             ("timeframe", &String::from("1Min")),
             (
                 "start",
-                &from.format(ALPACA_TIMESTAMP_FORMAT).unwrap().to_string(),
+                &fetch_from
+                    .format(ALPACA_TIMESTAMP_FORMAT)
+                    .unwrap()
+                    .to_string(),
             ),
             (
                 "end",
-                &until.format(ALPACA_TIMESTAMP_FORMAT).unwrap().to_string(),
+                &fetch_until
+                    .format(ALPACA_TIMESTAMP_FORMAT)
+                    .unwrap()
+                    .to_string(),
             ),
             ("limit", &String::from("10000")),
             ("page_token", &next_page_token.clone().unwrap_or_default()),
@@ -381,17 +284,14 @@ async fn backfill_bars_from_api(
         app_config.alpaca_rate_limit.until_ready().await;
         let response = request.send().await.unwrap();

-        let mut response = if response.status() == StatusCode::OK {
-            response
-                .json::<api::incoming::bar::Message>()
-                .await
-                .unwrap()
+        let mut response = if response.status() == reqwest::StatusCode::OK {
+            response.json::<incoming::bar::Message>().await.unwrap()
         } else {
             error!(
                 "Failed to backfill historical data for {} from {} to {}: {}",
                 asset.symbol,
-                from,
-                until,
+                fetch_from,
+                fetch_until,
                 response.text().await.unwrap()
             );
             break;
@@ -403,7 +303,7 @@ async fn backfill_bars_from_api(
             .unwrap_or_default()
             .unwrap_or_default()
         {
-            bars.insert(bar.timestamp, Bar::from((bar, asset.symbol.clone())));
+            bars.push(Bar::from((bar, asset.symbol.clone())));
         }

         if response.next_page_token.is_none() {
@@ -412,29 +312,13 @@ async fn backfill_bars_from_api(
         next_page_token = response.next_page_token;
     }

-    bars.into_values().collect::<Vec<Bar>>()
-}
+    database::bars::upsert_batch(&app_config.clickhouse_client, &bars).await;
+    if let Some(last_bar) = bars.last() {
+        database::bars::upsert_validity(&app_config.clickhouse_client, &last_bar.clone().into())
+            .await;
+    }

-#[allow(clippy::significant_drop_tightening)]
-async fn derive_recent_nulls(
-    app_config: &Arc<Config>,
-    asset: &Asset,
-    from: &OffsetDateTime,
-    backfilled: &Arc<RwLock<HashMap<String, bool>>>,
-) {
-    let mut backfilled = backfilled.write().await;
-    let bars = database::bars::select_where_symbol_where_timestamp_larger_than(
-        &app_config.postgres_pool,
-        &asset.symbol,
-        from,
-    )
-    .await;
-    database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await;
-    database::assets::update_timestamp_last_where_symbol(
-        &app_config.postgres_pool,
-        &asset.symbol,
-        &bars.last().unwrap().timestamp,
-    )
-    .await;
-    backfilled.insert(asset.symbol.clone(), true);
+    backfilled.write().await.insert(asset.symbol.clone(), true);
+    info!("Backfilled historical data for {}.", asset.symbol);
 }
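A final note on the simplified backfill scheduling: instead of computing a wait duration by hand and checking `is_positive()`, the new code sleeps via `duration_until(next_minute() + app_config.alpaca_historical_offset)`. A helper with that contract might look like the sketch below; only the name and call sites appear in the diff, so the clamp-to-zero behavior is an assumption:

```rust
use std::time::Duration;
use time::OffsetDateTime;

// Plausible shape of the crate's `time::duration_until` helper:
// how long to sleep until `deadline`, clamped to zero if it has passed.
fn duration_until(deadline: OffsetDateTime) -> Duration {
    let remaining = deadline - OffsetDateTime::now_utc();
    if remaining.is_positive() {
        remaining.unsigned_abs()
    } else {
        Duration::ZERO
    }
}
```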