Add backfill early saving

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-11 16:53:12 +00:00
parent 2de86b46f7
commit b60cbc891d
7 changed files with 87 additions and 90 deletions


```diff
@@ -7,7 +7,6 @@ use serde::Deserialize;
 use std::{collections::HashMap, time::Duration};
 use time::OffsetDateTime;
 #[derive(Deserialize)]
 pub struct Bar {
     #[serde(rename = "t")]
```


```diff
@@ -1,4 +1,3 @@
 pub mod api;
 pub mod shared;
 pub mod websocket;
```


```diff
@@ -87,8 +87,9 @@ pub async fn add(
         .map(|asset| (asset.symbol.clone(), asset))
         .collect::<HashMap<_, _>>();
 
+    let num_symbols = request.symbols.len();
     let (assets, skipped, failed) = request.symbols.into_iter().fold(
-        (vec![], vec![], vec![]),
+        (Vec::with_capacity(num_symbols), vec![], vec![]),
         |(mut assets, mut skipped, mut failed), symbol| {
             if database_symbols.contains(&symbol) {
                 skipped.push(symbol);
```
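The new `with_capacity` call pre-sizes the fold's accumulator to its upper bound (every requested symbol could be a new asset), so pushes never reallocate. A minimal, self-contained sketch of the same pattern, with made-up symbols standing in for the request and database state:

```rust
use std::collections::HashSet;

fn main() {
    // Hypothetical stand-ins for the database symbols and the request payload.
    let database_symbols: HashSet<&str> = ["AAPL"].into_iter().collect();
    let symbols = vec!["AAPL", "MSFT", "GOOG"];

    // Pre-allocate to the maximum possible size: every symbol could be new.
    let num_symbols = symbols.len();
    let (assets, skipped) = symbols.into_iter().fold(
        (Vec::with_capacity(num_symbols), vec![]),
        |(mut assets, mut skipped), symbol| {
            if database_symbols.contains(symbol) {
                skipped.push(symbol);
            } else {
                assets.push(symbol);
            }
            (assets, skipped)
        },
    );

    assert_eq!(assets, ["MSFT", "GOOG"]);
    assert_eq!(skipped, ["AAPL"]);
}
```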


```diff
@@ -4,7 +4,6 @@ use crate::{
     database,
 };
 use async_trait::async_trait;
-use itertools::{Either, Itertools};
 use log::{error, info};
 use qrust::{
     types::{
@@ -116,7 +115,7 @@ impl super::Handler for Handler {
         info!("Backfilling bars for {:?}.", symbols);
 
-        let mut bars = vec![];
+        let mut bars = Vec::with_capacity(database::bars::BATCH_FLUSH_SIZE);
         let mut last_times = symbols
             .iter()
             .map(|symbol| (symbol.clone(), None))
@@ -163,44 +162,39 @@ impl super::Handler for Handler {
                     )
                     .await
                     .unwrap();
-                    bars = vec![];
-                }
 
-                if message.next_page_token.is_none() {
-                    break;
-                }
-                next_page_token = message.next_page_token;
-            }
+                    let backfilled = last_times
+                        .into_iter()
+                        .filter_map(|(symbol, time)| {
+                            if let Some(time) = time {
+                                return Some(Backfill { symbol, time });
+                            }
+                            None
+                        })
+                        .collect::<Vec<_>>();
 
-        let (backfilled, skipped): (Vec<_>, Vec<_>) =
-            last_times.into_iter().partition_map(|(symbol, time)| {
-                if let Some(time) = time {
-                    Either::Left(Backfill { symbol, time })
-                } else {
-                    Either::Right(symbol)
-                }
-            });
-
-        database::backfills_bars::upsert_batch(
-            &self.config.clickhouse_client,
-            &self.config.clickhouse_concurrency_limiter,
-            &backfilled,
-        )
-        .await
-        .unwrap();
-
-        let backfilled = backfilled
-            .into_iter()
-            .map(|backfill| backfill.symbol)
-            .collect::<Vec<_>>();
-
-        if !skipped.is_empty() {
-            info!("No bars to backfill for {:?}.", skipped);
-        }
-        if !backfilled.is_empty() {
-            info!("Backfilled bars for {:?}.", backfilled);
-        }
+                    database::backfills_bars::upsert_batch(
+                        &self.config.clickhouse_client,
+                        &self.config.clickhouse_concurrency_limiter,
+                        &backfilled,
+                    )
+                    .await
+                    .unwrap();
+
+                    if message.next_page_token.is_none() {
+                        break;
+                    }
+
+                    next_page_token = message.next_page_token;
+                    bars = Vec::with_capacity(database::bars::BATCH_FLUSH_SIZE);
+                    last_times = symbols
+                        .iter()
+                        .map(|symbol| (symbol.clone(), None))
+                        .collect::<HashMap<_, _>>();
+                }
+            }
+
+        info!("Backfilled bars for {:?}.", symbols);
     }
 
     fn max_limit(&self) -> i64 {
```
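This hunk is the heart of the commit: backfill progress is persisted after every batch flush instead of once after the whole pagination loop, so an interrupted run keeps the progress of every batch already flushed. A runnable sketch of that control flow under simplified assumptions — integer timestamps, a vector of pages standing in for the paginated API, and stub functions in place of the ClickHouse `upsert_batch` calls:

```rust
const BATCH_FLUSH_SIZE: usize = 4;

struct Backfill {
    symbol: String,
    time: u64, // stand-in for OffsetDateTime
}

fn flush_bars(bars: &[u64]) {
    println!("flushed {} bars", bars.len()); // stub for database::bars::upsert_batch
}

fn flush_backfills(backfills: &[Backfill]) {
    for b in backfills {
        println!("progress: {} @ {}", b.symbol, b.time); // stub for backfills_bars::upsert_batch
    }
}

fn main() {
    let symbols = vec!["AAPL".to_string(), "MSFT".to_string()];
    // Each entry is (symbol index, bar time); two pages stand in for the API.
    let pages = vec![vec![(0, 1), (1, 2), (0, 3), (1, 4)], vec![(0, 5), (1, 6)]];

    let mut bars = Vec::with_capacity(BATCH_FLUSH_SIZE);
    let mut last_times: Vec<Option<u64>> = vec![None; symbols.len()];

    for (i, page) in pages.iter().enumerate() {
        for &(sym, time) in page {
            bars.push(time);
            last_times[sym] = Some(time);
        }
        let last_page = i + 1 == pages.len();
        if bars.len() >= BATCH_FLUSH_SIZE || last_page {
            flush_bars(&bars);
            // Save progress *now*, not after the loop: this is the early saving.
            let backfilled: Vec<Backfill> = symbols
                .iter()
                .zip(&last_times)
                .filter_map(|(symbol, time)| {
                    time.map(|time| Backfill { symbol: symbol.clone(), time })
                })
                .collect();
            flush_backfills(&backfilled);
            if last_page {
                break;
            }
            // Reset per-batch state, mirroring the diff's re-initialization.
            bars = Vec::with_capacity(BATCH_FLUSH_SIZE);
            last_times = vec![None; symbols.len()];
        }
    }
    println!("Backfilled bars for {:?}.", symbols);
}
```

Because the upsert overwrites per symbol, each save presumably only needs the symbols touched since the previous flush, which would explain why the diff re-initializes `last_times` alongside `bars` after every flush.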


```diff
@@ -13,7 +13,7 @@ use qrust::{
     },
     utils::{last_minute, ONE_SECOND},
 };
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, hash::Hash, sync::Arc};
 use time::OffsetDateTime;
 use tokio::{
     spawn,
@@ -82,10 +82,8 @@ impl Jobs {
         self.uuid_to_job.insert(uuid, fut);
     }
 
-    pub fn get(&self, symbol: &str) -> Option<&JoinHandle<()>> {
-        self.symbol_to_uuid
-            .get(symbol)
-            .and_then(|uuid| self.uuid_to_job.get(uuid))
+    pub fn contains_key(&self, symbol: &str) -> bool {
+        self.symbol_to_uuid.contains_key(symbol)
     }
 
     pub fn remove(&mut self, symbol: &str) -> Option<JoinHandle<()>> {
@@ -93,6 +91,17 @@ impl Jobs {
             .remove(symbol)
             .and_then(|uuid| self.uuid_to_job.remove(&uuid))
     }
+
+    pub fn remove_many<T>(&mut self, symbols: &[T])
+    where
+        T: AsRef<str> + Hash + Eq,
+    {
+        for symbol in symbols {
+            self.symbol_to_uuid
+                .remove(symbol.as_ref())
+                .and_then(|uuid| self.uuid_to_job.remove(&uuid));
+        }
+    }
 }
 
 pub async fn run(handler: Arc<Box<dyn Handler>>, mut receiver: mpsc::Receiver<Message>) {
@@ -116,6 +125,7 @@ async fn handle_backfill_message(
     backfill_jobs: Arc<Mutex<Jobs>>,
     message: Message,
 ) {
+    let backfill_jobs_clone = backfill_jobs.clone();
     let mut backfill_jobs = backfill_jobs.lock().await;
 
     match message.action {
@@ -131,17 +141,15 @@ async fn handle_backfill_message(
                 .map(|backfill| (backfill.symbol.clone(), backfill))
                 .collect::<HashMap<_, _>>();
 
-            let mut jobs = vec![];
+            let mut jobs = Vec::with_capacity(message.symbols.len());
 
             for symbol in message.symbols {
-                if let Some(job) = backfill_jobs.get(&symbol) {
-                    if !job.is_finished() {
-                        warn!(
-                            "Backfill for {} {} is already running, skipping.",
-                            symbol, log_string
-                        );
-                        continue;
-                    }
+                if backfill_jobs.contains_key(&symbol) {
+                    warn!(
+                        "Backfill for {} {} is already running, skipping.",
+                        symbol, log_string
+                    );
+                    continue;
                 }
 
                 let fetch_from = backfills
@@ -168,7 +176,7 @@ async fn handle_backfill_message(
             let jobs = jobs
                 .into_iter()
-                .sorted_by_key(|job| job.1.fetch_from)
+                .sorted_unstable_by_key(|job| job.1.fetch_from)
                 .collect::<Vec<_>>();
 
             let mut job_groups = vec![HashMap::new()];
@@ -194,9 +202,13 @@ async fn handle_backfill_message(
                 let symbols = job_group.keys().cloned().collect::<Vec<_>>();
                 let handler = handler.clone();
+                let symbols_clone = symbols.clone();
+                let backfill_jobs_clone = backfill_jobs_clone.clone();
 
                 let fut = spawn(async move {
                     handler.queue_backfill(&job_group).await;
                     handler.backfill(job_group).await;
+                    backfill_jobs_clone.lock().await.remove_many(&symbols_clone);
                 });
 
                 backfill_jobs.insert(symbols, fut);
@@ -205,9 +217,7 @@ async fn handle_backfill_message(
         Action::Purge => {
             for symbol in &message.symbols {
                 if let Some(job) = backfill_jobs.remove(symbol) {
-                    if !job.is_finished() {
-                        job.abort();
-                    }
+                    job.abort();
                     let _ = job.await;
                 }
             }
```
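Two pieces of this file work together: each spawned job now holds a clone of the registry (`backfill_jobs_clone`) and calls `remove_many` on itself as its final step, so an entry in the map means the job is still running. That is why the old `get` + `is_finished()` probe shrinks to a plain `contains_key`, and why `Action::Purge` can call `abort()` unconditionally. A condensed sketch of the self-deregistering pattern, with the two-map `Jobs` type collapsed into a single `HashMap` for brevity (assumes tokio with the "full" feature set):

```rust
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};

type Jobs = Arc<Mutex<HashMap<String, JoinHandle<()>>>>;

async fn start(jobs: &Jobs, symbol: String) {
    let mut guard = jobs.lock().await;
    if guard.contains_key(&symbol) {
        println!("Backfill for {symbol} is already running, skipping.");
        return;
    }
    let jobs_clone = jobs.clone();
    let key = symbol.clone();
    let fut = tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await; // stand-in for the actual backfill
        // Deregister on completion, so presence in the map means "running".
        jobs_clone.lock().await.remove(&key);
    });
    guard.insert(symbol, fut);
}

#[tokio::main]
async fn main() {
    let jobs: Jobs = Arc::new(Mutex::new(HashMap::new()));
    start(&jobs, "AAPL".into()).await;
    start(&jobs, "AAPL".into()).await; // hits the contains_key guard
    sleep(Duration::from_millis(100)).await;
    assert!(jobs.lock().await.is_empty()); // the job removed itself
}
```

Under this scheme a finished-but-unreaped handle can no longer linger in the map, which removes the need for the `is_finished()` checks on both the insert and the purge paths.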


```diff
@@ -5,7 +5,6 @@ use crate::{
 };
 use async_trait::async_trait;
 use futures_util::future::join_all;
-use itertools::{Either, Itertools};
 use log::{error, info};
 use qrust::{
     types::{
@@ -78,7 +77,7 @@ impl super::Handler for Handler {
         info!("Backfilling news for {:?}.", symbols);
 
-        let mut news = vec![];
+        let mut news = Vec::with_capacity(database::news::BATCH_FLUSH_SIZE);
         let mut last_times = symbols
             .iter()
             .map(|symbol| (symbol.clone(), None))
@@ -160,44 +159,39 @@ impl super::Handler for Handler {
                     )
                     .await
                     .unwrap();
-                    news = vec![];
-                }
 
-                if message.next_page_token.is_none() {
-                    break;
-                }
-                next_page_token = message.next_page_token;
-            }
+                    let backfilled = last_times
+                        .into_iter()
+                        .filter_map(|(symbol, time)| {
+                            if let Some(time) = time {
+                                return Some(Backfill { symbol, time });
+                            }
+                            None
+                        })
+                        .collect::<Vec<_>>();
 
-        let (backfilled, skipped): (Vec<_>, Vec<_>) =
-            last_times.into_iter().partition_map(|(symbol, time)| {
-                if let Some(time) = time {
-                    Either::Left(Backfill { symbol, time })
-                } else {
-                    Either::Right(symbol)
-                }
-            });
-
-        database::backfills_news::upsert_batch(
-            &self.config.clickhouse_client,
-            &self.config.clickhouse_concurrency_limiter,
-            &backfilled,
-        )
-        .await
-        .unwrap();
-
-        let backfilled = backfilled
-            .into_iter()
-            .map(|backfill| backfill.symbol)
-            .collect::<Vec<_>>();
-
-        if !skipped.is_empty() {
-            info!("No news to backfill for {:?}.", skipped);
-        }
-        if !backfilled.is_empty() {
-            info!("Backfilled news for {:?}.", backfilled);
-        }
+                    database::backfills_news::upsert_batch(
+                        &self.config.clickhouse_client,
+                        &self.config.clickhouse_concurrency_limiter,
+                        &backfilled,
+                    )
+                    .await
+                    .unwrap();
+
+                    if message.next_page_token.is_none() {
+                        break;
+                    }
+
+                    next_page_token = message.next_page_token;
+                    news = Vec::with_capacity(database::news::BATCH_FLUSH_SIZE);
+                    last_times = symbols
+                        .iter()
+                        .map(|symbol| (symbol.clone(), None))
+                        .collect::<HashMap<_, _>>();
+                }
+            }
+
+        info!("Backfilled news for {:?}.", symbols);
     }
 
     fn max_limit(&self) -> i64 {
```
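The news handler mirrors the bars handler, and both drop `itertools` for the same reason: `partition_map` split `last_times` into backfilled and skipped halves, but the skipped half only fed the now-removed "No news to backfill" log line. A small sketch of the before and after shapes, using `(symbol, Option<time>)` pairs as stand-ins:

```rust
fn main() {
    let last_times = vec![("AAPL", Some(10u64)), ("MSFT", None)];

    // Before (itertools): both halves were materialized.
    //     let (backfilled, skipped): (Vec<_>, Vec<_>) =
    //         last_times.into_iter().partition_map(|(symbol, time)| match time {
    //             Some(time) => Either::Left((symbol, time)),
    //             None => Either::Right(symbol),
    //         });

    // After (std only): keep just the entries with recorded progress.
    let backfilled: Vec<_> = last_times
        .into_iter()
        .filter_map(|(symbol, time)| time.map(|time| (symbol, time)))
        .collect();

    assert_eq!(backfilled, [("AAPL", 10)]);
}
```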


```diff
@@ -245,7 +245,7 @@ async fn handle_message(
     let (mut assets, mut positions) = join!(assets, positions);
 
-    let mut batch = vec![];
+    let mut batch = Vec::with_capacity(symbols.len());
 
     for symbol in &symbols {
         if let Some(asset) = assets.remove(symbol) {
```