Add live data threads

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-08-31 09:33:56 +03:00
parent a542225680
commit 4fbd7f0e6d
19 changed files with 729 additions and 28 deletions

View File

@@ -1,6 +1,6 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "query": "DELETE FROM assets WHERE symbol = $1\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@@ -69,5 +69,5 @@
false false
] ]
}, },
"hash": "edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c" "hash": "515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440"
} }

View File

@@ -0,0 +1,71 @@
{
"db_name": "PostgreSQL",
"query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'crypto'",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "symbol",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 2,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 4,
"name": "date_added",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e"
}

View File

@@ -1,6 +1,6 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5)\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@@ -100,5 +100,5 @@
false false
] ]
}, },
"hash": "f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c" "hash": "987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469"
} }

View File

@@ -1,6 +1,6 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "query": "UPDATE assets SET trading = $1 WHERE symbol = $2\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@@ -70,5 +70,5 @@
false false
] ]
}, },
"hash": "053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8" "hash": "cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2"
} }

View File

@@ -0,0 +1,64 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "timestamp",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "asset_symbol",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "open",
"type_info": "Float8"
},
{
"ordinal": 3,
"name": "high",
"type_info": "Float8"
},
{
"ordinal": 4,
"name": "low",
"type_info": "Float8"
},
{
"ordinal": 5,
"name": "close",
"type_info": "Float8"
},
{
"ordinal": 6,
"name": "volume",
"type_info": "Float8"
}
],
"parameters": {
"Left": [
"TimestamptzArray",
"TextArray",
"Float8Array",
"Float8Array",
"Float8Array",
"Float8Array",
"Float8Array"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
false
]
},
"hash": "e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d"
}

View File

@@ -0,0 +1,64 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING timestamp, asset_symbol, open, high, low, close, volume",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "timestamp",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "asset_symbol",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "open",
"type_info": "Float8"
},
{
"ordinal": 3,
"name": "high",
"type_info": "Float8"
},
{
"ordinal": 4,
"name": "low",
"type_info": "Float8"
},
{
"ordinal": 5,
"name": "close",
"type_info": "Float8"
},
{
"ordinal": 6,
"name": "volume",
"type_info": "Float8"
}
],
"parameters": {
"Left": [
"Timestamptz",
"Text",
"Float8",
"Float8",
"Float8",
"Float8",
"Float8"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
false
]
},
"hash": "e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad"
}

View File

@@ -0,0 +1,71 @@
{
"db_name": "PostgreSQL",
"query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'us_equity'",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "symbol",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 2,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 4,
"name": "date_added",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b"
}

View File

@@ -0,0 +1,57 @@
use super::{AssetMPSC, StockStreamSubscription};
use crate::{
database::assets::get_assets_crypto,
pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool},
};
use apca::data::v2::stream::{
drive, Bar, CustomUrl, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX,
};
use futures_util::FutureExt;
use std::{error::Error, sync::Arc};
use tokio::sync::{mpsc, Mutex};
/// Marker type whose string form is the Alpaca crypto realtime-data
/// websocket URL; used as the type parameter of `CustomUrl`.
#[derive(Default)]
pub struct CryptoUrl;

// Implement `Display` rather than `ToString` directly: the blanket
// `impl<T: Display> ToString for T` still satisfies the `ToString` bound
// `CustomUrl` needs, and direct `ToString` impls are a clippy anti-idiom.
impl std::fmt::Display for CryptoUrl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("wss://stream.data.alpaca.markets/v1beta3/crypto/us")
    }
}
/// Realtime-data source selecting Alpaca's crypto stream endpoint.
pub type Crypto = CustomUrl<CryptoUrl>;
pub async fn init_stream_subscription_mpsc(
postgres_pool: PostgresPool,
) -> Result<(Arc<Mutex<StockStreamSubscription<IEX>>>, AssetMPSC), Box<dyn Error + Send + Sync>> {
let client = create_alpaca_client_from_env().await?;
let (mut stream, mut subscription) = client
.subscribe::<RealtimeData<Crypto, Bar, Quote, Trade>>()
.await?;
let symbols = get_assets_crypto(&postgres_pool)
.await?
.iter()
.map(|asset| asset.symbol.clone())
.collect::<Vec<String>>();
if !symbols.is_empty() {
let data = MarketData {
bars: Symbols::List(SymbolList::from(symbols)),
..Default::default()
};
drive(subscription.subscribe(&data).boxed(), &mut stream)
.await
.unwrap()
.unwrap()
.unwrap();
}
let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription)));
let (sender, receiver) = mpsc::channel(50);
let asset_mpcs = AssetMPSC { sender, receiver };
Ok((stream_subscription_mutex, asset_mpcs))
}

