pub mod assets; pub mod backfills_bars; pub mod backfills_news; pub mod bars; pub mod calendar; pub mod news; pub mod orders; use clickhouse::{error::Error, Client}; use log::info; use tokio::try_join; #[macro_export] macro_rules! select { ($record:ty, $table_name:expr) => { pub async fn select( client: &clickhouse::Client, ) -> Result, clickhouse::error::Error> { client .query(&format!("SELECT ?fields FROM {} FINAL", $table_name)) .fetch_all::<$record>() .await } }; } #[macro_export] macro_rules! select_where_symbol { ($record:ty, $table_name:expr) => { pub async fn select_where_symbol( client: &clickhouse::Client, symbol: &T, ) -> Result, clickhouse::error::Error> where T: AsRef + serde::Serialize + Send + Sync, { client .query(&format!( "SELECT ?fields FROM {} FINAL WHERE symbol = ?", $table_name )) .bind(symbol) .fetch_optional::<$record>() .await } }; } #[macro_export] macro_rules! upsert { ($record:ty, $table_name:expr) => { pub async fn upsert( client: &clickhouse::Client, record: &$record, ) -> Result<(), clickhouse::error::Error> { let mut insert = client.insert($table_name)?; insert.write(record).await?; insert.end().await } }; } #[macro_export] macro_rules! upsert_batch { ($record:ty, $table_name:expr) => { pub async fn upsert_batch<'a, T>( client: &clickhouse::Client, records: T, ) -> Result<(), clickhouse::error::Error> where T: IntoIterator + Send + Sync, T::IntoIter: Send, { let mut insert = client.insert($table_name)?; for record in records { insert.write(record).await?; } insert.end().await } }; } #[macro_export] macro_rules! delete_where_symbols { ($table_name:expr) => { pub async fn delete_where_symbols( client: &clickhouse::Client, symbols: &[T], ) -> Result<(), clickhouse::error::Error> where T: AsRef + serde::Serialize + Send + Sync, { client .query(&format!("DELETE FROM {} WHERE symbol IN ?", $table_name)) .bind(symbols) .execute() .await } }; } #[macro_export] macro_rules! cleanup { ($table_name:expr) => { pub async fn cleanup(client: &clickhouse::Client) -> Result<(), clickhouse::error::Error> { client .query(&format!( "DELETE FROM {} WHERE symbol NOT IN (SELECT symbol FROM assets)", $table_name )) .execute() .await } }; } #[macro_export] macro_rules! optimize { ($table_name:expr) => { pub async fn optimize(client: &clickhouse::Client) -> Result<(), clickhouse::error::Error> { client .query(&format!("OPTIMIZE TABLE {} FINAL", $table_name)) .execute() .await } }; } pub async fn cleanup_all(clickhouse_client: &Client) -> Result<(), Error> { info!("Cleaning up database."); try_join!( bars::cleanup(clickhouse_client), news::cleanup(clickhouse_client), backfills_bars::cleanup(clickhouse_client), backfills_news::cleanup(clickhouse_client) ) .map(|_| ()) } pub async fn optimize_all(clickhouse_client: &Client) -> Result<(), Error> { info!("Optimizing database."); try_join!( assets::optimize(clickhouse_client), bars::optimize(clickhouse_client), news::optimize(clickhouse_client), backfills_bars::optimize(clickhouse_client), backfills_news::optimize(clickhouse_client), orders::optimize(clickhouse_client), calendar::optimize(clickhouse_client) ) .map(|_| ()) }