Improve borrow handling

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-09-02 19:56:47 +03:00
parent 203b028d7c
commit 4c98ed715d
9 changed files with 26 additions and 18 deletions

View File

@@ -68,7 +68,7 @@ pub async fn run_data_live(
_ => panic!(),
}
let symbols = get_assets_with_class(&app_config.postgres_pool, class.clone())
let symbols = get_assets_with_class(&app_config.postgres_pool, &class)
.await?
.into_iter()
.map(|asset| asset.symbol)
@@ -88,17 +88,18 @@ pub async fn run_data_live(
spawn(broadcast_handler(
class,
asset_broadcast_receiver,
sink.clone(),
asset_broadcast_receiver,
));
websocket_handler(app_config, sink, stream).await?;
websocket_handler(app_config, class, sink, stream).await?;
unreachable!()
}
pub async fn websocket_handler(
app_config: Arc<AppConfig>,
class: Class,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
stream: Arc<RwLock<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -111,12 +112,15 @@ pub async fn websocket_handler(
for message in parsed_data {
match message {
IncomingMessage::Subscription(subscription_message) => {
info!("Current subscriptions: {:?}", subscription_message.bars);
info!(
"Current {:?} subscriptions: {:?}",
class, subscription_message.bars
);
}
IncomingMessage::Bars(bar_message)
| IncomingMessage::UpdatedBars(bar_message) => {
debug!("Incoming bar: {:?}", bar_message);
add_bar(&app_config.postgres_pool, Bar::from(bar_message)).await?;
add_bar(&app_config.postgres_pool, &Bar::from(bar_message)).await?;
}
message => {
warn!("Unhandled incoming message: {:?}", message);
@@ -141,8 +145,8 @@ pub async fn websocket_handler(
pub async fn broadcast_handler(
class: Class,
mut asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
mut asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
match asset_broadcast_receiver.recv().await? {

View File

@@ -16,11 +16,11 @@ pub async fn get_assets(
pub async fn get_assets_with_class(
postgres_pool: &PgPool,
class: Class,
class: &Class,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, class as Class
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, &class as &Class
)
.fetch_all(postgres_pool)
.await
@@ -42,7 +42,7 @@ pub async fn get_asset(
pub async fn add_asset(
postgres_pool: &PgPool,
asset: Asset,
asset: &Asset,
) -> Result<Asset, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
@@ -58,7 +58,7 @@ pub async fn add_asset(
pub async fn update_asset_trading(
postgres_pool: &PgPool,
symbol: &str,
trading: bool,
trading: &bool,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,

View File

@@ -4,7 +4,7 @@ use std::error::Error;
pub async fn add_bar(
postgres_pool: &PgPool,
bar: Bar,
bar: &Bar,
) -> Result<Bar, Box<dyn Error + Send + Sync>> {
query_as!(
Bar,

View File

@@ -40,7 +40,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
threads.push(spawn(run_api(app_config.clone(), asset_broadcast_sender)));
for thread in threads {
let _ = thread.await?;
thread.await??;
}
Ok(())

View File

@@ -83,7 +83,7 @@ pub async fn add_asset(
asset.trading = trading;
}
let asset = database::assets::add_asset(&app_config.postgres_pool, asset)
database::assets::add_asset(&app_config.postgres_pool, &asset)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -107,7 +107,7 @@ pub async fn update_asset(
Path(symbol): Path<String>,
Json(request): Json<UpdateAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = update_asset_trading(&app_config.postgres_pool, &symbol, request.trading)
let asset = update_asset_trading(&app_config.postgres_pool, &symbol, &request.trading)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)]
pub enum Class {
#[sqlx(rename = "us_equity")]
#[serde(rename = "us_equity")]

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)]
pub enum Exchange {
#[sqlx(rename = "AMEX")]
#[serde(rename = "AMEX")]

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)]
pub enum Status {
#[sqlx(rename = "active")]
#[serde(rename = "active")]

View File

@@ -31,12 +31,16 @@ pub struct SuccessMessage {
pub struct SubscriptionMessage {
pub trades: Vec<String>,
pub quotes: Vec<String>,
pub orderbooks: Vec<String>,
pub bars: Vec<String>,
#[serde(rename = "updatedBars")]
pub updated_bars: Vec<String>,
#[serde(rename = "dailyBars")]
pub daily_bars: Vec<String>,
pub orderbooks: Option<Vec<String>>,
pub statuses: Option<Vec<String>>,
pub lulds: Option<Vec<String>>,
#[serde(rename = "cancelErrors")]
pub cancel_errors: Option<Vec<String>>,
}
#[derive(Debug, PartialEq, Deserialize)]