Add websocket infinite inserting

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-03-14 01:46:18 +00:00
parent 1707d74cf7
commit 0d276d537c
9 changed files with 154 additions and 93 deletions

View File

@@ -25,11 +25,11 @@ lazy_static! {
env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set."); env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set.");
pub static ref ALPACA_API_SECRET: String = pub static ref ALPACA_API_SECRET: String =
env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set."); env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set.");
pub static ref BATCH_BACKFILL_BARS_SIZE: usize = env::var("BATCH_BACKFILL_BARS_SIZE") pub static ref CLICKHOUSE_BATCH_BARS_SIZE: usize = env::var("BATCH_BACKFILL_BARS_SIZE")
.expect("BATCH_BACKFILL_BARS_SIZE must be set.") .expect("BATCH_BACKFILL_BARS_SIZE must be set.")
.parse() .parse()
.expect("BATCH_BACKFILL_BARS_SIZE must be a positive integer."); .expect("BATCH_BACKFILL_BARS_SIZE must be a positive integer.");
pub static ref BATCH_BACKFILL_NEWS_SIZE: usize = env::var("BATCH_BACKFILL_NEWS_SIZE") pub static ref CLICKHOUSE_BATCH_NEWS_SIZE: usize = env::var("BATCH_BACKFILL_NEWS_SIZE")
.expect("BATCH_BACKFILL_NEWS_SIZE must be set.") .expect("BATCH_BACKFILL_NEWS_SIZE must be set.")
.parse() .parse()
.expect("BATCH_BACKFILL_NEWS_SIZE must be a positive integer."); .expect("BATCH_BACKFILL_NEWS_SIZE must be a positive integer.");

View File

@@ -8,8 +8,8 @@ mod routes;
mod threads; mod threads;
use config::{ use config::{
Config, ALPACA_API_BASE, ALPACA_MODE, ALPACA_SOURCE, BATCH_BACKFILL_BARS_SIZE, Config, ALPACA_API_BASE, ALPACA_MODE, ALPACA_SOURCE, CLICKHOUSE_BATCH_BARS_SIZE,
BATCH_BACKFILL_NEWS_SIZE, CLICKHOUSE_MAX_CONNECTIONS, CLICKHOUSE_BATCH_NEWS_SIZE, CLICKHOUSE_MAX_CONNECTIONS,
}; };
use dotenv::dotenv; use dotenv::dotenv;
use log4rs::config::Deserializers; use log4rs::config::Deserializers;
@@ -25,8 +25,8 @@ async fn main() {
let _ = *ALPACA_MODE; let _ = *ALPACA_MODE;
let _ = *ALPACA_API_BASE; let _ = *ALPACA_API_BASE;
let _ = *ALPACA_SOURCE; let _ = *ALPACA_SOURCE;
let _ = *BATCH_BACKFILL_BARS_SIZE; let _ = *CLICKHOUSE_BATCH_BARS_SIZE;
let _ = *BATCH_BACKFILL_NEWS_SIZE; let _ = *CLICKHOUSE_BATCH_NEWS_SIZE;
let _ = *CLICKHOUSE_MAX_CONNECTIONS; let _ = *CLICKHOUSE_MAX_CONNECTIONS;
try_join!( try_join!(

View File

@@ -1,7 +1,8 @@
use super::Job; use super::Job;
use crate::{ use crate::{
config::{Config, ALPACA_SOURCE, BATCH_BACKFILL_BARS_SIZE}, config::{Config, ALPACA_SOURCE, CLICKHOUSE_BATCH_BARS_SIZE},
database, database,
threads::data::ThreadType,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use log::{error, info}; use log::{error, info};
@@ -9,8 +10,11 @@ use qrust::{
alpaca, alpaca,
types::{ types::{
self, self,
alpaca::shared::{Sort, Source}, alpaca::{
Backfill, Bar, api::{ALPACA_CRYPTO_DATA_API_URL, ALPACA_US_EQUITY_DATA_API_URL},
shared::{Sort, Source},
},
Backfill, Bar, Class,
}, },
utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE},
}; };
@@ -116,7 +120,7 @@ impl super::Handler for Handler {
let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap();
let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
let mut bars = Vec::with_capacity(*BATCH_BACKFILL_BARS_SIZE); let mut bars = Vec::with_capacity(*CLICKHOUSE_BATCH_BARS_SIZE);
let mut last_times = HashMap::new(); let mut last_times = HashMap::new();
let mut next_page_token = None; let mut next_page_token = None;
@@ -154,7 +158,7 @@ impl super::Handler for Handler {
} }
} }
if bars.len() < *BATCH_BACKFILL_BARS_SIZE && message.next_page_token.is_some() { if bars.len() < *CLICKHOUSE_BATCH_BARS_SIZE && message.next_page_token.is_some() {
continue; continue;
} }
@@ -198,3 +202,23 @@ impl super::Handler for Handler {
"bars" "bars"
} }
} }
pub fn create_handler(config: Arc<Config>, thread_type: ThreadType) -> Box<dyn super::Handler> {
let data_url = match thread_type {
ThreadType::Bars(Class::UsEquity) => ALPACA_US_EQUITY_DATA_API_URL,
ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_API_URL,
_ => unreachable!(),
};
let api_query_constructor = match thread_type {
ThreadType::Bars(Class::UsEquity) => us_equity_query_constructor,
ThreadType::Bars(Class::Crypto) => crypto_query_constructor,
_ => unreachable!(),
};
Box::new(Handler {
config,
data_url,
api_query_constructor,
})
}

