Add order/position management

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-02-14 17:07:30 +00:00
parent 6ec71ee144
commit 648d413ac7
44 changed files with 826 additions and 497 deletions

View File

@@ -4,17 +4,13 @@ use crate::{
database,
types::{
alpaca::{
self,
api::{self, outgoing::Sort},
shared::Source,
api,
shared::{Sort, Source},
},
news::Prediction,
Backfill, Bar, Class, News,
},
utils::{
duration_until, last_minute, remove_slash_from_pair, FIFTEEN_MINUTES, ONE_MINUTE,
ONE_SECOND,
},
utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE, ONE_SECOND},
};
use async_trait::async_trait;
use futures_util::future::join_all;
@@ -216,21 +212,12 @@ impl Handler for BarHandler {
&self,
symbol: String,
) -> Result<Option<Backfill>, clickhouse::error::Error> {
database::backfills::select_latest_where_symbol(
&self.config.clickhouse_client,
&database::backfills::Table::Bars,
&symbol,
)
.await
database::backfills_bars::select_where_symbol(&self.config.clickhouse_client, &symbol).await
}
async fn delete_backfills(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> {
database::backfills::delete_where_symbols(
&self.config.clickhouse_client,
&database::backfills::Table::Bars,
symbols,
)
.await
database::backfills_bars::delete_where_symbols(&self.config.clickhouse_client, symbols)
.await
}
async fn delete_data(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> {
@@ -252,7 +239,7 @@ impl Handler for BarHandler {
let mut next_page_token = None;
loop {
let Ok(message) = alpaca::api::incoming::bar::get_historical(
let Ok(message) = api::incoming::bar::get_historical(
&self.config,
self.data_url,
&(self.api_query_constructor)(
@@ -289,16 +276,12 @@ impl Handler for BarHandler {
let backfill = bars.last().unwrap().clone().into();
database::bars::upsert_batch(&self.config.clickhouse_client, bars)
database::bars::upsert_batch(&self.config.clickhouse_client, &bars)
.await
.unwrap();
database::backfills_bars::upsert(&self.config.clickhouse_client, &backfill)
.await
.unwrap();
database::backfills::upsert(
&self.config.clickhouse_client,
&database::backfills::Table::Bars,
&backfill,
)
.await
.unwrap();
info!("Backfilled bars for {}.", symbol);
}
@@ -318,21 +301,12 @@ impl Handler for NewsHandler {
&self,
symbol: String,
) -> Result<Option<Backfill>, clickhouse::error::Error> {
database::backfills::select_latest_where_symbol(
&self.config.clickhouse_client,
&database::backfills::Table::News,
&symbol,
)
.await
database::backfills_news::select_where_symbol(&self.config.clickhouse_client, &symbol).await
}
async fn delete_backfills(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> {
database::backfills::delete_where_symbols(
&self.config.clickhouse_client,
&database::backfills::Table::News,
symbols,
)
.await
database::backfills_news::delete_where_symbols(&self.config.clickhouse_client, symbols)
.await
}
async fn delete_data(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> {
@@ -352,10 +326,10 @@ impl Handler for NewsHandler {
let mut next_page_token = None;
loop {
let Ok(message) = alpaca::api::incoming::news::get_historical(
let Ok(message) = api::incoming::news::get_historical(
&self.config,
&api::outgoing::news::News {
symbols: vec![remove_slash_from_pair(&symbol)],
symbols: vec![symbol.clone()],
start: Some(fetch_from),
end: Some(fetch_to),
limit: Some(50),
@@ -421,16 +395,12 @@ impl Handler for NewsHandler {
let backfill = (news.last().unwrap().clone(), symbol.clone()).into();
database::news::upsert_batch(&self.config.clickhouse_client, news)
database::news::upsert_batch(&self.config.clickhouse_client, &news)
.await
.unwrap();
database::backfills_news::upsert(&self.config.clickhouse_client, &backfill)
.await
.unwrap();
database::backfills::upsert(
&self.config.clickhouse_client,
&database::backfills::Table::News,
&backfill,
)
.await
.unwrap();
info!("Backfilled news for {}.", symbol);
}

View File

@@ -1,5 +1,5 @@
pub mod backfill;
pub mod websocket;
mod backfill;
mod websocket;
use super::clock;
use crate::{
@@ -8,7 +8,7 @@ use crate::{
},
create_send_await, database,
types::{alpaca, Asset, Class},
utils::{backoff, cleanup},
utils::backoff,
};
use futures_util::{future::join_all, StreamExt};
use itertools::{Either, Itertools};
@@ -128,6 +128,7 @@ async fn init_thread(
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_lines)]
async fn handle_message(
config: Arc<Config>,
bars_us_equity_websocket_sender: mpsc::Sender<websocket::Message>,
@@ -216,20 +217,33 @@ async fn handle_message(
let assets = join_all(symbols.into_iter().map(|symbol| {
let config = config.clone();
async move {
Asset::from(
let asset_future = async {
alpaca::api::incoming::asset::get_by_symbol(
&config,
&symbol,
Some(backoff::infinite()),
)
.await
.unwrap(),
)
.unwrap()
};
let position_future = async {
alpaca::api::incoming::position::get_by_symbol(
&config,
&symbol,
Some(backoff::infinite()),
)
.await
.unwrap()
};
let (asset, position) = join!(asset_future, position_future);
Asset::from((asset, position))
}
}))
.await;
database::assets::upsert_batch(&config.clickhouse_client, assets)
database::assets::upsert_batch(&config.clickhouse_client, &assets)
.await
.unwrap();
}
@@ -249,7 +263,9 @@ async fn handle_clock_message(
bars_crypto_backfill_sender: mpsc::Sender<backfill::Message>,
news_backfill_sender: mpsc::Sender<backfill::Message>,
) {
cleanup(&config.clickhouse_client).await.unwrap();
database::cleanup_all(&config.clickhouse_client)
.await
.unwrap();
let assets = database::assets::select(&config.clickhouse_client)
.await

View File

@@ -3,7 +3,6 @@ use crate::{
config::Config,
database,
types::{alpaca::websocket, news::Prediction, Bar, Class, News},
utils::add_slash_to_pair,
};
use async_trait::async_trait;
use futures_util::{
@@ -112,9 +111,7 @@ pub async fn run(
async fn handle_message(
handler: Arc<Box<dyn Handler>>,
pending: Arc<RwLock<Pending>>,
websocket_sender: Arc<
Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
>,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>>,
message: Message,
) {
match message.action {
@@ -134,8 +131,7 @@ async fn handle_message(
.subscriptions
.extend(pending_subscriptions);
websocket_sender
.lock()
sink.lock()
.await
.send(tungstenite::Message::Text(
to_string(&websocket::data::outgoing::Message::Subscribe(
@@ -164,8 +160,7 @@ async fn handle_message(
.unsubscriptions
.extend(pending_unsubscriptions);
websocket_sender
.lock()
sink.lock()
.await
.send(tungstenite::Message::Text(
to_string(&websocket::data::outgoing::Message::Unsubscribe(
@@ -186,7 +181,7 @@ async fn handle_message(
async fn handle_websocket_message(
handler: Arc<Box<dyn Handler>>,
pending: Arc<RwLock<Pending>>,
sender: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>>,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>>,
message: tungstenite::Message,
) {
match message {
@@ -208,11 +203,10 @@ async fn handle_websocket_message(
error!("Failed to deserialize websocket message: {:?}", message);
}
}
tungstenite::Message::Ping(_) => {
sender
.lock()
tungstenite::Message::Ping(payload) => {
sink.lock()
.await
.send(tungstenite::Message::Pong(vec![]))
.send(tungstenite::Message::Pong(payload))
.await
.unwrap();
}
@@ -358,11 +352,6 @@ impl Handler for NewsHandler {
unreachable!()
};
let symbols = symbols
.into_iter()
.map(|symbol| add_slash_to_pair(&symbol))
.collect::<Vec<_>>();
let mut pending = pending.write().await;
let newly_subscribed = pending