Improve asset backfilling

commit 8a88d58192
parent c57eb7567c
2023-09-09 20:35:15 +03:00
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
9 changed files with 129 additions and 302 deletions


@@ -109,8 +109,7 @@ pub async fn backfill(
let bars = bars.into_values().collect::<Vec<Bar>>();
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::upsert_batch(&app_config.postgres_pool, &bars).await;
database::bars_filled::upsert_batch(&app_config.postgres_pool, &bars).await;
database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await;
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&asset.symbol,
@@ -137,7 +136,7 @@ async fn backfill_recent_nulls(
from,
)
.await;
database::bars_filled::upsert_batch(&app_config.postgres_pool, &bars).await;
database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await;
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&asset.symbol,
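
For orientation, the Bar type itself is not touched by this commit. The sketch below is a guess at the shape the diff assumes, inferred from the INSERT column lists and the is_none() checks; the exact field types and which fields are optional are assumptions, not the repository's actual definition.

use time::OffsetDateTime;

// Sketch only: shape inferred from this commit's queries, not the real struct.
#[derive(Debug, Clone)]
pub struct Bar {
    pub timestamp: OffsetDateTime,    // timestamptz
    pub asset_symbol: String,         // text
    // The OHLC fields are optional so that "empty" placeholder bars can be
    // stored and later forward-filled when the backfill flag is set.
    pub open: Option<f64>,
    pub high: Option<f64>,
    pub low: Option<f64>,
    pub close: Option<f64>,
    pub volume: Option<f64>,          // float8
    pub num_trades: Option<i64>,      // int8
    pub volume_weighted: Option<f64>, // float8
}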


@@ -230,22 +230,21 @@ async fn handle_message(
incoming::Message::Bars(bar_message) => {
let bar = Bar::from(bar_message);
info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp);
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::upsert(&app_config.postgres_pool, &bar).await;
if *backfilled.read().await.get(&bar.asset_symbol).unwrap() {
database::bars_filled::upsert(&app_config.postgres_pool, &bar).await;
}
transaction.commit().await.unwrap();
database::bars::upsert(
&app_config.postgres_pool,
&bar,
backfilled.read().await[&bar.asset_symbol],
)
.await;
}
incoming::Message::UpdatedBars(bar_message) => {
let bar = Bar::from(bar_message);
info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp);
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::upsert(&app_config.postgres_pool, &bar).await;
if *backfilled.read().await.get(&bar.asset_symbol).unwrap() {
database::bars_filled::upsert(&app_config.postgres_pool, &bar).await;
let backfilled_asset_symbol = backfilled.read().await[&bar.asset_symbol];
database::bars::upsert(&app_config.postgres_pool, &bar, backfilled_asset_symbol).await;
if backfilled_asset_symbol {
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&bar.asset_symbol,
@@ -287,21 +286,20 @@ async fn null_handler(app_config: Arc<Config>, backfilled: Arc<RwLock<HashMap<St
let bar = Bar::empty(timestamp, asset_symbol);
let transaction = app_config.postgres_pool.begin().await.unwrap();
database::bars::insert_or_skip(&app_config.postgres_pool, &bar).await;
if *backfilled.get(&bar.asset_symbol).unwrap() {
database::bars_filled::insert_or_skip(&app_config.postgres_pool, &bar).await;
if state == NullHandlerState::UpdatedBars {
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&bar.asset_symbol,
&bar.timestamp,
)
.await;
}
database::bars::insert_or_skip(
&app_config.postgres_pool,
&bar,
backfilled[&bar.asset_symbol],
)
.await;
if backfilled[&bar.asset_symbol] && state == NullHandlerState::Bars {
database::assets::update_timestamp_last_where_symbol(
&app_config.postgres_pool,
&bar.asset_symbol,
&bar.timestamp,
)
.await;
}
transaction.commit().await.unwrap();
}
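
The handlers above all consult a shared map of per-symbol backfill flags before deciding how to write the bar. A small self-contained sketch of that lookup pattern, assuming the map is an Arc<tokio::sync::RwLock<HashMap<String, bool>>> as the truncated signature suggests; the symbol is made up. Both the old .get(...).unwrap() and the new index syntax panic if a symbol was never registered, so the map has to be pre-populated before messages arrive.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for the `backfilled` map shared with the handlers.
    let backfilled: Arc<RwLock<HashMap<String, bool>>> =
        Arc::new(RwLock::new(HashMap::new()));
    backfilled.write().await.insert("AAPL".to_string(), true);

    // Equivalent reads: indexing panics on a missing key, just like unwrap().
    let via_index = backfilled.read().await["AAPL"];
    let via_get = *backfilled.read().await.get("AAPL").unwrap();
    assert_eq!(via_index, via_get);
}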


@@ -1,9 +1,53 @@
use crate::types::Bar;
use sqlx::{query_as, PgPool, Postgres};
use std::convert::Into;
use time::OffsetDateTime;
pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
pub async fn select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one(
postgres_pool: &PgPool,
symbol: &str,
timestamp: &OffsetDateTime,
) -> Bar {
query_as!(
Bar,
r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp < $2 AND open IS NOT NULL AND high IS NOT NULL AND low IS NOT NULL AND close IS NOT NULL ORDER BY timestamp DESC LIMIT 1"#,
symbol,
timestamp
)
.fetch_one(postgres_pool)
.await
.unwrap()
}
pub async fn select_where_symbol_where_timestamp_larger_than(
postgres_pool: &PgPool,
symbol: &str,
timestamp: &OffsetDateTime,
) -> Vec<Bar> {
query_as!(
Bar,
r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC"#,
symbol,
timestamp
)
.fetch_all(postgres_pool)
.await
.unwrap()
}
pub async fn upsert(postgres_pool: &PgPool, bar: &Bar, backfill: bool) -> Bar {
let mut bar = bar.clone();
if backfill
&& (bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none())
{
let filled_bar = select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one(
postgres_pool,
&bar.asset_symbol,
&bar.timestamp,
).await;
bar.merge_empty(&filled_bar);
}
query_as!(
Bar,
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
@@ -16,7 +60,20 @@ pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
.unwrap()
}
pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar, backfill: bool) {
let mut bar = bar.clone();
if backfill
&& (bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none())
{
let filled_bar = select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one(
postgres_pool,
&bar.asset_symbol,
&bar.timestamp,
).await;
bar.merge_empty(&filled_bar);
}
query_as!(
Bar,
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
@@ -28,7 +85,27 @@ pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
.unwrap();
}
pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar], backfill: bool) -> Vec<Bar> {
let mut bars = bars.to_vec();
if bars.is_empty() {
return bars;
}
if backfill
&& (bars[0].open.is_none()
|| bars[0].high.is_none()
|| bars[0].low.is_none()
|| bars[0].close.is_none())
{
let filled_bar = select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one(
postgres_pool,
&bars[0].asset_symbol,
&bars[0].timestamp,
).await;
bars[0].merge_empty(&filled_bar);
}
let mut timestamp = Vec::with_capacity(bars.len());
let mut asset_symbol = Vec::with_capacity(bars.len());
let mut open = Vec::with_capacity(bars.len());
@@ -39,7 +116,18 @@ pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
let mut num_trades = Vec::with_capacity(bars.len());
let mut volume_weighted = Vec::with_capacity(bars.len());
for bar in bars {
let mut last_filled_bar = bars[0].clone();
for mut bar in bars {
if backfill {
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none()
{
bar.merge_empty(&last_filled_bar);
} else {
last_filled_bar = bar.clone();
}
}
timestamp.push(bar.timestamp);
asset_symbol.push(bar.asset_symbol.clone());
open.push(bar.open);
@@ -71,19 +159,3 @@ pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
.await
.unwrap()
}
pub async fn select_where_symbol_where_timestamp_larger_than(
postgres_pool: &PgPool,
symbol: &str,
timestamp: &OffsetDateTime,
) -> Vec<Bar> {
query_as!(
Bar,
r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC"#,
symbol,
timestamp
)
.fetch_all(postgres_pool)
.await
.unwrap()
}
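
The new code paths in this file lean on a Bar::merge_empty helper whose definition is not part of the diff. A plausible sketch follows, reusing the hypothetical Bar shape from the earlier sketch and assuming a simple field-for-field fill; how the real helper treats volume, num_trades, and volume_weighted is not visible here.

impl Bar {
    // Sketch only: copy any missing price fields from an earlier,
    // fully-populated bar. The real implementation lives elsewhere in the
    // repository and may differ in which fields it carries over.
    pub fn merge_empty(&mut self, filled: &Bar) {
        if self.open.is_none() {
            self.open = filled.open;
        }
        if self.high.is_none() {
            self.high = filled.high;
        }
        if self.low.is_none() {
            self.low = filled.low;
        }
        if self.close.is_none() {
            self.close = filled.close;
        }
    }
}

Whatever the exact field handling, the contract the callers rely on is that after merge_empty the price fields are populated, so a row written with backfill enabled does not reintroduce a gap.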


@@ -1,133 +0,0 @@
use crate::types::Bar;
use sqlx::{query_as, PgPool, Postgres};
use std::convert::Into;
pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
let mut bar = bar.clone();
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
let filled_bar = query_as!(
Bar,
r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
bar.timestamp,
bar.asset_symbol
)
.fetch_one(postgres_pool)
.await
.unwrap();
bar.merge_empty(&filled_bar);
}
query_as!(
Bar,
r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.fetch_one(postgres_pool)
.await
.unwrap()
}
pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
let mut bar = bar.clone();
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
let filled_bar = query_as!(
Bar,
r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
bar.timestamp,
bar.asset_symbol
)
.fetch_one(postgres_pool)
.await
.unwrap();
bar.merge_empty(&filled_bar);
}
query_as!(
Bar,
r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO NOTHING"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.execute(postgres_pool)
.await
.unwrap();
}
pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
let mut bars = bars.to_vec();
if bars.is_empty() {
return bars;
}
if bars[0].open.is_none()
|| bars[0].high.is_none()
|| bars[0].low.is_none()
|| bars[0].close.is_none()
{
let filled_bar = &query_as!(
Bar,
r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
bars[0].timestamp,
bars[0].asset_symbol
)
.fetch_one(postgres_pool)
.await
.unwrap();
bars[0].merge_empty(filled_bar);
}
let mut timestamp = Vec::with_capacity(bars.len());
let mut asset_symbol = Vec::with_capacity(bars.len());
let mut open = Vec::with_capacity(bars.len());
let mut high = Vec::with_capacity(bars.len());
let mut low = Vec::with_capacity(bars.len());
let mut close = Vec::with_capacity(bars.len());
let mut volume = Vec::with_capacity(bars.len());
let mut num_trades = Vec::with_capacity(bars.len());
let mut volume_weighted = Vec::with_capacity(bars.len());
let mut last_filled_bar = bars[0].clone();
for mut bar in bars {
if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
bar.merge_empty(&last_filled_bar);
} else {
last_filled_bar = bar.clone();
}
timestamp.push(bar.timestamp);
asset_symbol.push(bar.asset_symbol.clone());
open.push(bar.open);
high.push(bar.high);
low.push(bar.low);
close.push(bar.close);
volume.push(bar.volume);
num_trades.push(bar.num_trades);
volume_weighted.push(bar.volume_weighted);
}
// No type-safety here because of NULLABLE bulk insert
query_as::<Postgres, Bar>(
r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, num_trades = EXCLUDED.num_trades, volume_weighted = EXCLUDED.volume_weighted
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
)
.bind(timestamp)
.bind(asset_symbol)
.bind(open)
.bind(high)
.bind(low)
.bind(close)
.bind(volume)
.bind(num_trades)
.bind(volume_weighted)
.fetch_all(postgres_pool)
.await
.unwrap()
}
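
This deleted module duplicated the gap-filling logic against a separate bars_filled table; after the commit, the same carry-forward happens in memory inside bars::upsert_batch before a single write to bars. A hypothetical forward_fill function sketching that loop, built on the Bar and merge_empty sketches above:

fn forward_fill(bars: &mut [Bar]) {
    if bars.is_empty() {
        return;
    }
    // Seed with the first bar. The real upsert_batch additionally resolves a
    // non-empty predecessor from the database when the first bar has gaps.
    let mut last_filled = bars[0].clone();
    for bar in bars.iter_mut() {
        if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
            // Gap: inherit values from the most recent complete bar.
            bar.merge_empty(&last_filled);
        } else {
            // Complete bar: it becomes the new fill source.
            last_filled = bar.clone();
        }
    }
}

With backfill enabled, the bars table receives already-merged rows, and the select_not_null_where_symbol_... helper recovers the most recent complete bar whenever an insert or batch starts on a gap, so the second table is no longer needed.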


@@ -1,3 +1,2 @@
pub mod assets;
pub mod bars;
pub mod bars_filled;