Add order types

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-02-12 16:45:11 +00:00
parent dee21d5324
commit 5961717520
47 changed files with 840 additions and 145 deletions

View File

@@ -6,7 +6,7 @@ use crate::{
alpaca::{
self,
api::{self, outgoing::Sort},
Source,
shared::Source,
},
news::Prediction,
Backfill, Bar, Class, News,
@@ -248,7 +248,7 @@ impl Handler for BarHandler {
async fn backfill(&self, symbol: String, fetch_from: OffsetDateTime, fetch_to: OffsetDateTime) {
info!("Backfilling bars for {}.", symbol);
let mut bars = Vec::new();
let mut bars = vec![];
let mut next_page_token = None;
loop {
@@ -348,7 +348,7 @@ impl Handler for NewsHandler {
async fn backfill(&self, symbol: String, fetch_from: OffsetDateTime, fetch_to: OffsetDateTime) {
info!("Backfilling news for {}.", symbol);
let mut news = Vec::new();
let mut news = vec![];
let mut next_page_token = None;
loop {

View File

@@ -8,7 +8,7 @@ use crate::{
},
create_send_await, database,
types::{alpaca, Asset, Class},
utils::{authenticate, backoff, cleanup},
utils::{backoff, cleanup},
};
use futures_util::{future::join_all, StreamExt};
use itertools::{Either, Itertools};
@@ -19,7 +19,7 @@ use tokio::{
};
use tokio_tungstenite::connect_async;
#[derive(Clone)]
#[derive(Clone, Copy)]
pub enum Action {
Add,
Remove,
@@ -45,7 +45,7 @@ impl Message {
}
}
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy)]
pub enum ThreadType {
Bars(Class),
News,
@@ -107,7 +107,8 @@ async fn init_thread(
let (websocket, _) = connect_async(websocket_url).await.unwrap();
let (mut websocket_sink, mut websocket_stream) = websocket.split();
authenticate(&config, &mut websocket_sink, &mut websocket_stream).await;
alpaca::websocket::data::authenticate(&config, &mut websocket_sink, &mut websocket_stream)
.await;
let (backfill_sender, backfill_receiver) = mpsc::channel(100);
spawn(backfill::run(
@@ -160,14 +161,14 @@ async fn handle_message(
create_send_await!(
bars_us_equity_websocket_sender,
websocket::Message::new,
message.action.clone().into(),
message.action.into(),
us_equity_symbols.clone()
);
create_send_await!(
bars_us_equity_backfill_sender,
backfill::Message::new,
message.action.clone().into(),
message.action.into(),
us_equity_symbols
);
};
@@ -180,14 +181,14 @@ async fn handle_message(
create_send_await!(
bars_crypto_websocket_sender,
websocket::Message::new,
message.action.clone().into(),
message.action.into(),
crypto_symbols.clone()
);
create_send_await!(
bars_crypto_backfill_sender,
backfill::Message::new,
message.action.clone().into(),
message.action.into(),
crypto_symbols
);
};
@@ -196,14 +197,14 @@ async fn handle_message(
create_send_await!(
news_websocket_sender,
websocket::Message::new,
message.action.clone().into(),
message.action.into(),
symbols.clone()
);
create_send_await!(
news_backfill_sender,
backfill::Message::new,
message.action.clone().into(),
message.action.into(),
symbols.clone()
);
};

View File

@@ -66,11 +66,11 @@ pub trait Handler: Send + Sync {
fn create_subscription_message(
&self,
symbols: Vec<String>,
) -> websocket::outgoing::subscribe::Message;
) -> websocket::data::outgoing::subscribe::Message;
async fn handle_parsed_websocket_message(
&self,
pending: Arc<RwLock<Pending>>,
message: websocket::incoming::Message,
message: websocket::data::incoming::Message,
);
}
@@ -138,7 +138,7 @@ async fn handle_message(
.lock()
.await
.send(tungstenite::Message::Text(
to_string(&websocket::outgoing::Message::Subscribe(
to_string(&websocket::data::outgoing::Message::Subscribe(
handler.create_subscription_message(message.symbols),
))
.unwrap(),
@@ -168,7 +168,7 @@ async fn handle_message(
.lock()
.await
.send(tungstenite::Message::Text(
to_string(&websocket::outgoing::Message::Unsubscribe(
to_string(&websocket::data::outgoing::Message::Unsubscribe(
handler.create_subscription_message(message.symbols.clone()),
))
.unwrap(),
@@ -191,7 +191,7 @@ async fn handle_websocket_message(
) {
match message {
tungstenite::Message::Text(message) => {
let message = from_str::<Vec<websocket::incoming::Message>>(&message);
let message = from_str::<Vec<websocket::data::incoming::Message>>(&message);
if let Ok(message) = message {
for message in message {
@@ -222,7 +222,8 @@ async fn handle_websocket_message(
struct BarsHandler {
config: Arc<Config>,
subscription_message_constructor: fn(Vec<String>) -> websocket::outgoing::subscribe::Message,
subscription_message_constructor:
fn(Vec<String>) -> websocket::data::outgoing::subscribe::Message,
}
#[async_trait]
@@ -230,19 +231,20 @@ impl Handler for BarsHandler {
fn create_subscription_message(
&self,
symbols: Vec<String>,
) -> websocket::outgoing::subscribe::Message {
) -> websocket::data::outgoing::subscribe::Message {
(self.subscription_message_constructor)(symbols)
}
async fn handle_parsed_websocket_message(
&self,
pending: Arc<RwLock<Pending>>,
message: websocket::incoming::Message,
message: websocket::data::incoming::Message,
) {
match message {
websocket::incoming::Message::Subscription(message) => {
let websocket::incoming::subscription::Message::Market { bars: symbols, .. } =
message
websocket::data::incoming::Message::Subscription(message) => {
let websocket::data::incoming::subscription::Message::Market {
bars: symbols, ..
} = message
else {
unreachable!()
};
@@ -283,8 +285,8 @@ impl Handler for BarsHandler {
}
}
}
websocket::incoming::Message::Bar(message)
| websocket::incoming::Message::UpdatedBar(message) => {
websocket::data::incoming::Message::Bar(message)
| websocket::data::incoming::Message::UpdatedBar(message) => {
let bar = Bar::from(message);
debug!("Received bar for {}: {}.", bar.symbol, bar.time);
@@ -292,15 +294,15 @@ impl Handler for BarsHandler {
.await
.unwrap();
}
websocket::incoming::Message::Status(message) => {
websocket::data::incoming::Message::Status(message) => {
debug!(
"Received status message for {}: {}.",
message.symbol, message.status_message
);
match message.status {
websocket::incoming::status::Status::TradingHalt
| websocket::incoming::status::Status::VolatilityTradingPause => {
websocket::data::incoming::status::Status::TradingHalt
| websocket::data::incoming::status::Status::VolatilityTradingPause => {
database::assets::update_status_where_symbol(
&self.config.clickhouse_client,
&message.symbol,
@@ -309,8 +311,8 @@ impl Handler for BarsHandler {
.await
.unwrap();
}
websocket::incoming::status::Status::Resume
| websocket::incoming::status::Status::TradingResumption => {
websocket::data::incoming::status::Status::Resume
| websocket::data::incoming::status::Status::TradingResumption => {
database::assets::update_status_where_symbol(
&self.config.clickhouse_client,
&message.symbol,
@@ -322,7 +324,7 @@ impl Handler for BarsHandler {
_ => {}
}
}
websocket::incoming::Message::Error(message) => {
websocket::data::incoming::Message::Error(message) => {
error!("Received error message: {}.", message.message);
}
_ => unreachable!(),
@@ -339,18 +341,19 @@ impl Handler for NewsHandler {
fn create_subscription_message(
&self,
symbols: Vec<String>,
) -> websocket::outgoing::subscribe::Message {
websocket::outgoing::subscribe::Message::new_news(symbols)
) -> websocket::data::outgoing::subscribe::Message {
websocket::data::outgoing::subscribe::Message::new_news(symbols)
}
async fn handle_parsed_websocket_message(
&self,
pending: Arc<RwLock<Pending>>,
message: websocket::incoming::Message,
message: websocket::data::incoming::Message,
) {
match message {
websocket::incoming::Message::Subscription(message) => {
let websocket::incoming::subscription::Message::News { news: symbols } = message
websocket::data::incoming::Message::Subscription(message) => {
let websocket::data::incoming::subscription::Message::News { news: symbols } =
message
else {
unreachable!()
};
@@ -396,7 +399,7 @@ impl Handler for NewsHandler {
}
}
}
websocket::incoming::Message::News(message) => {
websocket::data::incoming::Message::News(message) => {
let news = News::from(message);
debug!(
@@ -426,7 +429,7 @@ impl Handler for NewsHandler {
.await
.unwrap();
}
websocket::incoming::Message::Error(message) => {
websocket::data::incoming::Message::Error(message) => {
error!("Received error message: {}.", message.message);
}
_ => unreachable!(),
@@ -439,12 +442,12 @@ pub fn create_handler(thread_type: ThreadType, config: Arc<Config>) -> Box<dyn H
ThreadType::Bars(Class::UsEquity) => Box::new(BarsHandler {
config,
subscription_message_constructor:
websocket::outgoing::subscribe::Message::new_market_us_equity,
websocket::data::outgoing::subscribe::Message::new_market_us_equity,
}),
ThreadType::Bars(Class::Crypto) => Box::new(BarsHandler {
config,
subscription_message_constructor:
websocket::outgoing::subscribe::Message::new_market_crypto,
websocket::data::outgoing::subscribe::Message::new_market_crypto,
}),
ThreadType::News => Box::new(NewsHandler { config }),
}