Remove manual pongs

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-02-15 14:17:47 +00:00
parent cdaa2d20a9
commit a1781cdf29
3 changed files with 46 additions and 83 deletions

View File

@@ -66,7 +66,7 @@ pub trait Handler: Send + Sync {
&self, &self,
symbols: Vec<String>, symbols: Vec<String>,
) -> websocket::data::outgoing::subscribe::Message; ) -> websocket::data::outgoing::subscribe::Message;
async fn handle_parsed_websocket_message( async fn handle_websocket_message(
&self, &self,
pending: Arc<RwLock<Pending>>, pending: Arc<RwLock<Pending>>,
message: websocket::data::incoming::Message, message: websocket::data::incoming::Message,
@@ -96,12 +96,26 @@ pub async fn run(
)); ));
} }
Some(Ok(message)) = websocket_stream.next() => { Some(Ok(message)) = websocket_stream.next() => {
spawn(handle_websocket_message( match message {
handler.clone(), tungstenite::Message::Text(message) => {
pending.clone(), let parsed_message = from_str::<Vec<websocket::data::incoming::Message>>(&message);
websocket_sink.clone(),
message, if parsed_message.is_err() {
)); error!("Failed to deserialize websocket message: {:?}", message);
continue;
}
for message in parsed_message.unwrap() {
let handler = handler.clone();
let pending = pending.clone();
spawn(async move {
handler.handle_websocket_message(pending, message).await;
});
}
}
tungstenite::Message::Ping(_) => {}
_ => error!("Unexpected websocket message: {:?}", message),
}
} }
else => panic!("Communication channel unexpectedly closed.") else => panic!("Communication channel unexpectedly closed.")
} }
@@ -179,40 +193,6 @@ async fn handle_message(
message.response.send(()).unwrap(); message.response.send(()).unwrap();
} }
async fn handle_websocket_message(
handler: Arc<Box<dyn Handler>>,
pending: Arc<RwLock<Pending>>,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>>,
message: tungstenite::Message,
) {
match message {
tungstenite::Message::Text(message) => {
if let Ok(message) = from_str::<Vec<websocket::data::incoming::Message>>(&message) {
for message in message {
let handler = handler.clone();
let pending = pending.clone();
spawn(async move {
handler
.handle_parsed_websocket_message(pending, message)
.await;
});
}
} else {
error!("Failed to deserialize websocket message: {:?}", message);
}
}
tungstenite::Message::Ping(payload) => {
sink.lock()
.await
.send(tungstenite::Message::Pong(payload))
.await
.unwrap();
}
_ => error!("Unexpected websocket message: {:?}", message),
}
}
struct BarsHandler { struct BarsHandler {
config: Arc<Config>, config: Arc<Config>,
subscription_message_constructor: subscription_message_constructor:
@@ -228,7 +208,7 @@ impl Handler for BarsHandler {
(self.subscription_message_constructor)(symbols) (self.subscription_message_constructor)(symbols)
} }
async fn handle_parsed_websocket_message( async fn handle_websocket_message(
&self, &self,
pending: Arc<RwLock<Pending>>, pending: Arc<RwLock<Pending>>,
message: websocket::data::incoming::Message, message: websocket::data::incoming::Message,
@@ -338,7 +318,7 @@ impl Handler for NewsHandler {
websocket::data::outgoing::subscribe::Message::new_news(symbols) websocket::data::outgoing::subscribe::Message::new_news(symbols)
} }
async fn handle_parsed_websocket_message( async fn handle_websocket_message(
&self, &self,
pending: Arc<RwLock<Pending>>, pending: Arc<RwLock<Pending>>,
message: websocket::data::incoming::Message, message: websocket::data::incoming::Message,

View File

@@ -16,5 +16,5 @@ pub async fn run(config: Arc<Config>) {
alpaca::websocket::trading::authenticate(&mut websocket_sink, &mut websocket_stream).await; alpaca::websocket::trading::authenticate(&mut websocket_sink, &mut websocket_stream).await;
alpaca::websocket::trading::subscribe(&mut websocket_sink, &mut websocket_stream).await; alpaca::websocket::trading::subscribe(&mut websocket_sink, &mut websocket_stream).await;
spawn(websocket::run(config, websocket_stream, websocket_sink)); spawn(websocket::run(config, websocket_stream));
} }

View File

@@ -3,60 +3,43 @@ use crate::{
database, database,
types::{alpaca::websocket, Order}, types::{alpaca::websocket, Order},
}; };
use futures_util::{ use futures_util::{stream::SplitStream, StreamExt};
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{debug, error}; use log::{debug, error};
use serde_json::from_str; use serde_json::from_str;
use std::sync::Arc; use std::sync::Arc;
use tokio::{net::TcpStream, spawn, sync::Mutex}; use tokio::{net::TcpStream, spawn};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
pub async fn run( pub async fn run(
config: Arc<Config>, config: Arc<Config>,
mut websocket_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, mut websocket_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
websocket_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
) { ) {
let websocket_sink = Arc::new(Mutex::new(websocket_sink));
loop { loop {
let message = websocket_stream.next().await.unwrap().unwrap(); let message = websocket_stream.next().await.unwrap().unwrap();
match message {
tungstenite::Message::Binary(message) => {
let parsed_message = from_str::<websocket::trading::incoming::Message>(
&String::from_utf8_lossy(&message),
);
if parsed_message.is_err() {
error!("Failed to deserialize websocket message: {:?}", message);
continue;
}
spawn(handle_websocket_message( spawn(handle_websocket_message(
config.clone(), config.clone(),
websocket_sink.clone(), parsed_message.unwrap(),
message,
)); ));
} }
tungstenite::Message::Ping(_) => {}
_ => error!("Unexpected websocket message: {:?}", message),
}
}
} }
async fn handle_websocket_message( async fn handle_websocket_message(
config: Arc<Config>,
sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>>,
message: tungstenite::Message,
) {
match message {
tungstenite::Message::Binary(message) => {
if let Ok(message) = from_str::<websocket::trading::incoming::Message>(
&String::from_utf8_lossy(&message),
) {
handle_parsed_websocket_message(config, message).await;
} else {
error!("Failed to deserialize websocket message: {:?}", message);
}
}
tungstenite::Message::Ping(payload) => {
sink.lock()
.await
.send(tungstenite::Message::Pong(payload))
.await
.unwrap();
}
_ => error!("Unexpected websocket message: {:?}", message),
}
}
async fn handle_parsed_websocket_message(
config: Arc<Config>, config: Arc<Config>,
message: websocket::trading::incoming::Message, message: websocket::trading::incoming::Message,
) { ) {