View File

@@ -0,0 +1,133 @@
pub mod crypto;
pub mod stocks;
use crate::{database::bars::add_bar, pool::postgres::PostgresPool, types::Asset};
use apca::{
data::v2::stream::{drive, Data, MarketData, RealtimeData, Source, SymbolList, Symbols},
Subscribable,
};
use futures_util::{FutureExt, StreamExt};
use log::{debug, error, info, warn};
use std::{any::type_name, error::Error, sync::Arc, time::Duration};
use time::OffsetDateTime;
use tokio::{
spawn,
sync::{
mpsc::{Receiver, Sender},
Mutex,
},
time::timeout,
};
/// Message sent to a live-data thread so its websocket subscriptions stay in
/// sync with the assets table.
pub enum AssetMPSCMessage {
    /// The asset was added; the thread should subscribe to its bars.
    Added(Asset),
    /// The asset was removed; the thread should unsubscribe from its bars.
    Removed(Asset),
}
/// Both endpoints of the asset-update channel: the sender is handed to the
/// REST API handlers, the receiver is consumed by `mpsc_handler`.
pub struct AssetMPSC {
    pub sender: Sender<AssetMPSCMessage>,
    pub receiver: Receiver<AssetMPSCMessage>,
}
/// The realtime data stream and its subscription handle for source `S`,
/// kept together because both are needed to (un)subscribe via `drive`.
pub type StockStreamSubscription<S> = (
    <RealtimeData<S> as Subscribable>::Stream,
    <RealtimeData<S> as Subscribable>::Subscription,
);

/// How long `run_data_live` polls the stream before releasing its lock, so
/// `mpsc_handler` gets a chance to acquire the mutex between iterations.
pub const TIMEOUT_DURATION: Duration = Duration::from_millis(100);
pub async fn run_data_live<S>(
postgres_pool: PostgresPool,
stream_subscription_mutex: Arc<Mutex<StockStreamSubscription<S>>>,
asset_mpsc_receiver: Receiver<AssetMPSCMessage>,
) -> Result<(), Box<dyn Error + Send + Sync>>
where
S: Source + 'static,
{
info!("Running live data thread for {}.", type_name::<S>());
spawn(mpsc_handler::<S>(
stream_subscription_mutex.clone(),
asset_mpsc_receiver,
));
loop {
let (stream, _) = &mut *stream_subscription_mutex.lock().await;
match timeout(TIMEOUT_DURATION, stream.next()).await {
Ok(Some(Ok(Ok(Data::Bar(bar))))) => {
let bar = add_bar(
&postgres_pool,
crate::types::Bar {
timestamp: match OffsetDateTime::from_unix_timestamp(
bar.timestamp.timestamp(),
) {
Ok(timestamp) => timestamp,
Err(_) => {
warn!(
"Failed to parse timestamp for {}: {}.",
bar.symbol, bar.timestamp
);
continue;
}
},
asset_symbol: bar.symbol,
open: bar.open_price.to_f64().unwrap_or_default(),
high: bar.high_price.to_f64().unwrap_or_default(),
low: bar.low_price.to_f64().unwrap_or_default(),
close: bar.close_price.to_f64().unwrap_or_default(),
volume: bar.volume.to_f64().unwrap_or_default(),
},
)
.await?;
debug!(
"Saved timestamp for {}: {}.",
bar.asset_symbol, bar.timestamp
);
}
Ok(Some(Ok(Ok(_)))) | Ok(Some(Ok(Err(_)))) | Err(_) => continue,
_ => panic!(),
}
}
}
/// Applies asset add/remove messages to the shared stream subscription,
/// subscribing to or unsubscribing from the asset's bar feed. Ends once
/// every sender of the channel has been dropped.
pub async fn mpsc_handler<S: Source>(
    stream_subscription_mutex: Arc<Mutex<StockStreamSubscription<S>>>,
    mut asset_mpsc_receiver: Receiver<AssetMPSCMessage>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    while let Some(message) = asset_mpsc_receiver.recv().await {
        let (stream, subscription) = &mut *stream_subscription_mutex.lock().await;

        // Both message kinds carry one asset whose bar feed is toggled;
        // only the direction (subscribe vs. unsubscribe) differs.
        let (asset, subscribing) = match &message {
            AssetMPSCMessage::Added(asset) => (asset, true),
            AssetMPSCMessage::Removed(asset) => (asset, false),
        };

        let data = MarketData {
            bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])),
            ..Default::default()
        };

        let result = if subscribing {
            drive(subscription.subscribe(&data).boxed(), stream).await
        } else {
            drive(subscription.unsubscribe(&data).boxed(), stream).await
        };

        match (subscribing, result) {
            (true, Ok(_)) => info!("Successfully subscribed to {}", asset.symbol),
            (false, Ok(_)) => info!("Successfully unsubscribed from {}", asset.symbol),
            (true, Err(e)) => {
                error!("Failed to subscribe to {}: {:?}", asset.symbol, e);
                continue;
            }
            (false, Err(e)) => {
                error!("Failed to unsubscribe from {}: {:?}", asset.symbol, e);
                continue;
            }
        }
    }
    Ok(())
}