View File

@@ -1,16 +1,11 @@
mod bars; pub mod bars;
mod news; pub mod news;
use super::ThreadType;
use crate::config::Config;
use async_trait::async_trait; use async_trait::async_trait;
use itertools::Itertools; use itertools::Itertools;
use log::{info, warn}; use log::{info, warn};
use qrust::{ use qrust::{
types::{ types::Backfill,
alpaca::api::{ALPACA_CRYPTO_DATA_API_URL, ALPACA_US_EQUITY_DATA_API_URL},
Backfill, Class,
},
utils::{last_minute, ONE_SECOND}, utils::{last_minute, ONE_SECOND},
}; };
use std::{collections::HashMap, hash::Hash, sync::Arc}; use std::{collections::HashMap, hash::Hash, sync::Arc};
@@ -236,19 +231,3 @@ async fn handle_backfill_message(
message.response.send(()).unwrap(); message.response.send(()).unwrap();
} }
pub fn create_handler(thread_type: ThreadType, config: Arc<Config>) -> Box<dyn Handler> {
match thread_type {
ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler {
config,
data_url: ALPACA_US_EQUITY_DATA_API_URL,
api_query_constructor: bars::us_equity_query_constructor,
}),
ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler {
config,
data_url: ALPACA_CRYPTO_DATA_API_URL,
api_query_constructor: bars::crypto_query_constructor,
}),
ThreadType::News => Box::new(news::Handler { config }),
}
}

View File

@@ -1,6 +1,6 @@
use super::Job; use super::Job;
use crate::{ use crate::{
config::{Config, ALPACA_SOURCE, BATCH_BACKFILL_NEWS_SIZE}, config::{Config, ALPACA_SOURCE, CLICKHOUSE_BATCH_NEWS_SIZE},
database, database,
}; };
use async_trait::async_trait; use async_trait::async_trait;
@@ -81,7 +81,7 @@ impl super::Handler for Handler {
let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap();
let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
let mut news = Vec::with_capacity(*BATCH_BACKFILL_NEWS_SIZE); let mut news = Vec::with_capacity(*CLICKHOUSE_BATCH_NEWS_SIZE);
let mut last_times = HashMap::new(); let mut last_times = HashMap::new();
let mut next_page_token = None; let mut next_page_token = None;
@@ -122,7 +122,7 @@ impl super::Handler for Handler {
news.push(news_item); news.push(news_item);
} }
if news.len() < *BATCH_BACKFILL_NEWS_SIZE && message.next_page_token.is_some() { if news.len() < *CLICKHOUSE_BATCH_NEWS_SIZE && message.next_page_token.is_some() {
continue; continue;
} }
@@ -166,3 +166,7 @@ impl super::Handler for Handler {
"news" "news"
} }
} }
pub fn create_handler(config: Arc<Config>) -> Box<dyn super::Handler> {
Box::new(Handler { config })
}

