diff --git a/.sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json b/.sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json deleted file mode 100644 index e3708d1..0000000 --- a/.sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO NOTHING", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamptz", - "Text", - "Float8", - "Float8", - "Float8", - "Float8", - "Float8", - "Int8", - "Float8" - ] - }, - "nullable": [] - }, - "hash": "08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be" -} diff --git a/.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json b/.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json deleted file mode 100644 index 41a238b..0000000 --- a/.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json +++ /dev/null @@ -1,78 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "timestamp", - "type_info": "Timestamptz" - }, - { - "ordinal": 1, - "name": "asset_symbol", - "type_info": "Text" - }, - { - "ordinal": 2, - "name": "open", - "type_info": "Float8" - }, - { - "ordinal": 3, - "name": "high", - "type_info": "Float8" - }, - { - "ordinal": 4, - "name": "low", - "type_info": "Float8" - }, - { - "ordinal": 5, - "name": "close", - "type_info": "Float8" - }, - { - "ordinal": 6, - "name": "volume", - "type_info": "Float8" - }, - { - "ordinal": 7, - "name": "num_trades", - "type_info": "Int8" - }, - { - "ordinal": 8, - "name": "volume_weighted", - "type_info": "Float8" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Text", - "Float8", - "Float8", - "Float8", - "Float8", - "Float8", - "Int8", - "Float8" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8" -} diff --git a/.sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json b/.sqlx/query-6d9509cd482fbc022bfd157af8e59a1a32f0fbd8802cfec980e05706fb697b58.json similarity index 75% rename from .sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json rename to .sqlx/query-6d9509cd482fbc022bfd157af8e59a1a32f0fbd8802cfec980e05706fb697b58.json index 1222540..24a317d 100644 --- a/.sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json +++ b/.sqlx/query-6d9509cd482fbc022bfd157af8e59a1a32f0fbd8802cfec980e05706fb697b58.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1", + "query": "SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp < $2 AND open IS NOT NULL AND high IS NOT NULL AND low IS NOT NULL AND close IS NOT NULL ORDER BY timestamp DESC LIMIT 1", "describe": { "columns": [ { @@ -51,21 +51,21 @@ ], "parameters": { "Left": [ - "Timestamptz", - "Text" + "Text", + "Timestamptz" ] }, "nullable": [ false, false, - false, - false, - false, - false, + true, + true, + true, + true, false, false, false ] }, - "hash": "615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0" + "hash": "6d9509cd482fbc022bfd157af8e59a1a32f0fbd8802cfec980e05706fb697b58" } diff --git a/src/data/historical.rs b/src/data/historical.rs index b02d145..7a3c0c0 100644 --- a/src/data/historical.rs +++ b/src/data/historical.rs @@ -109,8 +109,7 @@ pub async fn backfill( let bars = bars.into_values().collect::>(); let transaction = app_config.postgres_pool.begin().await.unwrap(); - database::bars::upsert_batch(&app_config.postgres_pool, &bars).await; - database::bars_filled::upsert_batch(&app_config.postgres_pool, &bars).await; + database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await; database::assets::update_timestamp_last_where_symbol( &app_config.postgres_pool, &asset.symbol, @@ -137,7 +136,7 @@ async fn backfill_recent_nulls( from, ) .await; - database::bars_filled::upsert_batch(&app_config.postgres_pool, &bars).await; + database::bars::upsert_batch(&app_config.postgres_pool, &bars, true).await; database::assets::update_timestamp_last_where_symbol( &app_config.postgres_pool, &asset.symbol, diff --git a/src/data/live.rs b/src/data/live.rs index 512f27f..6d12650 100644 --- a/src/data/live.rs +++ b/src/data/live.rs @@ -230,22 +230,21 @@ async fn handle_message( incoming::Message::Bars(bar_message) => { let bar = Bar::from(bar_message); info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp); - - let transaction = app_config.postgres_pool.begin().await.unwrap(); - database::bars::upsert(&app_config.postgres_pool, &bar).await; - if *backfilled.read().await.get(&bar.asset_symbol).unwrap() { - database::bars_filled::upsert(&app_config.postgres_pool, &bar).await; - } - transaction.commit().await.unwrap(); + database::bars::upsert( + &app_config.postgres_pool, + &bar, + backfilled.read().await[&bar.asset_symbol], + ) + .await; } incoming::Message::UpdatedBars(bar_message) => { let bar = Bar::from(bar_message); info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp); let transaction = app_config.postgres_pool.begin().await.unwrap(); - database::bars::upsert(&app_config.postgres_pool, &bar).await; - if *backfilled.read().await.get(&bar.asset_symbol).unwrap() { - database::bars_filled::upsert(&app_config.postgres_pool, &bar).await; + let backfilled_asset_symbol = backfilled.read().await[&bar.asset_symbol]; + database::bars::upsert(&app_config.postgres_pool, &bar, backfilled_asset_symbol).await; + if backfilled_asset_symbol { database::assets::update_timestamp_last_where_symbol( &app_config.postgres_pool, &bar.asset_symbol, @@ -287,21 +286,20 @@ async fn null_handler(app_config: Arc, backfilled: Arc Bar { +pub async fn select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one( + postgres_pool: &PgPool, + symbol: &str, + timestamp: &OffsetDateTime, +) -> Bar { + query_as!( + Bar, + r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp < $2 AND open IS NOT NULL AND high IS NOT NULL AND low IS NOT NULL AND close IS NOT NULL ORDER BY timestamp DESC LIMIT 1"#, + symbol, + timestamp + ) + .fetch_one(postgres_pool) + .await + .unwrap() +} + +pub async fn select_where_symbol_where_timestamp_larger_than( + postgres_pool: &PgPool, + symbol: &str, + timestamp: &OffsetDateTime, +) -> Vec { + query_as!( + Bar, + r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC"#, + symbol, + timestamp + ) + .fetch_all(postgres_pool) + .await + .unwrap() +} + +pub async fn upsert(postgres_pool: &PgPool, bar: &Bar, backfill: bool) -> Bar { + let mut bar = bar.clone(); + + if backfill + && (bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none()) + { + let filled_bar = select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one( + postgres_pool, + &bar.asset_symbol, + &bar.timestamp, + ).await; + bar.merge_empty(&filled_bar); + } + query_as!( Bar, r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) @@ -16,7 +60,20 @@ pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar { .unwrap() } -pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) { +pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar, backfill: bool) { + let mut bar = bar.clone(); + + if backfill + && (bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none()) + { + let filled_bar = select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one( + postgres_pool, + &bar.asset_symbol, + &bar.timestamp, + ).await; + bar.merge_empty(&filled_bar); + } + query_as!( Bar, r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) @@ -28,7 +85,27 @@ pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) { .unwrap(); } -pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec { +pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar], backfill: bool) -> Vec { + let mut bars = bars.to_vec(); + + if bars.is_empty() { + return bars; + } + + if backfill + && (bars[0].open.is_none() + || bars[0].high.is_none() + || bars[0].low.is_none() + || bars[0].close.is_none()) + { + let filled_bar = select_not_null_where_symbol_where_timestamp_smaller_than_order_by_timestamp_desc_limit_one( + postgres_pool, + &bars[0].asset_symbol, + &bars[0].timestamp, + ).await; + bars[0].merge_empty(&filled_bar); + } + let mut timestamp = Vec::with_capacity(bars.len()); let mut asset_symbol = Vec::with_capacity(bars.len()); let mut open = Vec::with_capacity(bars.len()); @@ -39,7 +116,18 @@ pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec { let mut num_trades = Vec::with_capacity(bars.len()); let mut volume_weighted = Vec::with_capacity(bars.len()); - for bar in bars { + let mut last_filled_bar = bars[0].clone(); + + for mut bar in bars { + if backfill { + if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() + { + bar.merge_empty(&last_filled_bar); + } else { + last_filled_bar = bar.clone(); + } + } + timestamp.push(bar.timestamp); asset_symbol.push(bar.asset_symbol.clone()); open.push(bar.open); @@ -71,19 +159,3 @@ pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec { .await .unwrap() } - -pub async fn select_where_symbol_where_timestamp_larger_than( - postgres_pool: &PgPool, - symbol: &str, - timestamp: &OffsetDateTime, -) -> Vec { - query_as!( - Bar, - r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC"#, - symbol, - timestamp - ) - .fetch_all(postgres_pool) - .await - .unwrap() -} diff --git a/src/database/bars_filled.rs b/src/database/bars_filled.rs deleted file mode 100644 index 4e9c2b8..0000000 --- a/src/database/bars_filled.rs +++ /dev/null @@ -1,133 +0,0 @@ -use crate::types::Bar; -use sqlx::{query_as, PgPool, Postgres}; -use std::convert::Into; - -pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar { - let mut bar = bar.clone(); - - if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() { - let filled_bar = query_as!( - Bar, - r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#, - bar.timestamp, - bar.asset_symbol - ) - .fetch_one(postgres_pool) - .await - .unwrap(); - bar.merge_empty(&filled_bar); - } - - query_as!( - Bar, - r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9 - RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#, - bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted - ) - .fetch_one(postgres_pool) - .await - .unwrap() -} - -pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) { - let mut bar = bar.clone(); - - if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() { - let filled_bar = query_as!( - Bar, - r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#, - bar.timestamp, - bar.asset_symbol - ) - .fetch_one(postgres_pool) - .await - .unwrap(); - bar.merge_empty(&filled_bar); - } - - query_as!( - Bar, - r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (timestamp, asset_symbol) DO NOTHING"#, - bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted - ) - .execute(postgres_pool) - .await - .unwrap(); -} - -pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec { - let mut bars = bars.to_vec(); - - if bars.is_empty() { - return bars; - } - - if bars[0].open.is_none() - || bars[0].high.is_none() - || bars[0].low.is_none() - || bars[0].close.is_none() - { - let filled_bar = &query_as!( - Bar, - r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#, - bars[0].timestamp, - bars[0].asset_symbol - ) - .fetch_one(postgres_pool) - .await - .unwrap(); - bars[0].merge_empty(filled_bar); - } - - let mut timestamp = Vec::with_capacity(bars.len()); - let mut asset_symbol = Vec::with_capacity(bars.len()); - let mut open = Vec::with_capacity(bars.len()); - let mut high = Vec::with_capacity(bars.len()); - let mut low = Vec::with_capacity(bars.len()); - let mut close = Vec::with_capacity(bars.len()); - let mut volume = Vec::with_capacity(bars.len()); - let mut num_trades = Vec::with_capacity(bars.len()); - let mut volume_weighted = Vec::with_capacity(bars.len()); - - let mut last_filled_bar = bars[0].clone(); - - for mut bar in bars { - if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() { - bar.merge_empty(&last_filled_bar); - } else { - last_filled_bar = bar.clone(); - } - - timestamp.push(bar.timestamp); - asset_symbol.push(bar.asset_symbol.clone()); - open.push(bar.open); - high.push(bar.high); - low.push(bar.low); - close.push(bar.close); - volume.push(bar.volume); - num_trades.push(bar.num_trades); - volume_weighted.push(bar.volume_weighted); - } - - // No type-safety here because of NULLABLE bulk insert - query_as::( - r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) - SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[]) - ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, num_trades = EXCLUDED.num_trades, volume_weighted = EXCLUDED.volume_weighted - RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#, - ) - .bind(timestamp) - .bind(asset_symbol) - .bind(open) - .bind(high) - .bind(low) - .bind(close) - .bind(volume) - .bind(num_trades) - .bind(volume_weighted) - .fetch_all(postgres_pool) - .await - .unwrap() -} diff --git a/src/database/mod.rs b/src/database/mod.rs index 1c4a99e..5ac2df4 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,3 +1,2 @@ pub mod assets; pub mod bars; -pub mod bars_filled; diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh index 6253f98..640c3bd 100644 --- a/support/timescaledb/999_init.sh +++ b/support/timescaledb/999_init.sh @@ -39,18 +39,10 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 15); - CREATE TABLE bars_filled ( - timestamp TIMESTAMPTZ, - asset_symbol TEXT REFERENCES assets(symbol) ON DELETE CASCADE ON UPDATE CASCADE, - open DOUBLE PRECISION NOT NULL, - high DOUBLE PRECISION NOT NULL, - low DOUBLE PRECISION NOT NULL, - close DOUBLE PRECISION NOT NULL, - volume DOUBLE PRECISION NOT NULL, - num_trades BIGINT NOT NULL, - volume_weighted DOUBLE PRECISION NOT NULL, - PRIMARY KEY (asset_symbol, timestamp) + ALTER TABLE bars SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'asset_symbol' ); - SELECT create_hypertable('bars_filled', 'timestamp', 'asset_symbol', 15); + SELECT add_compression_policy('bars', INTERVAL '30 days'); EOSQL