View File

@@ -0,0 +1,46 @@
use super::{AssetMPSC, StockStreamSubscription};
use crate::{
database::assets::get_assets_stocks,
pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool},
};
use apca::data::v2::stream::{
drive, Bar, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX,
};
use futures_util::FutureExt;
use std::{error::Error, sync::Arc};
use tokio::sync::{mpsc, Mutex};
/// Connects to the Alpaca IEX realtime stream, subscribes to bars for every
/// stock asset already stored in the database, and creates the MPSC channel
/// used to (un)subscribe assets at runtime.
///
/// # Errors
/// Returns an error if the Alpaca client cannot be created, the websocket
/// subscription fails, or the database query fails.
pub async fn init_stream_subscription_mpsc(
    postgres_pool: PostgresPool,
) -> Result<(Arc<Mutex<StockStreamSubscription<IEX>>>, AssetMPSC), Box<dyn Error + Send + Sync>> {
    let client = create_alpaca_client_from_env().await?;
    let (mut stream, mut subscription) = client
        .subscribe::<RealtimeData<IEX, Bar, Quote, Trade>>()
        .await?;

    let assets = get_assets_stocks(&postgres_pool).await?;
    let symbols: Vec<String> = assets.iter().map(|asset| asset.symbol.clone()).collect();

    // Subscribe only when at least one stock symbol exists.
    if !symbols.is_empty() {
        let market_data = MarketData {
            bars: Symbols::List(SymbolList::from(symbols)),
            ..Default::default()
        };
        // Startup subscription failure is unrecoverable, hence the unwraps
        // across drive's nested result layers.
        drive(subscription.subscribe(&market_data).boxed(), &mut stream)
            .await
            .unwrap()
            .unwrap()
            .unwrap();
    }

    let (sender, receiver) = mpsc::channel(50);
    Ok((
        Arc::new(Mutex::new((stream, subscription))),
        AssetMPSC { sender, receiver },
    ))
}

1
backend/src/data/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod live;

View File