View File

@@ -65,10 +65,11 @@ pub async fn run(
mut clock_receiver: mpsc::Receiver<clock::Message>, mut clock_receiver: mpsc::Receiver<clock::Message>,
) { ) {
let (bars_us_equity_websocket_sender, bars_us_equity_backfill_sender) = let (bars_us_equity_websocket_sender, bars_us_equity_backfill_sender) =
init_thread(&config, ThreadType::Bars(Class::UsEquity)); init_thread(config.clone(), ThreadType::Bars(Class::UsEquity));
let (bars_crypto_websocket_sender, bars_crypto_backfill_sender) = let (bars_crypto_websocket_sender, bars_crypto_backfill_sender) =
init_thread(&config, ThreadType::Bars(Class::Crypto)); init_thread(config.clone(), ThreadType::Bars(Class::Crypto));
let (news_websocket_sender, news_backfill_sender) = init_thread(&config, ThreadType::News); let (news_websocket_sender, news_backfill_sender) =
init_thread(config.clone(), ThreadType::News);
loop { loop {
select! { select! {
@@ -98,7 +99,7 @@ pub async fn run(
} }
fn init_thread( fn init_thread(
config: &Arc<Config>, config: Arc<Config>,
thread_type: ThreadType, thread_type: ThreadType,
) -> ( ) -> (
mpsc::Sender<websocket::Message>, mpsc::Sender<websocket::Message>,
@@ -112,15 +113,24 @@ fn init_thread(
ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(), ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(),
}; };
let backfill_handler = match thread_type {
ThreadType::Bars(_) => backfill::bars::create_handler(config.clone(), thread_type),
ThreadType::News => backfill::news::create_handler(config.clone()),
};
let (backfill_sender, backfill_receiver) = mpsc::channel(100); let (backfill_sender, backfill_receiver) = mpsc::channel(100);
spawn(backfill::run(
Arc::new(backfill::create_handler(thread_type, config.clone())), spawn(backfill::run(backfill_handler.into(), backfill_receiver));
backfill_receiver,
)); let websocket_handler = match thread_type {
ThreadType::Bars(_) => websocket::bars::create_handler(config, thread_type),
ThreadType::News => websocket::news::create_handler(config),
};
let (websocket_sender, websocket_receiver) = mpsc::channel(100); let (websocket_sender, websocket_receiver) = mpsc::channel(100);
spawn(websocket::run( spawn(websocket::run(
Arc::new(websocket::create_handler(thread_type, config.clone())), websocket_handler.into(),
websocket_receiver, websocket_receiver,
websocket_url, websocket_url,
)); ));

View File

@@ -1,16 +1,25 @@
use super::State; use super::State;
use crate::{config::Config, database}; use crate::{
config::{Config, CLICKHOUSE_BATCH_BARS_SIZE},
database,
threads::data::ThreadType,
};
use async_trait::async_trait; use async_trait::async_trait;
use clickhouse::inserter::Inserter;
use log::{debug, error, info}; use log::{debug, error, info};
use qrust::types::{alpaca::websocket, Bar}; use qrust::{
types::{alpaca::websocket, Bar, Class},
utils::ONE_SECOND,
};
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::Arc, sync::Arc,
}; };
use tokio::sync::RwLock; use tokio::sync::{Mutex, RwLock};
pub struct Handler { pub struct Handler {
pub config: Arc<Config>, pub config: Arc<Config>,
pub inserter: Arc<Mutex<Inserter<Bar>>>,
pub subscription_message_constructor: pub subscription_message_constructor:
fn(Vec<String>) -> websocket::data::outgoing::subscribe::Message, fn(Vec<String>) -> websocket::data::outgoing::subscribe::Message,
} }
@@ -83,14 +92,7 @@ impl super::Handler for Handler {
| websocket::data::incoming::Message::UpdatedBar(message) => { | websocket::data::incoming::Message::UpdatedBar(message) => {
let bar = Bar::from(message); let bar = Bar::from(message);
debug!("Received bar for {}: {}.", bar.symbol, bar.time); debug!("Received bar for {}: {}.", bar.symbol, bar.time);
self.inserter.lock().await.write(&bar).await.unwrap();
database::bars::upsert(
&self.config.clickhouse_client,
&self.config.clickhouse_concurrency_limiter,
&bar,
)
.await
.unwrap();
} }
websocket::data::incoming::Message::Status(message) => { websocket::data::incoming::Message::Status(message) => {
debug!( debug!(
@@ -134,4 +136,35 @@ impl super::Handler for Handler {
fn log_string(&self) -> &'static str { fn log_string(&self) -> &'static str {
"bars" "bars"
} }
async fn run_inserter(&self) {
super::run_inserter(self.inserter.clone()).await;
}
}
pub fn create_handler(config: Arc<Config>, thread_type: ThreadType) -> Box<dyn super::Handler> {
let inserter = Arc::new(Mutex::new(
config
.clickhouse_client
.inserter("bars")
.unwrap()
.with_period(Some(ONE_SECOND))
.with_max_entries((*CLICKHOUSE_BATCH_BARS_SIZE).try_into().unwrap()),
));
let subscription_message_constructor = match thread_type {
ThreadType::Bars(Class::UsEquity) => {
websocket::data::outgoing::subscribe::Message::new_market_us_equity
}
ThreadType::Bars(Class::Crypto) => {
websocket::data::outgoing::subscribe::Message::new_market_crypto
}
_ => unreachable!(),
};
Box::new(Handler {
config,
inserter,
subscription_message_constructor,
})
} }

View File

@@ -1,16 +1,14 @@
mod bars; pub mod bars;
mod news; pub mod news;
use super::ThreadType; use crate::config::{ALPACA_API_KEY, ALPACA_API_SECRET};
use crate::config::{Config, ALPACA_API_KEY, ALPACA_API_SECRET};
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use clickhouse::{inserter::Inserter, Row};
use futures_util::{future::join_all, SinkExt, StreamExt}; use futures_util::{future::join_all, SinkExt, StreamExt};
use log::error; use log::error;
use qrust::types::{ use qrust::types::alpaca::{self, websocket};
alpaca::{self, websocket}, use serde::Serialize;
Class,
};
use serde_json::{from_str, to_string}; use serde_json::{from_str, to_string};
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
@@ -20,7 +18,7 @@ use std::{
use tokio::{ use tokio::{
net::TcpStream, net::TcpStream,
select, spawn, select, spawn,
sync::{mpsc, oneshot, RwLock}, sync::{mpsc, oneshot, Mutex, RwLock},
}; };
use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{connect_async, tungstenite, MaybeTlsStream, WebSocketStream};
@@ -65,7 +63,7 @@ pub struct State {
} }
#[async_trait] #[async_trait]
pub trait Handler: Send + Sync { pub trait Handler: Send + Sync + 'static {
fn create_subscription_message( fn create_subscription_message(
&self, &self,
symbols: Vec<String>, symbols: Vec<String>,
@@ -76,6 +74,7 @@ pub trait Handler: Send + Sync {
message: websocket::data::incoming::Message, message: websocket::data::incoming::Message,
); );
fn log_string(&self) -> &'static str; fn log_string(&self) -> &'static str;
async fn run_inserter(&self);
} }
pub async fn run( pub async fn run(
@@ -89,6 +88,9 @@ pub async fn run(
pending_unsubscriptions: HashMap::new(), pending_unsubscriptions: HashMap::new(),
})); }));
let handler_clone = handler.clone();
spawn(async move { handler_clone.run_inserter().await });
let (sink_sender, sink_receiver) = mpsc::channel(100); let (sink_sender, sink_receiver) = mpsc::channel(100);
let (stream_sender, mut stream_receiver) = mpsc::channel(10_000); let (stream_sender, mut stream_receiver) = mpsc::channel(10_000);
@@ -343,18 +345,13 @@ async fn handle_message(
message.response.send(()).unwrap(); message.response.send(()).unwrap();
} }
pub fn create_handler(thread_type: ThreadType, config: Arc<Config>) -> Box<dyn Handler> { async fn run_inserter<T>(inserter: Arc<Mutex<Inserter<T>>>)
match thread_type { where
ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler { T: Row + Serialize,
config, {
subscription_message_constructor: loop {
websocket::data::outgoing::subscribe::Message::new_market_us_equity, let time_left = inserter.lock().await.time_left().unwrap();
}), tokio::time::sleep(time_left).await;
ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { inserter.lock().await.commit().await.unwrap();
config,
subscription_message_constructor:
websocket::data::outgoing::subscribe::Message::new_market_crypto,
}),
ThreadType::News => Box::new(news::Handler { config }),
} }
} }

