From 82022551328788c449c3cdb7d7e95fcd2ee760a8 Mon Sep 17 00:00:00 2001
From: Nikolaos Karaolidis
Date: Thu, 14 Mar 2024 12:47:52 +0000
Subject: [PATCH] Fix backfill freshness

Signed-off-by: Nikolaos Karaolidis
---
 src/init.rs                        | 12 ++------
 src/lib/database/backfills_bars.rs | 16 ++---------
 src/lib/database/backfills_news.rs | 16 ++---------
 src/lib/database/mod.rs            | 29 +++++++++++++++++--
 src/lib/types/backfill.rs          | 20 +------------
 src/main.rs                        | 46 +++++++++++++++++++++---------
 src/threads/data/backfill/bars.rs  | 15 +++++++++-
 src/threads/data/backfill/mod.rs   |  6 ++++
 src/threads/data/backfill/news.rs  | 15 +++++++++-
 9 files changed, 101 insertions(+), 74 deletions(-)

diff --git a/src/init.rs b/src/init.rs
index e8d48bb..524d8c7 100644
--- a/src/init.rs
+++ b/src/init.rs
@@ -34,15 +34,13 @@ pub async fn check_account(config: &Arc<Config>) {
         warn!("Account cash is zero, qrust will not be able to trade.");
     }
 
-    warn!(
-        "qrust active on {} account with {} {}, avoid transferring funds without shutting down.",
+    info!(
+        "qrust running on {} account with {} {}, avoid transferring funds without shutting down.",
         *ALPACA_API_BASE, account.currency, account.cash
     );
 }
 
 pub async fn rehydrate_orders(config: &Arc<Config>) {
-    info!("Rehydrating order data.");
-
     let mut orders = vec![];
     let mut after = OffsetDateTime::UNIX_EPOCH;
 
@@ -77,13 +75,9 @@ pub async fn rehydrate_orders(config: &Arc<Config>) {
     )
     .await
     .unwrap();
-
-    info!("Rehydrated order data.");
 }
 
 pub async fn rehydrate_positions(config: &Arc<Config>) {
-    info!("Rehydrating position data.");
-
     let positions_future = async {
         alpaca::positions::get(
             &config.alpaca_client,
@@ -135,6 +129,4 @@
             position.symbol, position.qty
         );
     }
-
-    info!("Rehydrated position data.");
 }
diff --git a/src/lib/database/backfills_bars.rs b/src/lib/database/backfills_bars.rs
index 75f191f..fc66855 100644
--- a/src/lib/database/backfills_bars.rs
+++ b/src/lib/database/backfills_bars.rs
@@ -1,21 +1,11 @@
-use std::sync::Arc;
-
 use crate::{
-    cleanup, delete_where_symbols, optimize, select_where_symbols, types::Backfill, upsert_batch,
+    cleanup, delete_where_symbols, optimize, select_where_symbols, set_fresh_where_symbols,
+    types::Backfill, upsert_batch,
 };
-use clickhouse::{error::Error, Client};
-use tokio::sync::Semaphore;
 
 select_where_symbols!(Backfill, "backfills_bars");
 upsert_batch!(Backfill, "backfills_bars");
 delete_where_symbols!("backfills_bars");
 cleanup!("backfills_bars");
 optimize!("backfills_bars");
-
-pub async fn unfresh(client: &Client, concurrency_limiter: &Arc<Semaphore>) -> Result<(), Error> {
-    let _ = concurrency_limiter.acquire().await.unwrap();
-    client
-        .query("ALTER TABLE backfills_bars UPDATE fresh = false WHERE true")
-        .execute()
-        .await
-}
+set_fresh_where_symbols!("backfills_bars");
diff --git a/src/lib/database/backfills_news.rs b/src/lib/database/backfills_news.rs
index 92d5040..ddffa06 100644
--- a/src/lib/database/backfills_news.rs
+++ b/src/lib/database/backfills_news.rs
@@ -1,21 +1,11 @@
-use std::sync::Arc;
-
 use crate::{
-    cleanup, delete_where_symbols, optimize, select_where_symbols, types::Backfill, upsert_batch,
+    cleanup, delete_where_symbols, optimize, select_where_symbols, set_fresh_where_symbols,
+    types::Backfill, upsert_batch,
 };
-use clickhouse::{error::Error, Client};
-use tokio::sync::Semaphore;
 
 select_where_symbols!(Backfill, "backfills_news");
 upsert_batch!(Backfill, "backfills_news");
 delete_where_symbols!("backfills_news");
 cleanup!("backfills_news");
 optimize!("backfills_news");
