Fix backfill freshness

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-03-14 12:47:52 +00:00
parent 0d276d537c
commit 8202255132
9 changed files with 101 additions and 74 deletions

View File

@@ -34,15 +34,13 @@ pub async fn check_account(config: &Arc<Config>) {
warn!("Account cash is zero, qrust will not be able to trade.");
}
warn!(
"qrust active on {} account with {} {}, avoid transferring funds without shutting down.",
info!(
"qrust running on {} account with {} {}, avoid transferring funds without shutting down.",
*ALPACA_API_BASE, account.currency, account.cash
);
}
pub async fn rehydrate_orders(config: &Arc<Config>) {
info!("Rehydrating order data.");
let mut orders = vec![];
let mut after = OffsetDateTime::UNIX_EPOCH;
@@ -77,13 +75,9 @@ pub async fn rehydrate_orders(config: &Arc<Config>) {
)
.await
.unwrap();
info!("Rehydrated order data.");
}
pub async fn rehydrate_positions(config: &Arc<Config>) {
info!("Rehydrating position data.");
let positions_future = async {
alpaca::positions::get(
&config.alpaca_client,
@@ -135,6 +129,4 @@ pub async fn rehydrate_positions(config: &Arc<Config>) {
position.symbol, position.qty
);
}
info!("Rehydrated position data.");
}

View File

@@ -1,21 +1,11 @@
use std::sync::Arc;
use crate::{
cleanup, delete_where_symbols, optimize, select_where_symbols, types::Backfill, upsert_batch,
cleanup, delete_where_symbols, optimize, select_where_symbols, set_fresh_where_symbols,
types::Backfill, upsert_batch,
};
use clickhouse::{error::Error, Client};
use tokio::sync::Semaphore;
select_where_symbols!(Backfill, "backfills_bars");
upsert_batch!(Backfill, "backfills_bars");
delete_where_symbols!("backfills_bars");
cleanup!("backfills_bars");
optimize!("backfills_bars");
pub async fn unfresh(client: &Client, concurrency_limiter: &Arc<Semaphore>) -> Result<(), Error> {
let _ = concurrency_limiter.acquire().await.unwrap();
client
.query("ALTER TABLE backfills_bars UPDATE fresh = false WHERE true")
.execute()
.await
}
set_fresh_where_symbols!("backfills_bars");

View File

@@ -1,21 +1,11 @@
use std::sync::Arc;
use crate::{
cleanup, delete_where_symbols, optimize, select_where_symbols, types::Backfill, upsert_batch,
cleanup, delete_where_symbols, optimize, select_where_symbols, set_fresh_where_symbols,
types::Backfill, upsert_batch,
};
use clickhouse::{error::Error, Client};
use tokio::sync::Semaphore;
select_where_symbols!(Backfill, "backfills_news");
upsert_batch!(Backfill, "backfills_news");
delete_where_symbols!("backfills_news");
cleanup!("backfills_news");
optimize!("backfills_news");
pub async fn unfresh(client: &Client, concurrency_limiter: &Arc<Semaphore>) -> Result<(), Error> {
let _ = concurrency_limiter.acquire().await.unwrap();
client
.query("ALTER TABLE backfills_news UPDATE fresh = false WHERE true")
.execute()
.await
}
set_fresh_where_symbols!("backfills_news");

View File

@@ -7,7 +7,6 @@ pub mod news;
pub mod orders;
use clickhouse::{error::Error, Client};
use log::info;
use tokio::try_join;
#[macro_export]
@@ -168,11 +167,36 @@ macro_rules! optimize {
};
}
#[macro_export]
macro_rules! set_fresh_where_symbols {
($table_name:expr) => {
pub async fn set_fresh_where_symbols<T>(
client: &clickhouse::Client,
concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
fresh: bool,
symbols: &[T],
) -> Result<(), clickhouse::error::Error>
where
T: AsRef<str> + serde::Serialize + Send + Sync,
{
let _ = concurrency_limiter.acquire().await.unwrap();
client
.query(&format!(
"ALTER TABLE {} UPDATE fresh = ? WHERE symbol IN ?",
$table_name
))
.bind(fresh)
.bind(symbols)
.execute()
.await
}
};
}
pub async fn cleanup_all(
clickhouse_client: &Client,
concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
) -> Result<(), Error> {
info!("Cleaning up database.");
try_join!(
bars::cleanup(clickhouse_client, concurrency_limiter),
news::cleanup(clickhouse_client, concurrency_limiter),
@@ -186,7 +210,6 @@ pub async fn optimize_all(
clickhouse_client: &Client,
concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
) -> Result<(), Error> {
info!("Optimizing database.");
try_join!(
assets::optimize(clickhouse_client, concurrency_limiter),
bars::optimize(clickhouse_client, concurrency_limiter),

View File

@@ -1,4 +1,3 @@
use super::{Bar, News};
use clickhouse::Row;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
@@ -8,22 +7,5 @@ pub struct Backfill {
pub symbol: String,
#[serde(with = "clickhouse::serde::time::datetime")]
pub time: OffsetDateTime,
}
impl From<Bar> for Backfill {
fn from(bar: Bar) -> Self {
Self {
symbol: bar.symbol,
time: bar.time,
}
}
}
impl From<(News, String)> for Backfill {
fn from((news, symbol): (News, String)) -> Self {
Self {
symbol,
time: news.time_created,
}
}
pub fresh: bool,
}

View File

@@ -12,6 +12,7 @@ use config::{
CLICKHOUSE_BATCH_NEWS_SIZE, CLICKHOUSE_MAX_CONNECTIONS,
};
use dotenv::dotenv;
use log::info;
use log4rs::config::Deserializers;
use qrust::{create_send_await, database};
use tokio::{join, spawn, sync::mpsc, try_join};
@@ -29,24 +30,47 @@ async fn main() {
let _ = *CLICKHOUSE_BATCH_NEWS_SIZE;
let _ = *CLICKHOUSE_MAX_CONNECTIONS;
info!("Marking all assets as stale.");
let assets = database::assets::select(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap()
.into_iter()
.map(|asset| (asset.symbol, asset.class))
.collect::<Vec<_>>();
let symbols = assets.iter().map(|(symbol, _)| symbol).collect::<Vec<_>>();
try_join!(
database::backfills_bars::unfresh(
database::backfills_bars::set_fresh_where_symbols(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter
&config.clickhouse_concurrency_limiter,
false,
&symbols
),
database::backfills_news::unfresh(
database::backfills_news::set_fresh_where_symbols(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter
&config.clickhouse_concurrency_limiter,
false,
&symbols
)
)
.unwrap();
info!("Cleaning up database.");
database::cleanup_all(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap();
info!("Optimizing database.");
database::optimize_all(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
@@ -54,12 +78,16 @@ async fn main() {
.await
.unwrap();
info!("Rehydrating account data.");
init::check_account(&config).await;
join!(
init::rehydrate_orders(&config),
init::rehydrate_positions(&config)
);
info!("Starting threads.");
spawn(threads::trading::run(config.clone()));
let (data_sender, data_receiver) = mpsc::channel::<threads::data::Message>(100);
@@ -73,16 +101,6 @@ async fn main() {
spawn(threads::clock::run(config.clone(), clock_sender));
let assets = database::assets::select(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap()
.into_iter()
.map(|asset| (asset.symbol, asset.class))
.collect::<Vec<_>>();
create_send_await!(
data_sender,
threads::data::Message::new,

View File

@@ -172,7 +172,11 @@ impl super::Handler for Handler {
let backfilled = last_times
.drain()
.map(|(symbol, time)| Backfill { symbol, time })
.map(|(symbol, time)| Backfill {
fresh: jobs[&symbol].fresh,
symbol,
time,
})
.collect::<Vec<_>>();
database::backfills_bars::upsert_batch(
@@ -191,6 +195,15 @@ impl super::Handler for Handler {
bars.clear();
}
database::backfills_bars::set_fresh_where_symbols(
&self.config.clickhouse_client,
&self.config.clickhouse_concurrency_limiter,
true,
&symbols,
)
.await
.unwrap();
info!("Backfilled bars for {:?}.", symbols);
}

View File

@@ -47,6 +47,7 @@ impl Message {
pub struct Job {
pub fetch_from: OffsetDateTime,
pub fetch_to: OffsetDateTime,
pub fresh: bool,
}
#[async_trait]
@@ -160,11 +161,16 @@ async fn handle_backfill_message(
return;
}
let fresh = backfills
.get(&symbol)
.map_or(false, |backfill| backfill.fresh);
jobs.push((
symbol,
Job {
fetch_from,
fetch_to,
fresh,
},
));
}

View File

@@ -136,7 +136,11 @@ impl super::Handler for Handler {
let backfilled = last_times
.drain()
.map(|(symbol, time)| Backfill { symbol, time })
.map(|(symbol, time)| Backfill {
fresh: jobs[&symbol].fresh,
symbol,
time,
})
.collect::<Vec<_>>();
database::backfills_news::upsert_batch(
@@ -155,6 +159,15 @@ impl super::Handler for Handler {
news.clear();
}
database::backfills_news::set_fresh_where_symbols(
&self.config.clickhouse_client,
&self.config.clickhouse_concurrency_limiter,
true,
&symbols,
)
.await
.unwrap();
info!("Backfilled news for {:?}.", symbols);
}