From 4fbd7f0e6d80413acecb203bcea5099701ae7f02 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Thu, 31 Aug 2023 09:33:56 +0300 Subject: [PATCH] Add live data threads Signed-off-by: Nikolaos Karaolidis --- ...a36045ebe6a2d19d925bc490f606ff01b440.json} | 4 +- ...2fff68b4bf970c1508ce7038004d6404d7f4e.json | 71 ++++++++++ ...eae7f3eaa3147ac5b5e616471ea293cb6469.json} | 4 +- ...44071e3750f6d9ddee8cdc2e29f3f207e2f2.json} | 4 +- ...d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json | 64 +++++++++ ...481327371c97175939a58de8cf54f72fa57ad.json | 64 +++++++++ ...2b0d96742546d15943c85c594187139516d0b.json | 71 ++++++++++ backend/src/data/live/crypto.rs | 57 ++++++++ backend/src/data/live/mod.rs | 133 ++++++++++++++++++ backend/src/data/live/stocks.rs | 46 ++++++ backend/src/data/mod.rs | 1 + backend/src/database/assets.rs | 57 +++++++- backend/src/database/bars.rs | 53 +++++++ backend/src/database/mod.rs | 1 + backend/src/main.rs | 47 ++++++- backend/src/routes/assets.rs | 59 +++++++- backend/src/routes/mod.rs | 14 +- backend/src/types.rs | 2 +- support/timescaledb/999_init.sh | 5 +- 19 files changed, 729 insertions(+), 28 deletions(-) rename backend/.sqlx/{query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json => query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json} (84%) create mode 100644 backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json rename backend/.sqlx/{query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json => query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json} (88%) rename backend/.sqlx/{query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json => query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json} (83%) create mode 100644 backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json create mode 100644 backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json create mode 100644 backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json create mode 100644 backend/src/data/live/crypto.rs create mode 100644 backend/src/data/live/mod.rs create mode 100644 backend/src/data/live/stocks.rs create mode 100644 backend/src/data/mod.rs create mode 100644 backend/src/database/bars.rs diff --git a/backend/.sqlx/query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json b/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json similarity index 84% rename from backend/.sqlx/query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json rename to backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json index be21401..ef34ae4 100644 --- a/backend/.sqlx/query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json +++ b/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "DELETE FROM assets WHERE symbol = $1\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "describe": { "columns": [ { @@ -69,5 +69,5 @@ false ] }, - "hash": "edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c" + "hash": "515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440" } diff --git a/backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json b/backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json new file mode 100644 index 0000000..4e8f3f3 --- /dev/null +++ b/backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json @@ -0,0 +1,71 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'crypto'", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "symbol", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "class: Class", + "type_info": { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto", + "unknown" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "exchange: Exchange", + "type_info": { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "unknown" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "trading", + "type_info": "Bool" + }, + { + "ordinal": 4, + "name": "date_added", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e" +} diff --git a/backend/.sqlx/query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json b/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json similarity index 88% rename from backend/.sqlx/query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json rename to backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json index dab780c..5195d90 100644 --- a/backend/.sqlx/query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json +++ b/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5)\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "describe": { "columns": [ { @@ -100,5 +100,5 @@ false ] }, - "hash": "f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c" + "hash": "987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469" } diff --git a/backend/.sqlx/query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json b/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json similarity index 83% rename from backend/.sqlx/query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json rename to backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json index 11e3884..174372c 100644 --- a/backend/.sqlx/query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json +++ b/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "UPDATE assets SET trading = $1 WHERE symbol = $2\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "describe": { "columns": [ { @@ -70,5 +70,5 @@ false ] }, - "hash": "053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8" + "hash": "cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2" } diff --git a/backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json b/backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json new file mode 100644 index 0000000..7d233f0 --- /dev/null +++ b/backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json @@ -0,0 +1,64 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "timestamp", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "asset_symbol", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "open", + "type_info": "Float8" + }, + { + "ordinal": 3, + "name": "high", + "type_info": "Float8" + }, + { + "ordinal": 4, + "name": "low", + "type_info": "Float8" + }, + { + "ordinal": 5, + "name": "close", + "type_info": "Float8" + }, + { + "ordinal": 6, + "name": "volume", + "type_info": "Float8" + } + ], + "parameters": { + "Left": [ + "TimestamptzArray", + "TextArray", + "Float8Array", + "Float8Array", + "Float8Array", + "Float8Array", + "Float8Array" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d" +} diff --git a/backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json b/backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json new file mode 100644 index 0000000..67eb4b4 --- /dev/null +++ b/backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json @@ -0,0 +1,64 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING timestamp, asset_symbol, open, high, low, close, volume", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "timestamp", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "asset_symbol", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "open", + "type_info": "Float8" + }, + { + "ordinal": 3, + "name": "high", + "type_info": "Float8" + }, + { + "ordinal": 4, + "name": "low", + "type_info": "Float8" + }, + { + "ordinal": 5, + "name": "close", + "type_info": "Float8" + }, + { + "ordinal": 6, + "name": "volume", + "type_info": "Float8" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Text", + "Float8", + "Float8", + "Float8", + "Float8", + "Float8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad" +} diff --git a/backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json b/backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json new file mode 100644 index 0000000..3947cbb --- /dev/null +++ b/backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json @@ -0,0 +1,71 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'us_equity'", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "symbol", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "class: Class", + "type_info": { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto", + "unknown" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "exchange: Exchange", + "type_info": { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "unknown" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "trading", + "type_info": "Bool" + }, + { + "ordinal": 4, + "name": "date_added", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b" +} diff --git a/backend/src/data/live/crypto.rs b/backend/src/data/live/crypto.rs new file mode 100644 index 0000000..1e324f9 --- /dev/null +++ b/backend/src/data/live/crypto.rs @@ -0,0 +1,57 @@ +use super::{AssetMPSC, StockStreamSubscription}; +use crate::{ + database::assets::get_assets_crypto, + pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool}, +}; +use apca::data::v2::stream::{ + drive, Bar, CustomUrl, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX, +}; +use futures_util::FutureExt; +use std::{error::Error, sync::Arc}; +use tokio::sync::{mpsc, Mutex}; + +#[derive(Default)] +pub struct CryptoUrl; + +impl ToString for CryptoUrl { + fn to_string(&self) -> String { + "wss://stream.data.alpaca.markets/v1beta3/crypto/us".into() + } +} + +pub type Crypto = CustomUrl; + +pub async fn init_stream_subscription_mpsc( + postgres_pool: PostgresPool, +) -> Result<(Arc>>, AssetMPSC), Box> { + let client = create_alpaca_client_from_env().await?; + + let (mut stream, mut subscription) = client + .subscribe::>() + .await?; + + let symbols = get_assets_crypto(&postgres_pool) + .await? + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>(); + + if !symbols.is_empty() { + let data = MarketData { + bars: Symbols::List(SymbolList::from(symbols)), + ..Default::default() + }; + + drive(subscription.subscribe(&data).boxed(), &mut stream) + .await + .unwrap() + .unwrap() + .unwrap(); + } + + let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription))); + let (sender, receiver) = mpsc::channel(50); + let asset_mpcs = AssetMPSC { sender, receiver }; + + Ok((stream_subscription_mutex, asset_mpcs)) +} diff --git a/backend/src/data/live/mod.rs b/backend/src/data/live/mod.rs new file mode 100644 index 0000000..4958c39 --- /dev/null +++ b/backend/src/data/live/mod.rs @@ -0,0 +1,133 @@ +pub mod crypto; +pub mod stocks; + +use crate::{database::bars::add_bar, pool::postgres::PostgresPool, types::Asset}; +use apca::{ + data::v2::stream::{drive, Data, MarketData, RealtimeData, Source, SymbolList, Symbols}, + Subscribable, +}; +use futures_util::{FutureExt, StreamExt}; +use log::{debug, error, info, warn}; +use std::{any::type_name, error::Error, sync::Arc, time::Duration}; +use time::OffsetDateTime; +use tokio::{ + spawn, + sync::{ + mpsc::{Receiver, Sender}, + Mutex, + }, + time::timeout, +}; + +pub enum AssetMPSCMessage { + Added(Asset), + Removed(Asset), +} + +pub struct AssetMPSC { + pub sender: Sender, + pub receiver: Receiver, +} + +pub type StockStreamSubscription = ( + as Subscribable>::Stream, + as Subscribable>::Subscription, +); + +pub const TIMEOUT_DURATION: Duration = Duration::from_millis(100); + +pub async fn run_data_live( + postgres_pool: PostgresPool, + stream_subscription_mutex: Arc>>, + asset_mpsc_receiver: Receiver, +) -> Result<(), Box> +where + S: Source + 'static, +{ + info!("Running live data thread for {}.", type_name::()); + + spawn(mpsc_handler::( + stream_subscription_mutex.clone(), + asset_mpsc_receiver, + )); + + loop { + let (stream, _) = &mut *stream_subscription_mutex.lock().await; + match timeout(TIMEOUT_DURATION, stream.next()).await { + Ok(Some(Ok(Ok(Data::Bar(bar))))) => { + let bar = add_bar( + &postgres_pool, + crate::types::Bar { + timestamp: match OffsetDateTime::from_unix_timestamp( + bar.timestamp.timestamp(), + ) { + Ok(timestamp) => timestamp, + Err(_) => { + warn!( + "Failed to parse timestamp for {}: {}.", + bar.symbol, bar.timestamp + ); + continue; + } + }, + asset_symbol: bar.symbol, + open: bar.open_price.to_f64().unwrap_or_default(), + high: bar.high_price.to_f64().unwrap_or_default(), + low: bar.low_price.to_f64().unwrap_or_default(), + close: bar.close_price.to_f64().unwrap_or_default(), + volume: bar.volume.to_f64().unwrap_or_default(), + }, + ) + .await?; + debug!( + "Saved timestamp for {}: {}.", + bar.asset_symbol, bar.timestamp + ); + } + Ok(Some(Ok(Ok(_)))) | Ok(Some(Ok(Err(_)))) | Err(_) => continue, + _ => panic!(), + } + } +} + +pub async fn mpsc_handler( + stream_subscription_mutex: Arc>>, + mut asset_mpsc_receiver: Receiver, +) -> Result<(), Box> { + while let Some(message) = asset_mpsc_receiver.recv().await { + let (stream, subscription) = &mut *stream_subscription_mutex.lock().await; + + match message { + AssetMPSCMessage::Added(asset) => { + let data = MarketData { + bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])), + ..Default::default() + }; + + match drive(subscription.subscribe(&data).boxed(), stream).await { + Ok(_) => info!("Successfully subscribed to {}", asset.symbol), + Err(e) => { + error!("Failed to subscribe to {}: {:?}", asset.symbol, e); + continue; + } + } + } + AssetMPSCMessage::Removed(asset) => { + let data = MarketData { + bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])), + ..Default::default() + }; + + match drive(subscription.unsubscribe(&data).boxed(), stream).await { + Ok(_) => info!("Successfully unsubscribed from {}", asset.symbol), + Err(e) => { + error!("Failed to unsubscribe from {}: {:?}", asset.symbol, e); + continue; + } + } + } + } + } + + Ok(()) +} diff --git a/backend/src/data/live/stocks.rs b/backend/src/data/live/stocks.rs new file mode 100644 index 0000000..d4e3497 --- /dev/null +++ b/backend/src/data/live/stocks.rs @@ -0,0 +1,46 @@ +use super::{AssetMPSC, StockStreamSubscription}; +use crate::{ + database::assets::get_assets_stocks, + pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool}, +}; +use apca::data::v2::stream::{ + drive, Bar, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX, +}; +use futures_util::FutureExt; +use std::{error::Error, sync::Arc}; +use tokio::sync::{mpsc, Mutex}; + +pub async fn init_stream_subscription_mpsc( + postgres_pool: PostgresPool, +) -> Result<(Arc>>, AssetMPSC), Box> { + let client = create_alpaca_client_from_env().await?; + + let (mut stream, mut subscription) = client + .subscribe::>() + .await?; + + let symbols = get_assets_stocks(&postgres_pool) + .await? + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>(); + + if !symbols.is_empty() { + let data = MarketData { + bars: Symbols::List(SymbolList::from(symbols)), + ..Default::default() + }; + + drive(subscription.subscribe(&data).boxed(), &mut stream) + .await + .unwrap() + .unwrap() + .unwrap(); + } + + let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription))); + let (sender, receiver) = mpsc::channel(50); + let asset_mpcs = AssetMPSC { sender, receiver }; + + Ok((stream_subscription_mutex, asset_mpcs)) +} diff --git a/backend/src/data/mod.rs b/backend/src/data/mod.rs new file mode 100644 index 0000000..e95c262 --- /dev/null +++ b/backend/src/data/mod.rs @@ -0,0 +1 @@ +pub mod live; diff --git a/backend/src/database/assets.rs b/backend/src/database/assets.rs index 2798171..3da257a 100644 --- a/backend/src/database/assets.rs +++ b/backend/src/database/assets.rs @@ -8,7 +8,34 @@ use std::error::Error; pub async fn get_assets( postgres_pool: &PostgresPool, ) -> Result, Box> { - query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#) + query_as!( + Asset, + r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"# + ) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) +} + +pub async fn get_assets_stocks( + postgres_pool: &PostgresPool, +) -> Result, Box> { + query_as!( + Asset, + r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'us_equity'"# + ) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) +} + +pub async fn get_assets_crypto( + postgres_pool: &PostgresPool, +) -> Result, Box> { + query_as!( + Asset, + r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'crypto'"# + ) .fetch_all(postgres_pool) .await .map_err(|e| e.into()) @@ -18,7 +45,10 @@ pub async fn get_asset( postgres_pool: &PostgresPool, symbol: &str, ) -> Result, Box> { - query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol) + query_as!( + Asset, + r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol + ) .fetch_optional(postgres_pool) .await .map_err(|e| e.into()) @@ -28,7 +58,12 @@ pub async fn add_asset( postgres_pool: &PostgresPool, asset: Asset, ) -> Result> { - query_as!(Asset, r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added) + query_as!( + Asset, + r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) + RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, + asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added + ) .fetch_one(postgres_pool) .await .map_err(|e| e.into()) @@ -39,7 +74,12 @@ pub async fn update_asset_trading( symbol: &str, trading: bool, ) -> Result, Box> { - query_as!(Asset, r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol) + query_as!( + Asset, + r#"UPDATE assets SET trading = $1 WHERE symbol = $2 + RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, + trading, symbol + ) .fetch_optional(postgres_pool) .await .map_err(|e| e.into()) @@ -49,8 +89,13 @@ pub async fn delete_asset( postgres_pool: &PostgresPool, symbol: &str, ) -> Result, Box> { - query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol) + Ok(query_as!( + Asset, + r#"DELETE FROM assets WHERE symbol = $1 + RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, + symbol + ) .fetch_optional(postgres_pool) .await - .map_err(|e| e.into()) + .unwrap()) } diff --git a/backend/src/database/bars.rs b/backend/src/database/bars.rs new file mode 100644 index 0000000..483a6cd --- /dev/null +++ b/backend/src/database/bars.rs @@ -0,0 +1,53 @@ +use crate::types::Bar; +use sqlx::{query_as, PgPool}; +use std::error::Error; + +pub async fn add_bar( + postgres_pool: &PgPool, + bar: Bar, +) -> Result> { + query_as!( + Bar, + r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING timestamp, asset_symbol, open, high, low, close, volume"#, + bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume + ) + .fetch_one(postgres_pool) + .await + .map_err(|e| e.into()) +} + +#[allow(dead_code)] +pub async fn add_bars( + postgres_pool: &PgPool, + bars: &Vec, +) -> Result, Box> { + let mut timestamps = Vec::with_capacity(bars.len()); + let mut asset_symbols = Vec::with_capacity(bars.len()); + let mut opens = Vec::with_capacity(bars.len()); + let mut highs = Vec::with_capacity(bars.len()); + let mut lows = Vec::with_capacity(bars.len()); + let mut closes = Vec::with_capacity(bars.len()); + let mut volumes = Vec::with_capacity(bars.len()); + + for bar in bars { + timestamps.push(bar.timestamp); + asset_symbols.push(bar.asset_symbol.clone()); + opens.push(bar.open); + highs.push(bar.high); + lows.push(bar.low); + closes.push(bar.close); + volumes.push(bar.volume); + } + + query_as!( + Bar, + r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) + SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[]) + RETURNING timestamp, asset_symbol, open, high, low, close, volume"#, + ×tamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes + ) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) +} diff --git a/backend/src/database/mod.rs b/backend/src/database/mod.rs index 5d8c80b..5ac2df4 100644 --- a/backend/src/database/mod.rs +++ b/backend/src/database/mod.rs @@ -1 +1,2 @@ pub mod assets; +pub mod bars; diff --git a/backend/src/main.rs b/backend/src/main.rs index ba6763a..2405aad 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,24 +1,61 @@ +mod data; mod database; mod pool; mod routes; mod types; +use apca::data::v2::stream::IEX; +use data::live::{ + crypto::{self, Crypto}, + run_data_live, stocks, +}; use dotenv::dotenv; use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env}; use routes::run_api; -use std::error::Error; +use std::{error::Error, sync::Arc}; use tokio::spawn; +const NUM_CLIENTS: usize = 10; + #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); - let num_clients = 10; - let postgres_pool = create_postgres_pool_from_env(num_clients).await?; - let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?; + let mut threads = Vec::new(); - let threads = vec![spawn(run_api(postgres_pool.clone(), alpaca_pool.clone()))]; + let postgres_pool = create_postgres_pool_from_env(NUM_CLIENTS).await?; + let alpaca_pool = create_alpaca_pool_from_env(NUM_CLIENTS).await?; + + // Stock Live Data + let (stock_live_stream_subscription_mutex, stock_live_mpsc) = + stocks::init_stream_subscription_mpsc(postgres_pool.clone()).await?; + let stock_live_mpsc_sender_arc = Arc::new(stock_live_mpsc.sender); + + threads.push(spawn(run_data_live::( + postgres_pool.clone(), + stock_live_stream_subscription_mutex.clone(), + stock_live_mpsc.receiver, + ))); + + // Crypto Live Data + let (crypto_stream_subscription_mutex, crypto_live_mpsc) = + crypto::init_stream_subscription_mpsc(postgres_pool.clone()).await?; + let crypto_live_mpsc_sender_arc = Arc::new(crypto_live_mpsc.sender); + + threads.push(spawn(run_data_live::( + postgres_pool.clone(), + crypto_stream_subscription_mutex.clone(), + crypto_live_mpsc.receiver, + ))); + + // REST API + threads.push(spawn(run_api( + postgres_pool.clone(), + alpaca_pool.clone(), + stock_live_mpsc_sender_arc.clone(), + crypto_live_mpsc_sender_arc.clone(), + ))); for thread in threads { let _ = thread.await?; diff --git a/backend/src/routes/assets.rs b/backend/src/routes/assets.rs index 7639c78..4be5f0d 100644 --- a/backend/src/routes/assets.rs +++ b/backend/src/routes/assets.rs @@ -1,12 +1,17 @@ +use crate::data::live::AssetMPSCMessage; use crate::database; use crate::database::assets::update_asset_trading; use crate::pool::alpaca::AlpacaPool; use crate::pool::postgres::PostgresPool; use crate::types::{Asset, Class, Exchange}; use apca::api::v2::asset::{self, Symbol}; +use apca::RequestError; use axum::{extract::Path, http::StatusCode, Extension, Json}; +use log::info; use serde::Deserialize; use sqlx::types::time::OffsetDateTime; +use std::sync::Arc; +use tokio::sync::mpsc::Sender; pub async fn get_assets( Extension(postgres_pool): Extension, @@ -42,6 +47,8 @@ pub struct AddAssetRequest { pub async fn add_asset( Extension(postgres_pool): Extension, Extension(alpaca_pool): Extension, + Extension(stock_live_mpsc_sender): Extension>>, + Extension(crypto_live_mpsc_sender): Extension>>, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { if database::assets::get_asset(&postgres_pool, &request.symbol) @@ -58,7 +65,10 @@ pub async fn add_asset( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .issue::(&Symbol::Sym(request.symbol)) .await - .map_err(|_| StatusCode::NOT_FOUND)?; + .map_err(|e| match e { + RequestError::Endpoint(_) => StatusCode::NOT_FOUND, + _ => StatusCode::INTERNAL_SERVER_ERROR, + })?; let asset = Asset { symbol: asset.symbol, @@ -72,6 +82,23 @@ pub async fn add_asset( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + match asset.class { + Class(asset::Class::UsEquity) => { + stock_live_mpsc_sender + .send(AssetMPSCMessage::Added(asset.clone())) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + } + Class(asset::Class::Crypto) => { + crypto_live_mpsc_sender + .send(AssetMPSCMessage::Added(asset.clone())) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + } + _ => {} + } + + info!("Added asset {}.", asset.symbol); Ok((StatusCode::CREATED, Json(asset))) } @@ -91,21 +118,45 @@ pub async fn update_asset( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; match asset { - Some(asset) => Ok((StatusCode::OK, Json(asset))), + Some(asset) => { + info!("Updated asset {}.", symbol); + Ok((StatusCode::OK, Json(asset))) + } None => Err(StatusCode::NOT_FOUND), } } pub async fn delete_asset( Extension(postgres_pool): Extension, + Extension(stock_live_mpsc_sender): Extension>>, + Extension(crypto_live_mpsc_sender): Extension>>, Path(symbol): Path, -) -> Result<(StatusCode, Json), StatusCode> { +) -> Result { let asset = database::assets::delete_asset(&postgres_pool, &symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; match asset { - Some(asset) => Ok((StatusCode::NO_CONTENT, Json(asset))), + Some(asset) => { + match asset.class { + Class(asset::Class::UsEquity) => { + stock_live_mpsc_sender + .send(AssetMPSCMessage::Removed(asset.clone())) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + } + Class(asset::Class::Crypto) => { + crypto_live_mpsc_sender + .send(AssetMPSCMessage::Removed(asset.clone())) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + } + _ => {} + } + + info!("Deleted asset {}.", symbol); + Ok(StatusCode::NO_CONTENT) + } None => Err(StatusCode::NOT_FOUND), } } diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index 24d30ff..a1e4c9c 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -1,16 +1,22 @@ -use crate::pool::{alpaca::AlpacaPool, postgres::PostgresPool}; +use crate::{ + data::live::AssetMPSCMessage, + pool::{alpaca::AlpacaPool, postgres::PostgresPool}, +}; use axum::{ routing::{delete, get, post}, Extension, Router, Server, }; use log::info; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::mpsc::Sender; pub mod assets; pub async fn run_api( postgres_pool: PostgresPool, alpaca_pool: AlpacaPool, + stock_live_mpsc_sender: Arc>, + crypto_live_mpsc_sender: Arc>, ) -> Result<(), Box> { let app = Router::new() .route("/assets", get(assets::get_assets)) @@ -19,7 +25,9 @@ pub async fn run_api( .route("/assets/:symbol", post(assets::update_asset)) .route("/assets/:symbol", delete(assets::delete_asset)) .layer(Extension(postgres_pool)) - .layer(Extension(alpaca_pool)); + .layer(Extension(alpaca_pool)) + .layer(Extension(stock_live_mpsc_sender)) + .layer(Extension(crypto_live_mpsc_sender)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); info!("Listening on {}...", addr); diff --git a/backend/src/types.rs b/backend/src/types.rs index 7529e8b..fe592d7 100644 --- a/backend/src/types.rs +++ b/backend/src/types.rs @@ -6,7 +6,7 @@ use time::OffsetDateTime; macro_rules! impl_apca_sqlx_traits { ($outer_type:ident, $inner_type:path, $fallback:expr) => { #[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)] - pub struct $outer_type($inner_type); + pub struct $outer_type(pub $inner_type); impl Deref for $outer_type { type Target = $inner_type; diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh index f83697a..e6249f3 100644 --- a/support/timescaledb/999_init.sh +++ b/support/timescaledb/999_init.sh @@ -24,14 +24,13 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL CREATE TABLE bars ( timestamp TIMESTAMPTZ NOT NULL, - asset_symbol TEXT NOT NULL REFERENCES assets(symbol), + asset_symbol TEXT NOT NULL REFERENCES assets(symbol) ON DELETE CASCADE ON UPDATE CASCADE, open DOUBLE PRECISION NOT NULL, high DOUBLE PRECISION NOT NULL, low DOUBLE PRECISION NOT NULL, close DOUBLE PRECISION NOT NULL, volume DOUBLE PRECISION NOT NULL, - PRIMARY KEY (asset_symbol, timestamp), - FOREIGN KEY (asset_symbol) REFERENCES assets(symbol) + PRIMARY KEY (asset_symbol, timestamp) ); SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2);