From 0d276d537c14f0ea1814d2c9efbc6e8e0e7ca537 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Thu, 14 Mar 2024 01:46:18 +0000 Subject: [PATCH] Add websocket infinite inserting Signed-off-by: Nikolaos Karaolidis --- src/config.rs | 4 +-- src/main.rs | 8 ++--- src/threads/data/backfill/bars.rs | 34 +++++++++++++++--- src/threads/data/backfill/mod.rs | 27 ++------------- src/threads/data/backfill/news.rs | 10 ++++-- src/threads/data/mod.rs | 28 ++++++++++----- src/threads/data/websocket/bars.rs | 55 ++++++++++++++++++++++++------ src/threads/data/websocket/mod.rs | 43 +++++++++++------------ src/threads/data/websocket/news.rs | 38 ++++++++++++++------- 9 files changed, 154 insertions(+), 93 deletions(-) diff --git a/src/config.rs b/src/config.rs index dcac036..e21dd66 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,11 +25,11 @@ lazy_static! { env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set."); pub static ref ALPACA_API_SECRET: String = env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set."); - pub static ref BATCH_BACKFILL_BARS_SIZE: usize = env::var("BATCH_BACKFILL_BARS_SIZE") + pub static ref CLICKHOUSE_BATCH_BARS_SIZE: usize = env::var("BATCH_BACKFILL_BARS_SIZE") .expect("BATCH_BACKFILL_BARS_SIZE must be set.") .parse() .expect("BATCH_BACKFILL_BARS_SIZE must be a positive integer."); - pub static ref BATCH_BACKFILL_NEWS_SIZE: usize = env::var("BATCH_BACKFILL_NEWS_SIZE") + pub static ref CLICKHOUSE_BATCH_NEWS_SIZE: usize = env::var("BATCH_BACKFILL_NEWS_SIZE") .expect("BATCH_BACKFILL_NEWS_SIZE must be set.") .parse() .expect("BATCH_BACKFILL_NEWS_SIZE must be a positive integer."); diff --git a/src/main.rs b/src/main.rs index 81554fd..2d05246 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,8 +8,8 @@ mod routes; mod threads; use config::{ - Config, ALPACA_API_BASE, ALPACA_MODE, ALPACA_SOURCE, BATCH_BACKFILL_BARS_SIZE, - BATCH_BACKFILL_NEWS_SIZE, CLICKHOUSE_MAX_CONNECTIONS, + Config, ALPACA_API_BASE, ALPACA_MODE, ALPACA_SOURCE, CLICKHOUSE_BATCH_BARS_SIZE, + CLICKHOUSE_BATCH_NEWS_SIZE, CLICKHOUSE_MAX_CONNECTIONS, }; use dotenv::dotenv; use log4rs::config::Deserializers; @@ -25,8 +25,8 @@ async fn main() { let _ = *ALPACA_MODE; let _ = *ALPACA_API_BASE; let _ = *ALPACA_SOURCE; - let _ = *BATCH_BACKFILL_BARS_SIZE; - let _ = *BATCH_BACKFILL_NEWS_SIZE; + let _ = *CLICKHOUSE_BATCH_BARS_SIZE; + let _ = *CLICKHOUSE_BATCH_NEWS_SIZE; let _ = *CLICKHOUSE_MAX_CONNECTIONS; try_join!( diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs index 09f12ae..a5d0014 100644 --- a/src/threads/data/backfill/bars.rs +++ b/src/threads/data/backfill/bars.rs @@ -1,7 +1,8 @@ use super::Job; use crate::{ - config::{Config, ALPACA_SOURCE, BATCH_BACKFILL_BARS_SIZE}, + config::{Config, ALPACA_SOURCE, CLICKHOUSE_BATCH_BARS_SIZE}, database, + threads::data::ThreadType, }; use async_trait::async_trait; use log::{error, info}; @@ -9,8 +10,11 @@ use qrust::{ alpaca, types::{ self, - alpaca::shared::{Sort, Source}, - Backfill, Bar, + alpaca::{ + api::{ALPACA_CRYPTO_DATA_API_URL, ALPACA_US_EQUITY_DATA_API_URL}, + shared::{Sort, Source}, + }, + Backfill, Bar, Class, }, utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, }; @@ -116,7 +120,7 @@ impl super::Handler for Handler { let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); - let mut bars = Vec::with_capacity(*BATCH_BACKFILL_BARS_SIZE); + let mut bars = Vec::with_capacity(*CLICKHOUSE_BATCH_BARS_SIZE); let mut last_times = HashMap::new(); let mut next_page_token = None; @@ -154,7 +158,7 @@ impl super::Handler for Handler { } } - if bars.len() < *BATCH_BACKFILL_BARS_SIZE && message.next_page_token.is_some() { + if bars.len() < *CLICKHOUSE_BATCH_BARS_SIZE && message.next_page_token.is_some() { continue; } @@ -198,3 +202,23 @@ impl super::Handler for Handler { "bars" } } + +pub fn create_handler(config: Arc, thread_type: ThreadType) -> Box { + let data_url = match thread_type { + ThreadType::Bars(Class::UsEquity) => ALPACA_US_EQUITY_DATA_API_URL, + ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_API_URL, + _ => unreachable!(), + }; + + let api_query_constructor = match thread_type { + ThreadType::Bars(Class::UsEquity) => us_equity_query_constructor, + ThreadType::Bars(Class::Crypto) => crypto_query_constructor, + _ => unreachable!(), + }; + + Box::new(Handler { + config, + data_url, + api_query_constructor, + }) +} diff --git a/src/threads/data/backfill/mod.rs b/src/threads/data/backfill/mod.rs index 0a38dba..e5e7590 100644 --- a/src/threads/data/backfill/mod.rs +++ b/src/threads/data/backfill/mod.rs @@ -1,16 +1,11 @@ -mod bars; -mod news; +pub mod bars; +pub mod news; -use super::ThreadType; -use crate::config::Config; use async_trait::async_trait; use itertools::Itertools; use log::{info, warn}; use qrust::{ - types::{ - alpaca::api::{ALPACA_CRYPTO_DATA_API_URL, ALPACA_US_EQUITY_DATA_API_URL}, - Backfill, Class, - }, + types::Backfill, utils::{last_minute, ONE_SECOND}, }; use std::{collections::HashMap, hash::Hash, sync::Arc}; @@ -236,19 +231,3 @@ async fn handle_backfill_message( message.response.send(()).unwrap(); } - -pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { - match thread_type { - ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler { - config, - data_url: ALPACA_US_EQUITY_DATA_API_URL, - api_query_constructor: bars::us_equity_query_constructor, - }), - ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { - config, - data_url: ALPACA_CRYPTO_DATA_API_URL, - api_query_constructor: bars::crypto_query_constructor, - }), - ThreadType::News => Box::new(news::Handler { config }), - } -} diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs index a161aec..a6787ad 100644 --- a/src/threads/data/backfill/news.rs +++ b/src/threads/data/backfill/news.rs @@ -1,6 +1,6 @@ use super::Job; use crate::{ - config::{Config, ALPACA_SOURCE, BATCH_BACKFILL_NEWS_SIZE}, + config::{Config, ALPACA_SOURCE, CLICKHOUSE_BATCH_NEWS_SIZE}, database, }; use async_trait::async_trait; @@ -81,7 +81,7 @@ impl super::Handler for Handler { let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); - let mut news = Vec::with_capacity(*BATCH_BACKFILL_NEWS_SIZE); + let mut news = Vec::with_capacity(*CLICKHOUSE_BATCH_NEWS_SIZE); let mut last_times = HashMap::new(); let mut next_page_token = None; @@ -122,7 +122,7 @@ impl super::Handler for Handler { news.push(news_item); } - if news.len() < *BATCH_BACKFILL_NEWS_SIZE && message.next_page_token.is_some() { + if news.len() < *CLICKHOUSE_BATCH_NEWS_SIZE && message.next_page_token.is_some() { continue; } @@ -166,3 +166,7 @@ impl super::Handler for Handler { "news" } } + +pub fn create_handler(config: Arc) -> Box { + Box::new(Handler { config }) +} diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 940ef0b..7ff0587 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -65,10 +65,11 @@ pub async fn run( mut clock_receiver: mpsc::Receiver, ) { let (bars_us_equity_websocket_sender, bars_us_equity_backfill_sender) = - init_thread(&config, ThreadType::Bars(Class::UsEquity)); + init_thread(config.clone(), ThreadType::Bars(Class::UsEquity)); let (bars_crypto_websocket_sender, bars_crypto_backfill_sender) = - init_thread(&config, ThreadType::Bars(Class::Crypto)); - let (news_websocket_sender, news_backfill_sender) = init_thread(&config, ThreadType::News); + init_thread(config.clone(), ThreadType::Bars(Class::Crypto)); + let (news_websocket_sender, news_backfill_sender) = + init_thread(config.clone(), ThreadType::News); loop { select! { @@ -98,7 +99,7 @@ pub async fn run( } fn init_thread( - config: &Arc, + config: Arc, thread_type: ThreadType, ) -> ( mpsc::Sender, @@ -112,15 +113,24 @@ fn init_thread( ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(), }; + let backfill_handler = match thread_type { + ThreadType::Bars(_) => backfill::bars::create_handler(config.clone(), thread_type), + ThreadType::News => backfill::news::create_handler(config.clone()), + }; + let (backfill_sender, backfill_receiver) = mpsc::channel(100); - spawn(backfill::run( - Arc::new(backfill::create_handler(thread_type, config.clone())), - backfill_receiver, - )); + + spawn(backfill::run(backfill_handler.into(), backfill_receiver)); + + let websocket_handler = match thread_type { + ThreadType::Bars(_) => websocket::bars::create_handler(config, thread_type), + ThreadType::News => websocket::news::create_handler(config), + }; let (websocket_sender, websocket_receiver) = mpsc::channel(100); + spawn(websocket::run( - Arc::new(websocket::create_handler(thread_type, config.clone())), + websocket_handler.into(), websocket_receiver, websocket_url, )); diff --git a/src/threads/data/websocket/bars.rs b/src/threads/data/websocket/bars.rs index ef5fc4a..166c1f4 100644 --- a/src/threads/data/websocket/bars.rs +++ b/src/threads/data/websocket/bars.rs @@ -1,16 +1,25 @@ use super::State; -use crate::{config::Config, database}; +use crate::{ + config::{Config, CLICKHOUSE_BATCH_BARS_SIZE}, + database, + threads::data::ThreadType, +}; use async_trait::async_trait; +use clickhouse::inserter::Inserter; use log::{debug, error, info}; -use qrust::types::{alpaca::websocket, Bar}; +use qrust::{ + types::{alpaca::websocket, Bar, Class}, + utils::ONE_SECOND, +}; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; pub struct Handler { pub config: Arc, + pub inserter: Arc>>, pub subscription_message_constructor: fn(Vec) -> websocket::data::outgoing::subscribe::Message, } @@ -83,14 +92,7 @@ impl super::Handler for Handler { | websocket::data::incoming::Message::UpdatedBar(message) => { let bar = Bar::from(message); debug!("Received bar for {}: {}.", bar.symbol, bar.time); - - database::bars::upsert( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &bar, - ) - .await - .unwrap(); + self.inserter.lock().await.write(&bar).await.unwrap(); } websocket::data::incoming::Message::Status(message) => { debug!( @@ -134,4 +136,35 @@ impl super::Handler for Handler { fn log_string(&self) -> &'static str { "bars" } + + async fn run_inserter(&self) { + super::run_inserter(self.inserter.clone()).await; + } +} + +pub fn create_handler(config: Arc, thread_type: ThreadType) -> Box { + let inserter = Arc::new(Mutex::new( + config + .clickhouse_client + .inserter("bars") + .unwrap() + .with_period(Some(ONE_SECOND)) + .with_max_entries((*CLICKHOUSE_BATCH_BARS_SIZE).try_into().unwrap()), + )); + + let subscription_message_constructor = match thread_type { + ThreadType::Bars(Class::UsEquity) => { + websocket::data::outgoing::subscribe::Message::new_market_us_equity + } + ThreadType::Bars(Class::Crypto) => { + websocket::data::outgoing::subscribe::Message::new_market_crypto + } + _ => unreachable!(), + }; + + Box::new(Handler { + config, + inserter, + subscription_message_constructor, + }) } diff --git a/src/threads/data/websocket/mod.rs b/src/threads/data/websocket/mod.rs index 995145e..b9818ad 100644 --- a/src/threads/data/websocket/mod.rs +++ b/src/threads/data/websocket/mod.rs @@ -1,16 +1,14 @@ -mod bars; -mod news; +pub mod bars; +pub mod news; -use super::ThreadType; -use crate::config::{Config, ALPACA_API_KEY, ALPACA_API_SECRET}; +use crate::config::{ALPACA_API_KEY, ALPACA_API_SECRET}; use async_trait::async_trait; use backoff::{future::retry_notify, ExponentialBackoff}; +use clickhouse::{inserter::Inserter, Row}; use futures_util::{future::join_all, SinkExt, StreamExt}; use log::error; -use qrust::types::{ - alpaca::{self, websocket}, - Class, -}; +use qrust::types::alpaca::{self, websocket}; +use serde::Serialize; use serde_json::{from_str, to_string}; use std::{ collections::{HashMap, HashSet}, @@ -20,7 +18,7 @@ use std::{ use tokio::{ net::TcpStream, select, spawn, - sync::{mpsc, oneshot, RwLock}, + sync::{mpsc, oneshot, Mutex, RwLock}, }; use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream}; @@ -65,7 +63,7 @@ pub struct State { } #[async_trait] -pub trait Handler: Send + Sync { +pub trait Handler: Send + Sync + 'static { fn create_subscription_message( &self, symbols: Vec, @@ -76,6 +74,7 @@ pub trait Handler: Send + Sync { message: websocket::data::incoming::Message, ); fn log_string(&self) -> &'static str; + async fn run_inserter(&self); } pub async fn run( @@ -89,6 +88,9 @@ pub async fn run( pending_unsubscriptions: HashMap::new(), })); + let handler_clone = handler.clone(); + spawn(async move { handler_clone.run_inserter().await }); + let (sink_sender, sink_receiver) = mpsc::channel(100); let (stream_sender, mut stream_receiver) = mpsc::channel(10_000); @@ -343,18 +345,13 @@ async fn handle_message( message.response.send(()).unwrap(); } -pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { - match thread_type { - ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler { - config, - subscription_message_constructor: - websocket::data::outgoing::subscribe::Message::new_market_us_equity, - }), - ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { - config, - subscription_message_constructor: - websocket::data::outgoing::subscribe::Message::new_market_crypto, - }), - ThreadType::News => Box::new(news::Handler { config }), +async fn run_inserter(inserter: Arc>>) +where + T: Row + Serialize, +{ + loop { + let time_left = inserter.lock().await.time_left().unwrap(); + tokio::time::sleep(time_left).await; + inserter.lock().await.commit().await.unwrap(); } } diff --git a/src/threads/data/websocket/news.rs b/src/threads/data/websocket/news.rs index 9dfc0e1..2b36300 100644 --- a/src/threads/data/websocket/news.rs +++ b/src/threads/data/websocket/news.rs @@ -1,13 +1,18 @@ use super::State; -use crate::{config::Config, database}; +use crate::config::{Config, CLICKHOUSE_BATCH_NEWS_SIZE}; use async_trait::async_trait; +use clickhouse::inserter::Inserter; use log::{debug, error, info}; -use qrust::types::{alpaca::websocket, News}; +use qrust::{ + types::{alpaca::websocket, News}, + utils::ONE_SECOND, +}; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; pub struct Handler { pub config: Arc, + pub inserter: Arc>>, } #[async_trait] @@ -74,19 +79,11 @@ impl super::Handler for Handler { } websocket::data::incoming::Message::News(message) => { let news = News::from(message); - debug!( "Received news for {:?}: {}.", news.symbols, news.time_created ); - - database::news::upsert( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &news, - ) - .await - .unwrap(); + self.inserter.lock().await.write(&news).await.unwrap(); } websocket::data::incoming::Message::Error(message) => { error!("Received error message: {}.", message.message); @@ -98,4 +95,21 @@ impl super::Handler for Handler { fn log_string(&self) -> &'static str { "news" } + + async fn run_inserter(&self) { + super::run_inserter(self.inserter.clone()).await; + } +} + +pub fn create_handler(config: Arc) -> Box { + let inserter = Arc::new(Mutex::new( + config + .clickhouse_client + .inserter("news") + .unwrap() + .with_period(Some(ONE_SECOND)) + .with_max_entries((*CLICKHOUSE_BATCH_NEWS_SIZE).try_into().unwrap()), + )); + + Box::new(Handler { config, inserter }) }