-
-pub async fn unfresh(client: &Client, concurrency_limiter: &Arc<Semaphore>) -> Result<(), Error> {
-    let _ = concurrency_limiter.acquire().await.unwrap();
-    client
-        .query("ALTER TABLE backfills_news UPDATE fresh = false WHERE true")
-        .execute()
-        .await
-}
+set_fresh_where_symbols!("backfills_news");
diff --git a/src/lib/database/mod.rs b/src/lib/database/mod.rs
index 136f353..c4816fd 100644
--- a/src/lib/database/mod.rs
+++ b/src/lib/database/mod.rs
@@ -7,7 +7,6 @@ pub mod news;
 pub mod orders;
 
 use clickhouse::{error::Error, Client};
-use log::info;
 use tokio::try_join;
 
 #[macro_export]
@@ -168,11 +167,36 @@
     };
 }
 
+#[macro_export]
+macro_rules! set_fresh_where_symbols {
+    ($table_name:expr) => {
+        pub async fn set_fresh_where_symbols<T>(
+            client: &clickhouse::Client,
+            concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
+            fresh: bool,
+            symbols: &[T],
+        ) -> Result<(), clickhouse::error::Error>
+        where
+            T: AsRef<str> + serde::Serialize + Send + Sync,
+        {
+            let _ = concurrency_limiter.acquire().await.unwrap();
+            client
+                .query(&format!(
+                    "ALTER TABLE {} UPDATE fresh = ? WHERE symbol IN ?",
+                    $table_name
+                ))
+                .bind(fresh)
+                .bind(symbols)
+                .execute()
+                .await
+        }
+    };
+}
+
 pub async fn cleanup_all(
     clickhouse_client: &Client,
     concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
 ) -> Result<(), Error> {
-    info!("Cleaning up database.");
     try_join!(
         bars::cleanup(clickhouse_client, concurrency_limiter),
         news::cleanup(clickhouse_client, concurrency_limiter),
@@ -186,7 +210,6 @@ pub async fn optimize_all(
     clickhouse_client: &Client,
     concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
 ) -> Result<(), Error> {
-    info!("Optimizing database.");
     try_join!(
         assets::optimize(clickhouse_client, concurrency_limiter),
         bars::optimize(clickhouse_client, concurrency_limiter),
diff --git a/src/lib/types/backfill.rs b/src/lib/types/backfill.rs
index 92ad587..1af8afd 100644
--- a/src/lib/types/backfill.rs
+++ b/src/lib/types/backfill.rs
@@ -1,4 +1,3 @@
-use super::{Bar, News};
 use clickhouse::Row;
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
@@ -8,22 +7,5 @@ pub struct Backfill {
     pub symbol: String,
     #[serde(with = "clickhouse::serde::time::datetime")]
     pub time: OffsetDateTime,
-}
-
-impl From<Bar> for Backfill {
-    fn from(bar: Bar) -> Self {
-        Self {
-            symbol: bar.symbol,
-            time: bar.time,
-        }
-    }
-}
-
-impl From<(News, String)> for Backfill {
-    fn from((news, symbol): (News, String)) -> Self {
-        Self {
-            symbol,
-            time: news.time_created,
-        }
-    }
+    pub fresh: bool,
 }
diff --git a/src/main.rs b/src/main.rs
index 2d05246..3dba781 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,6 +12,7 @@ use config::{
     CLICKHOUSE_BATCH_NEWS_SIZE, CLICKHOUSE_MAX_CONNECTIONS,
 };
 use dotenv::dotenv;
+use log::info;
 use log4rs::config::Deserializers;
 use qrust::{create_send_await, database};
 use tokio::{join, spawn, sync::mpsc, try_join};
@@ -29,24 +30,47 @@ async fn main() {
     let _ = *CLICKHOUSE_BATCH_NEWS_SIZE;
     let _ = *CLICKHOUSE_MAX_CONNECTIONS;
 
+    info!("Marking all assets as stale.");
+
+    let assets = database::assets::select(
+        &config.clickhouse_client,
+        &config.clickhouse_concurrency_limiter,
+    )
+    .await
+    .unwrap()
+    .into_iter()
+    .map(|asset| (asset.symbol, asset.class))
+    .collect::<Vec<_>>();
+
+    let symbols = assets.iter().map(|(symbol, _)| symbol).collect::<Vec<_>>();
+
     try_join!(
-        database::backfills_bars::unfresh(
+        database::backfills_bars::set_fresh_where_symbols(
             &config.clickhouse_client,
-            &config.clickhouse_concurrency_limiter
+            &config.clickhouse_concurrency_limiter,
+            false,
+            &symbols
         ),
-        database::backfills_news::unfresh(
+        database::backfills_news::set_fresh_where_symbols(
             &config.clickhouse_client,
-            &config.clickhouse_concurrency_limiter
+            &config.clickhouse_concurrency_limiter,
+            false,
+            &symbols
         )
     )
     .unwrap();
 
+    info!("Cleaning up database.");
+
     database::cleanup_all(
         &config.clickhouse_client,
         &config.clickhouse_concurrency_limiter,
     )
     .await
     .unwrap();
+
+    info!("Optimizing database.");
+
     database::optimize_all(
         &config.clickhouse_client,
         &config.clickhouse_concurrency_limiter,
@@ -54,12 +78,16 @@
     .await
     .unwrap();
 
+    info!("Rehydrating account data.");
+
     init::check_account(&config).await;
     join!(
         init::rehydrate_orders(&config),
         init::rehydrate_positions(&config)
     );
 
+    info!("Starting threads.");
+
     spawn(threads::trading::run(config.clone()));
 
     let (data_sender, data_receiver) = mpsc::channel::<threads::data::Message>(100);
@@ -73,16 +101,6 @@
 
     spawn(threads::clock::run(config.clone(), clock_sender));
 
-    let assets = database::assets::select(
-        &config.clickhouse_client,
-        &config.clickhouse_concurrency_limiter,
-    )
-    .await
-    .unwrap()
-    .into_iter()
-    .map(|asset| (asset.symbol, asset.class))
-    .collect::<Vec<_>>();
-
     create_send_await!(
         data_sender,
         threads::data::Message::new,
diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs
index a5d0014..1f03994 100644
--- a/src/threads/data/backfill/bars.rs
+++ b/src/threads/data/backfill/bars.rs
@@ -172,7 +172,11 @@ impl super::Handler for Handler {
 
         let backfilled = last_times
             .drain()
-            .map(|(symbol, time)| Backfill { symbol, time })
+            .map(|(symbol, time)| Backfill {
+                fresh: jobs[&symbol].fresh,
+                symbol,
+                time,
+            })
             .collect::<Vec<_>>();
 
         database::backfills_bars::upsert_batch(
@@ -191,6 +195,15 @@ impl super::Handler for Handler {
             bars.clear();
         }
 
+        database::backfills_bars::set_fresh_where_symbols(
+            &self.config.clickhouse_client,
+            &self.config.clickhouse_concurrency_limiter,
+            true,
+            &symbols,
+        )
+        .await
+        .unwrap();
+
         info!("Backfilled bars for {:?}.", symbols);
     }
 
diff --git a/src/threads/data/backfill/mod.rs b/src/threads/data/backfill/mod.rs
index e5e7590..3f599f7 100644
--- a/src/threads/data/backfill/mod.rs
+++ b/src/threads/data/backfill/mod.rs
@@ -47,6 +47,7 @@ impl Message {
 pub struct Job {
     pub fetch_from: OffsetDateTime,
     pub fetch_to: OffsetDateTime,
+    pub fresh: bool,
 }
 
 #[async_trait]
@@ -160,11 +161,16 @@ async fn handle_backfill_message(
             return;
         }
 
+        let fresh = backfills
+            .get(&symbol)
+            .map_or(false, |backfill| backfill.fresh);
+
         jobs.push((
             symbol,
             Job {
                 fetch_from,
                 fetch_to,
+                fresh,
             },
         ));
     }
diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs
index a6787ad..cd07b5d 100644
--- a/src/threads/data/backfill/news.rs
+++ b/src/threads/data/backfill/news.rs
@@ -136,7 +136,11 @@ impl super::Handler for Handler {
 
         let backfilled = last_times
             .drain()
-            .map(|(symbol, time)| Backfill { symbol, time })
+            .map(|(symbol, time)| Backfill {
+                fresh: jobs[&symbol].fresh,
+                symbol,
+                time,
+            })
             .collect::<Vec<_>>();
 
         database::backfills_news::upsert_batch(
@@ -155,6 +159,15 @@ impl super::Handler for Handler {
             news.clear();
         }
 
+        database::backfills_news::set_fresh_where_symbols(
+            &self.config.clickhouse_client,
+            &self.config.clickhouse_concurrency_limiter,
+            true,
+            &symbols,
+        )
+        .await
+        .unwrap();
+
         info!("Backfilled news for {:?}.", symbols);
     }