Organize websocket codebase

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-01-14 10:29:32 +00:00
parent 687fbb909f
commit 63a9ca950f
18 changed files with 138 additions and 128 deletions

View File

@@ -3,6 +3,7 @@ use crate::{
Config, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_DATA_URL,
ALPACA_STOCK_WEBSOCKET_URL, ALPACA_TIMESTAMP_FORMAT,
},
data::authenticate_websocket,
database,
time::{duration_until, last_minute, next_30s, next_minute, ONE_MINUTE, THIRTY_SECONDS},
types::{
@@ -19,7 +20,7 @@ use futures_util::{
use http::StatusCode;
use indexmap::IndexMap;
use log::{error, info, warn};
use serde_json::{from_str, to_string};
use serde_json::from_str;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
@@ -83,50 +84,6 @@ pub async fn run(
unreachable!()
}
async fn authenticate_websocket(
app_config: &Arc<Config>,
stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<websocket::incoming::Message>>(&data)
.unwrap()
.get(0)
== Some(&websocket::incoming::Message::Success(
websocket::incoming::success::Message {
msg: websocket::incoming::success::MessageType::Connected,
},
)) => {}
_ => panic!(),
}
sink.send(Message::Text(
to_string(&websocket::outgoing::Message::Auth(
websocket::outgoing::auth::Message::new(
app_config.alpaca_api_key.clone(),
app_config.alpaca_api_secret.clone(),
),
))
.unwrap(),
))
.await
.unwrap();
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<websocket::incoming::Message>>(&data)
.unwrap()
.get(0)
== Some(&websocket::incoming::Message::Success(
websocket::incoming::success::Message {
msg: websocket::incoming::success::MessageType::Authenticated,
},
)) => {}
_ => panic!(),
};
}
async fn websocket_broadcast_handler(
class: Class,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
@@ -140,8 +97,10 @@ async fn websocket_broadcast_handler(
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&websocket::outgoing::Message::Subscribe(
websocket::outgoing::subscribe::Message::new(asset.clone().symbol),
serde_json::to_string(&websocket::data::outgoing::Message::Subscribe(
websocket::data::outgoing::subscribe::Message::new(
asset.clone().symbol,
),
))
.unwrap(),
))
@@ -154,8 +113,10 @@ async fn websocket_broadcast_handler(
sink.write()
.await
.send(Message::Text(
serde_json::to_string(&websocket::outgoing::Message::Unsubscribe(
websocket::outgoing::subscribe::Message::new(asset.clone().symbol),
serde_json::to_string(&websocket::data::outgoing::Message::Unsubscribe(
websocket::data::outgoing::subscribe::Message::new(
asset.clone().symbol,
),
))
.unwrap(),
))
@@ -177,9 +138,12 @@ async fn websocket_message_handler(
loop {
match stream.next().await {
Some(Ok(Message::Text(data))) => {
let parsed_data = from_str::<Vec<websocket::incoming::Message>>(&data);
let parsed_data = from_str::<Vec<websocket::data::incoming::Message>>(&data);
if let Err(e) = &parsed_data {
warn!("Unparsed websocket::incoming message: {:?}: {}", data, e);
warn!(
"Unparsed websocket::data::incoming message: {:?}: {}",
data, e
);
}
for message in parsed_data.unwrap_or_default() {
@@ -192,7 +156,7 @@ async fn websocket_message_handler(
.send(Message::Pong(vec![]))
.await
.unwrap(),
Some(unknown) => error!("Unknown websocket::incoming message: {:?}", unknown),
Some(unknown) => error!("Unknown websocket::data::incoming message: {:?}", unknown),
None => panic!(),
}
}
@@ -201,11 +165,11 @@ async fn websocket_message_handler(
async fn websocket_handle_text_message(
app_config: &Arc<Config>,
class: Class,
message: websocket::incoming::Message,
message: websocket::data::incoming::Message,
backfilled: &Arc<RwLock<HashMap<String, bool>>>,
) {
match message {
websocket::incoming::Message::Subscription(subscription_message) => {
websocket::data::incoming::Message::Subscription(subscription_message) => {
let old_assets = backfilled
.read()
.await
@@ -243,7 +207,7 @@ async fn websocket_handle_text_message(
class, added_assets, deleted_assets
);
}
websocket::incoming::Message::Bars(bar_message) => {
websocket::data::incoming::Message::Bars(bar_message) => {
let bar = Bar::from(bar_message);
info!(
"websocket::Incoming bar for {}: {}",
@@ -256,7 +220,7 @@ async fn websocket_handle_text_message(
)
.await;
}
websocket::incoming::Message::UpdatedBars(bar_message) => {
websocket::data::incoming::Message::UpdatedBars(bar_message) => {
let bar = Bar::from(bar_message);
info!(
"websocket::Incoming bar for {}: {}",
@@ -276,7 +240,7 @@ async fn websocket_handle_text_message(
}
transaction.commit().await.unwrap();
}
websocket::incoming::Message::Success(_) => {}
_ => {}
}
}