@@ -8,7 +8,34 @@ use std::error::Error;
pub async fn get_assets( pub async fn get_assets(
postgres_pool: &PostgresPool, postgres_pool: &PostgresPool,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#) query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn get_assets_stocks(
postgres_pool: &PostgresPool,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'us_equity'"#
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn get_assets_crypto(
postgres_pool: &PostgresPool,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'crypto'"#
)
.fetch_all(postgres_pool) .fetch_all(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
@@ -18,7 +45,10 @@ pub async fn get_asset(
postgres_pool: &PostgresPool, postgres_pool: &PostgresPool,
symbol: &str, symbol: &str,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol) query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol
)
.fetch_optional(postgres_pool) .fetch_optional(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
@@ -28,7 +58,12 @@ pub async fn add_asset(
postgres_pool: &PostgresPool, postgres_pool: &PostgresPool,
asset: Asset, asset: Asset,
) -> Result<Asset, Box<dyn Error + Send + Sync>> { ) -> Result<Asset, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added) query_as!(
Asset,
r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5)
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added
)
.fetch_one(postgres_pool) .fetch_one(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
@@ -39,7 +74,12 @@ pub async fn update_asset_trading(
symbol: &str, symbol: &str,
trading: bool, trading: bool,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol) query_as!(
Asset,
r#"UPDATE assets SET trading = $1 WHERE symbol = $2
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
trading, symbol
)
.fetch_optional(postgres_pool) .fetch_optional(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
@@ -49,8 +89,13 @@ pub async fn delete_asset(
postgres_pool: &PostgresPool, postgres_pool: &PostgresPool,
symbol: &str, symbol: &str,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol) Ok(query_as!(
Asset,
r#"DELETE FROM assets WHERE symbol = $1
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
symbol
)
.fetch_optional(postgres_pool) .fetch_optional(postgres_pool)
.await .await
.map_err(|e| e.into()) .unwrap())
} }

View File

@@ -0,0 +1,53 @@
use crate::types::Bar;
use sqlx::{query_as, PgPool};
use std::error::Error;
/// Inserts a single bar into the `bars` table and returns the stored row.
///
/// # Errors
/// Propagates the underlying sqlx error if the insert fails.
pub async fn add_bar(
    postgres_pool: &PgPool,
    bar: Bar,
) -> Result<Bar, Box<dyn Error + Send + Sync>> {
    let row = query_as!(
        Bar,
        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)
        RETURNING timestamp, asset_symbol, open, high, low, close, volume"#,
        bar.timestamp,
        bar.asset_symbol,
        bar.open,
        bar.high,
        bar.low,
        bar.close,
        bar.volume
    )
    .fetch_one(postgres_pool)
    .await?;
    Ok(row)
}
/// Bulk-inserts bars with a single `UNNEST`-based statement and returns the
/// persisted rows.
///
/// Takes a slice instead of `&Vec<Bar>` (callers passing `&Vec<Bar>` still
/// coerce via deref, so the change is backward compatible).
///
/// # Errors
/// Propagates the underlying sqlx error if the insert fails.
#[allow(dead_code)]
pub async fn add_bars(
    postgres_pool: &PgPool,
    bars: &[Bar],
) -> Result<Vec<Bar>, Box<dyn Error + Send + Sync>> {
    // Column-major buffers: sqlx binds one Postgres array per column.
    let mut timestamps = Vec::with_capacity(bars.len());
    let mut asset_symbols = Vec::with_capacity(bars.len());
    let mut opens = Vec::with_capacity(bars.len());
    let mut highs = Vec::with_capacity(bars.len());
    let mut lows = Vec::with_capacity(bars.len());
    let mut closes = Vec::with_capacity(bars.len());
    let mut volumes = Vec::with_capacity(bars.len());

    for bar in bars {
        timestamps.push(bar.timestamp);
        asset_symbols.push(bar.asset_symbol.clone());
        opens.push(bar.open);
        highs.push(bar.high);
        lows.push(bar.low);
        closes.push(bar.close);
        volumes.push(bar.volume);
    }

    query_as!(
        Bar,
        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume)
            SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[])
            RETURNING timestamp, asset_symbol, open, high, low, close, volume"#,
        &timestamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes
    )
    .fetch_all(postgres_pool)
    .await
    .map_err(|e| e.into())
}

View File

@@ -1 +1,2 @@
pub mod assets; pub mod assets;
pub mod bars;

View File

