327 lines
9.5 KiB
Rust
327 lines
9.5 KiB
Rust
mod backfill;
|
|
mod websocket;
|
|
|
|
use super::clock;
|
|
use crate::{
|
|
config::{
|
|
Config, ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL, ALPACA_SOURCE,
|
|
ALPACA_STOCK_DATA_WEBSOCKET_URL,
|
|
},
|
|
create_send_await, database,
|
|
types::{alpaca, Asset, Class},
|
|
utils::backoff,
|
|
};
|
|
use futures_util::{future::join_all, StreamExt};
|
|
use itertools::{Either, Itertools};
|
|
use std::sync::Arc;
|
|
use tokio::{
|
|
join, select, spawn,
|
|
sync::{mpsc, oneshot},
|
|
};
|
|
use tokio_tungstenite::connect_async;
|
|
|
|
/// Operation requested for a set of assets.
///
/// `handle_message` converts every variant with `.into()` and forwards it to the
/// websocket and backfill threads; only `Add` and `Remove` additionally modify the
/// stored assets.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
// NOTE(review): presumably kept because not every variant is constructed yet —
// confirm before removing.
#[allow(dead_code)]
pub enum Action {
    /// Forwarded to the data threads; `handle_message` also fetches the assets from
    /// Alpaca and upserts them into the database.
    Add,
    /// Forwarded to the data threads only; no database change in this module.
    Enable,
    /// Forwarded to the data threads; `handle_message` also deletes the assets from
    /// the database.
    Remove,
    /// Forwarded to the data threads only; no database change in this module.
    Disable,
}
|
|
|
|
pub struct Message {
|
|
pub action: Action,
|
|
pub assets: Vec<(String, Class)>,
|
|
pub response: oneshot::Sender<()>,
|
|
}
|
|
|
|
impl Message {
|
|
pub fn new(action: Action, assets: Vec<(String, Class)>) -> (Self, oneshot::Receiver<()>) {
|
|
let (sender, receiver) = oneshot::channel();
|
|
(
|
|
Self {
|
|
action,
|
|
assets,
|
|
response: sender,
|
|
},
|
|
receiver,
|
|
)
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy)]
|
|
pub enum ThreadType {
|
|
Bars(Class),
|
|
News,
|
|
}
|
|
|
|
pub async fn run(
|
|
config: Arc<Config>,
|
|
mut receiver: mpsc::Receiver<Message>,
|
|
mut clock_receiver: mpsc::Receiver<clock::Message>,
|
|
) {
|
|
let (bars_us_equity_websocket_sender, bars_us_equity_backfill_sender) =
|
|
init_thread(config.clone(), ThreadType::Bars(Class::UsEquity)).await;
|
|
let (bars_crypto_websocket_sender, bars_crypto_backfill_sender) =
|
|
init_thread(config.clone(), ThreadType::Bars(Class::Crypto)).await;
|
|
let (news_websocket_sender, news_backfill_sender) =
|
|
init_thread(config.clone(), ThreadType::News).await;
|
|
|
|
loop {
|
|
select! {
|
|
Some(message) = receiver.recv() => {
|
|
spawn(handle_message(
|
|
config.clone(),
|
|
bars_us_equity_websocket_sender.clone(),
|
|
bars_us_equity_backfill_sender.clone(),
|
|
bars_crypto_websocket_sender.clone(),
|
|
bars_crypto_backfill_sender.clone(),
|
|
news_websocket_sender.clone(),
|
|
news_backfill_sender.clone(),
|
|
message,
|
|
));
|
|
}
|
|
Some(_) = clock_receiver.recv() => {
|
|
spawn(handle_clock_message(
|
|
config.clone(),
|
|
bars_us_equity_backfill_sender.clone(),
|
|
bars_crypto_backfill_sender.clone(),
|
|
news_backfill_sender.clone(),
|
|
));
|
|
}
|
|
else => panic!("Communication channel unexpectedly closed.")
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn init_thread(
|
|
config: Arc<Config>,
|
|
thread_type: ThreadType,
|
|
) -> (
|
|
mpsc::Sender<websocket::Message>,
|
|
mpsc::Sender<backfill::Message>,
|
|
) {
|
|
let websocket_url = match thread_type {
|
|
ThreadType::Bars(Class::UsEquity) => {
|
|
format!("{}/{}", ALPACA_STOCK_DATA_WEBSOCKET_URL, *ALPACA_SOURCE)
|
|
}
|
|
ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_WEBSOCKET_URL.into(),
|
|
ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(),
|
|
};
|
|
|
|
let (websocket, _) = connect_async(websocket_url).await.unwrap();
|
|
let (mut websocket_sink, mut websocket_stream) = websocket.split();
|
|
alpaca::websocket::data::authenticate(&mut websocket_sink, &mut websocket_stream).await;
|
|
|
|
let (backfill_sender, backfill_receiver) = mpsc::channel(100);
|
|
spawn(backfill::run(
|
|
Arc::new(backfill::create_handler(thread_type, config.clone())),
|
|
backfill_receiver,
|
|
));
|
|
|
|
let (websocket_sender, websocket_receiver) = mpsc::channel(100);
|
|
spawn(websocket::run(
|
|
Arc::new(websocket::create_handler(thread_type, config.clone())),
|
|
websocket_receiver,
|
|
websocket_stream,
|
|
websocket_sink,
|
|
));
|
|
|
|
(websocket_sender, backfill_sender)
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
#[allow(clippy::too_many_lines)]
|
|
async fn handle_message(
|
|
config: Arc<Config>,
|
|
bars_us_equity_websocket_sender: mpsc::Sender<websocket::Message>,
|
|
bars_us_equity_backfill_sender: mpsc::Sender<backfill::Message>,
|
|
bars_crypto_websocket_sender: mpsc::Sender<websocket::Message>,
|
|
bars_crypto_backfill_sender: mpsc::Sender<backfill::Message>,
|
|
news_websocket_sender: mpsc::Sender<websocket::Message>,
|
|
news_backfill_sender: mpsc::Sender<backfill::Message>,
|
|
message: Message,
|
|
) {
|
|
if message.assets.is_empty() {
|
|
message.response.send(()).unwrap();
|
|
return;
|
|
}
|
|
|
|
let (us_equity_symbols, crypto_symbols): (Vec<_>, Vec<_>) = message
|
|
.assets
|
|
.clone()
|
|
.into_iter()
|
|
.partition_map(|asset| match asset.1 {
|
|
Class::UsEquity => Either::Left(asset.0),
|
|
Class::Crypto => Either::Right(asset.0),
|
|
});
|
|
|
|
let symbols = message
|
|
.assets
|
|
.into_iter()
|
|
.map(|(symbol, _)| symbol)
|
|
.collect::<Vec<_>>();
|
|
|
|
let bars_us_equity_future = async {
|
|
if us_equity_symbols.is_empty() {
|
|
return;
|
|
}
|
|
|
|
create_send_await!(
|
|
bars_us_equity_websocket_sender,
|
|
websocket::Message::new,
|
|
message.action.into(),
|
|
us_equity_symbols.clone()
|
|
);
|
|
|
|
create_send_await!(
|
|
bars_us_equity_backfill_sender,
|
|
backfill::Message::new,
|
|
message.action.into(),
|
|
us_equity_symbols
|
|
);
|
|
};
|
|
|
|
let bars_crypto_future = async {
|
|
if crypto_symbols.is_empty() {
|
|
return;
|
|
}
|
|
|
|
create_send_await!(
|
|
bars_crypto_websocket_sender,
|
|
websocket::Message::new,
|
|
message.action.into(),
|
|
crypto_symbols.clone()
|
|
);
|
|
|
|
create_send_await!(
|
|
bars_crypto_backfill_sender,
|
|
backfill::Message::new,
|
|
message.action.into(),
|
|
crypto_symbols
|
|
);
|
|
};
|
|
|
|
let news_future = async {
|
|
create_send_await!(
|
|
news_websocket_sender,
|
|
websocket::Message::new,
|
|
message.action.into(),
|
|
symbols.clone()
|
|
);
|
|
|
|
create_send_await!(
|
|
news_backfill_sender,
|
|
backfill::Message::new,
|
|
message.action.into(),
|
|
symbols.clone()
|
|
);
|
|
};
|
|
|
|
join!(bars_us_equity_future, bars_crypto_future, news_future);
|
|
|
|
match message.action {
|
|
Action::Add => {
|
|
let assets = join_all(symbols.into_iter().map(|symbol| {
|
|
let config = config.clone();
|
|
async move {
|
|
let asset_future = async {
|
|
alpaca::api::incoming::asset::get_by_symbol(
|
|
&config.alpaca_client,
|
|
&config.alpaca_rate_limiter,
|
|
&symbol,
|
|
Some(backoff::infinite()),
|
|
)
|
|
.await
|
|
.unwrap()
|
|
};
|
|
|
|
let position_future = async {
|
|
alpaca::api::incoming::position::get_by_symbol(
|
|
&config.alpaca_client,
|
|
&config.alpaca_rate_limiter,
|
|
&symbol,
|
|
Some(backoff::infinite()),
|
|
)
|
|
.await
|
|
.unwrap()
|
|
};
|
|
|
|
let (asset, position) = join!(asset_future, position_future);
|
|
Asset::from((asset, position))
|
|
}
|
|
}))
|
|
.await;
|
|
|
|
database::assets::upsert_batch(&config.clickhouse_client, &assets)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
Action::Remove => {
|
|
database::assets::delete_where_symbols(&config.clickhouse_client, &symbols)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
_ => {}
|
|
}
|
|
|
|
message.response.send(()).unwrap();
|
|
}
|
|
|
|
async fn handle_clock_message(
|
|
config: Arc<Config>,
|
|
bars_us_equity_backfill_sender: mpsc::Sender<backfill::Message>,
|
|
bars_crypto_backfill_sender: mpsc::Sender<backfill::Message>,
|
|
news_backfill_sender: mpsc::Sender<backfill::Message>,
|
|
) {
|
|
database::cleanup_all(&config.clickhouse_client)
|
|
.await
|
|
.unwrap();
|
|
|
|
let assets = database::assets::select(&config.clickhouse_client)
|
|
.await
|
|
.unwrap();
|
|
|
|
let (us_equity_symbols, crypto_symbols): (Vec<_>, Vec<_>) = assets
|
|
.clone()
|
|
.into_iter()
|
|
.partition_map(|asset| match asset.class {
|
|
Class::UsEquity => Either::Left(asset.symbol),
|
|
Class::Crypto => Either::Right(asset.symbol),
|
|
});
|
|
|
|
let symbols = assets
|
|
.into_iter()
|
|
.map(|asset| asset.symbol)
|
|
.collect::<Vec<_>>();
|
|
|
|
let bars_us_equity_future = async {
|
|
create_send_await!(
|
|
bars_us_equity_backfill_sender,
|
|
backfill::Message::new,
|
|
Some(backfill::Action::Backfill),
|
|
us_equity_symbols.clone()
|
|
);
|
|
};
|
|
|
|
let bars_crypto_future = async {
|
|
create_send_await!(
|
|
bars_crypto_backfill_sender,
|
|
backfill::Message::new,
|
|
Some(backfill::Action::Backfill),
|
|
crypto_symbols.clone()
|
|
);
|
|
};
|
|
|
|
let news_future = async {
|
|
create_send_await!(
|
|
news_backfill_sender,
|
|
backfill::Message::new,
|
|
Some(backfill::Action::Backfill),
|
|
symbols
|
|
);
|
|
};
|
|
|
|
join!(bars_us_equity_future, bars_crypto_future, news_future);
|
|
}
|