View File

@@ -1,13 +1,18 @@
use super::State; use super::State;
use crate::{config::Config, database}; use crate::config::{Config, CLICKHOUSE_BATCH_NEWS_SIZE};
use async_trait::async_trait; use async_trait::async_trait;
use clickhouse::inserter::Inserter;
use log::{debug, error, info}; use log::{debug, error, info};
use qrust::types::{alpaca::websocket, News}; use qrust::{
types::{alpaca::websocket, News},
utils::ONE_SECOND,
};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::{Mutex, RwLock};
pub struct Handler { pub struct Handler {
pub config: Arc<Config>, pub config: Arc<Config>,
pub inserter: Arc<Mutex<Inserter<News>>>,
} }
#[async_trait] #[async_trait]
@@ -74,19 +79,11 @@ impl super::Handler for Handler {
} }
websocket::data::incoming::Message::News(message) => { websocket::data::incoming::Message::News(message) => {
let news = News::from(message); let news = News::from(message);
debug!( debug!(
"Received news for {:?}: {}.", "Received news for {:?}: {}.",
news.symbols, news.time_created news.symbols, news.time_created
); );
self.inserter.lock().await.write(&news).await.unwrap();
database::news::upsert(
&self.config.clickhouse_client,
&self.config.clickhouse_concurrency_limiter,
&news,
)
.await
.unwrap();
} }
websocket::data::incoming::Message::Error(message) => { websocket::data::incoming::Message::Error(message) => {
error!("Received error message: {}.", message.message); error!("Received error message: {}.", message.message);
@@ -98,4 +95,21 @@ impl super::Handler for Handler {
fn log_string(&self) -> &'static str { fn log_string(&self) -> &'static str {
"news" "news"
} }
async fn run_inserter(&self) {
super::run_inserter(self.inserter.clone()).await;
}
}
pub fn create_handler(config: Arc<Config>) -> Box<dyn super::Handler> {
let inserter = Arc::new(Mutex::new(
config
.clickhouse_client
.inserter("news")
.unwrap()
.with_period(Some(ONE_SECOND))
.with_max_entries((*CLICKHOUSE_BATCH_NEWS_SIZE).try_into().unwrap()),
));
Box::new(Handler { config, inserter })
} }