@@ -1,24 +1,61 @@
mod data;
mod database; mod database;
mod pool; mod pool;
mod routes; mod routes;
mod types; mod types;
use apca::data::v2::stream::IEX;
use data::live::{
crypto::{self, Crypto},
run_data_live, stocks,
};
use dotenv::dotenv; use dotenv::dotenv;
use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env}; use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env};
use routes::run_api; use routes::run_api;
use std::error::Error; use std::{error::Error, sync::Arc};
use tokio::spawn; use tokio::spawn;
const NUM_CLIENTS: usize = 10;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
dotenv().ok(); dotenv().ok();
log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); log4rs::init_file("log4rs.yaml", Default::default()).unwrap();
let num_clients = 10; let mut threads = Vec::new();
let postgres_pool = create_postgres_pool_from_env(num_clients).await?;
let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?;
let threads = vec![spawn(run_api(postgres_pool.clone(), alpaca_pool.clone()))]; let postgres_pool = create_postgres_pool_from_env(NUM_CLIENTS).await?;
let alpaca_pool = create_alpaca_pool_from_env(NUM_CLIENTS).await?;
// Stock Live Data
let (stock_live_stream_subscription_mutex, stock_live_mpsc) =
stocks::init_stream_subscription_mpsc(postgres_pool.clone()).await?;
let stock_live_mpsc_sender_arc = Arc::new(stock_live_mpsc.sender);
threads.push(spawn(run_data_live::<IEX>(
postgres_pool.clone(),
stock_live_stream_subscription_mutex.clone(),
stock_live_mpsc.receiver,
)));
// Crypto Live Data
let (crypto_stream_subscription_mutex, crypto_live_mpsc) =
crypto::init_stream_subscription_mpsc(postgres_pool.clone()).await?;
let crypto_live_mpsc_sender_arc = Arc::new(crypto_live_mpsc.sender);
threads.push(spawn(run_data_live::<Crypto>(
postgres_pool.clone(),
crypto_stream_subscription_mutex.clone(),
crypto_live_mpsc.receiver,
)));
// REST API
threads.push(spawn(run_api(
postgres_pool.clone(),
alpaca_pool.clone(),
stock_live_mpsc_sender_arc.clone(),
crypto_live_mpsc_sender_arc.clone(),
)));
for thread in threads { for thread in threads {
let _ = thread.await?; let _ = thread.await?;

View File

@@ -1,12 +1,17 @@
use crate::data::live::AssetMPSCMessage;
use crate::database; use crate::database;
use crate::database::assets::update_asset_trading; use crate::database::assets::update_asset_trading;
use crate::pool::alpaca::AlpacaPool; use crate::pool::alpaca::AlpacaPool;
use crate::pool::postgres::PostgresPool; use crate::pool::postgres::PostgresPool;
use crate::types::{Asset, Class, Exchange}; use crate::types::{Asset, Class, Exchange};
use apca::api::v2::asset::{self, Symbol}; use apca::api::v2::asset::{self, Symbol};
use apca::RequestError;
use axum::{extract::Path, http::StatusCode, Extension, Json}; use axum::{extract::Path, http::StatusCode, Extension, Json};
use log::info;
use serde::Deserialize; use serde::Deserialize;
use sqlx::types::time::OffsetDateTime; use sqlx::types::time::OffsetDateTime;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
pub async fn get_assets( pub async fn get_assets(
Extension(postgres_pool): Extension<PostgresPool>, Extension(postgres_pool): Extension<PostgresPool>,
@@ -42,6 +47,8 @@ pub struct AddAssetRequest {
pub async fn add_asset( pub async fn add_asset(
Extension(postgres_pool): Extension<PostgresPool>, Extension(postgres_pool): Extension<PostgresPool>,
Extension(alpaca_pool): Extension<AlpacaPool>, Extension(alpaca_pool): Extension<AlpacaPool>,
Extension(stock_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Extension(crypto_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Json(request): Json<AddAssetRequest>, Json(request): Json<AddAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> { ) -> Result<(StatusCode, Json<Asset>), StatusCode> {
if database::assets::get_asset(&postgres_pool, &request.symbol) if database::assets::get_asset(&postgres_pool, &request.symbol)
@@ -58,7 +65,10 @@ pub async fn add_asset(
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.issue::<asset::Get>(&Symbol::Sym(request.symbol)) .issue::<asset::Get>(&Symbol::Sym(request.symbol))
.await .await
.map_err(|_| StatusCode::NOT_FOUND)?; .map_err(|e| match e {
RequestError::Endpoint(_) => StatusCode::NOT_FOUND,
_ => StatusCode::INTERNAL_SERVER_ERROR,
})?;
let asset = Asset { let asset = Asset {
symbol: asset.symbol, symbol: asset.symbol,
@@ -72,6 +82,23 @@ pub async fn add_asset(
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match asset.class {
Class(asset::Class::UsEquity) => {
stock_live_mpsc_sender
.send(AssetMPSCMessage::Added(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
Class(asset::Class::Crypto) => {
crypto_live_mpsc_sender
.send(AssetMPSCMessage::Added(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
_ => {}
}
info!("Added asset {}.", asset.symbol);
Ok((StatusCode::CREATED, Json(asset))) Ok((StatusCode::CREATED, Json(asset)))
} }
@@ -91,21 +118,45 @@ pub async fn update_asset(
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match asset { match asset {
Some(asset) => Ok((StatusCode::OK, Json(asset))), Some(asset) => {
info!("Updated asset {}.", symbol);
Ok((StatusCode::OK, Json(asset)))
}
None => Err(StatusCode::NOT_FOUND), None => Err(StatusCode::NOT_FOUND),
} }
} }
pub async fn delete_asset( pub async fn delete_asset(
Extension(postgres_pool): Extension<PostgresPool>, Extension(postgres_pool): Extension<PostgresPool>,
Extension(stock_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Extension(crypto_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Path(symbol): Path<String>, Path(symbol): Path<String>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> { ) -> Result<StatusCode, StatusCode> {
let asset = database::assets::delete_asset(&postgres_pool, &symbol) let asset = database::assets::delete_asset(&postgres_pool, &symbol)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match asset { match asset {
Some(asset) => Ok((StatusCode::NO_CONTENT, Json(asset))), Some(asset) => {
match asset.class {
Class(asset::Class::UsEquity) => {
stock_live_mpsc_sender
.send(AssetMPSCMessage::Removed(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
Class(asset::Class::Crypto) => {
crypto_live_mpsc_sender
.send(AssetMPSCMessage::Removed(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
_ => {}
}
info!("Deleted asset {}.", symbol);
Ok(StatusCode::NO_CONTENT)
}
None => Err(StatusCode::NOT_FOUND), None => Err(StatusCode::NOT_FOUND),
} }
} }

View File

@@ -1,16 +1,22 @@
use crate::pool::{alpaca::AlpacaPool, postgres::PostgresPool}; use crate::{
data::live::AssetMPSCMessage,
pool::{alpaca::AlpacaPool, postgres::PostgresPool},
};
use axum::{ use axum::{
routing::{delete, get, post}, routing::{delete, get, post},
Extension, Router, Server, Extension, Router, Server,
}; };
use log::info; use log::info;
use std::net::SocketAddr; use std::{net::SocketAddr, sync::Arc};
use tokio::sync::mpsc::Sender;
pub mod assets; pub mod assets;
pub async fn run_api( pub async fn run_api(
postgres_pool: PostgresPool, postgres_pool: PostgresPool,
alpaca_pool: AlpacaPool, alpaca_pool: AlpacaPool,
stock_live_mpsc_sender: Arc<Sender<AssetMPSCMessage>>,
crypto_live_mpsc_sender: Arc<Sender<AssetMPSCMessage>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let app = Router::new() let app = Router::new()
.route("/assets", get(assets::get_assets)) .route("/assets", get(assets::get_assets))
@@ -19,7 +25,9 @@ pub async fn run_api(
.route("/assets/:symbol", post(assets::update_asset)) .route("/assets/:symbol", post(assets::update_asset))
.route("/assets/:symbol", delete(assets::delete_asset)) .route("/assets/:symbol", delete(assets::delete_asset))
.layer(Extension(postgres_pool)) .layer(Extension(postgres_pool))
.layer(Extension(alpaca_pool)); .layer(Extension(alpaca_pool))
.layer(Extension(stock_live_mpsc_sender))
.layer(Extension(crypto_live_mpsc_sender));
let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
info!("Listening on {}...", addr); info!("Listening on {}...", addr);

View File

@@ -6,7 +6,7 @@ use time::OffsetDateTime;
macro_rules! impl_apca_sqlx_traits { macro_rules! impl_apca_sqlx_traits {
($outer_type:ident, $inner_type:path, $fallback:expr) => { ($outer_type:ident, $inner_type:path, $fallback:expr) => {
#[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)]
pub struct $outer_type($inner_type); pub struct $outer_type(pub $inner_type);
impl Deref for $outer_type { impl Deref for $outer_type {
type Target = $inner_type; type Target = $inner_type;

View File

@@ -24,14 +24,13 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
CREATE TABLE bars ( CREATE TABLE bars (
timestamp TIMESTAMPTZ NOT NULL, timestamp TIMESTAMPTZ NOT NULL,
asset_symbol TEXT NOT NULL REFERENCES assets(symbol), asset_symbol TEXT NOT NULL REFERENCES assets(symbol) ON DELETE CASCADE ON UPDATE CASCADE,
open DOUBLE PRECISION NOT NULL, open DOUBLE PRECISION NOT NULL,
high DOUBLE PRECISION NOT NULL, high DOUBLE PRECISION NOT NULL,
low DOUBLE PRECISION NOT NULL, low DOUBLE PRECISION NOT NULL,
close DOUBLE PRECISION NOT NULL, close DOUBLE PRECISION NOT NULL,
volume DOUBLE PRECISION NOT NULL, volume DOUBLE PRECISION NOT NULL,
PRIMARY KEY (asset_symbol, timestamp), PRIMARY KEY (asset_symbol, timestamp)
FOREIGN KEY (asset_symbol) REFERENCES assets(symbol)
); );
SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2); SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2);