From 548a8e42d5569da4e1db55faa0c8b7ad8413ebe6 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Tue, 5 Sep 2023 11:31:47 +0300 Subject: [PATCH] Add stock market calendar Signed-off-by: Nikolaos Karaolidis --- ...2e0981687c4e0ff48ccc538cf06b3bd616c60.json | 36 +++++++ ...5ded2148c9e46409975b7bf0b76f7ba0552e8.json | 32 ++++++ backend/Cargo.lock | 100 ++++++++++++++++++ backend/Cargo.toml | 11 +- backend/src/config.rs | 23 +++- backend/src/data/calendar.rs | 41 +++++++ backend/src/data/live.rs | 25 ++--- backend/src/data/mod.rs | 1 + backend/src/database/assets.rs | 36 +++---- backend/src/database/bars.rs | 12 +-- backend/src/database/calendar.rs | 54 ++++++++++ backend/src/database/mod.rs | 1 + backend/src/main.rs | 19 ++-- backend/src/routes/assets.rs | 16 +-- backend/src/routes/mod.rs | 4 +- backend/src/types/api/incoming/calendar.rs | 29 +++++ backend/src/types/api/incoming/mod.rs | 2 + backend/src/types/bar.rs | 6 +- backend/src/types/calendar.rs | 18 ++++ backend/src/types/mod.rs | 2 + support/timescaledb/999_init.sh | 22 ++++ 21 files changed, 417 insertions(+), 73 deletions(-) create mode 100644 backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json create mode 100644 backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json create mode 100644 backend/src/data/calendar.rs create mode 100644 backend/src/database/calendar.rs create mode 100644 backend/src/types/api/incoming/calendar.rs create mode 100644 backend/src/types/calendar.rs diff --git a/backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json b/backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json new file mode 100644 index 0000000..7a89b3f --- /dev/null +++ b/backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json @@ -0,0 +1,36 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO calendar (date, open, close)\n SELECT * FROM UNNEST($1::date[], $2::time[], $3::time[])\n RETURNING date, open, close", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "date", + "type_info": "Date" + }, + { + "ordinal": 1, + "name": "open", + "type_info": "Time" + }, + { + "ordinal": 2, + "name": "close", + "type_info": "Time" + } + ], + "parameters": { + "Left": [ + "DateArray", + "TimeArray", + "TimeArray" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60" +} diff --git a/backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json b/backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json new file mode 100644 index 0000000..c206585 --- /dev/null +++ b/backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM calendar RETURNING date, open, close", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "date", + "type_info": "Date" + }, + { + "ordinal": 1, + "name": "open", + "type_info": "Time" + }, + { + "ordinal": 2, + "name": "close", + "type_info": "Time" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8" +} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index e8a8032..7462155 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -144,6 +144,7 @@ dependencies = [ "axum", "dotenv", "futures-util", + "governor", "http", "log", "log4rs", @@ -330,6 +331,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.0", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -507,6 +521,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -574,12 +603,19 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -618,6 +654,24 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +[[package]] +name = "governor" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "quanta", + "rand", + "smallvec", +] + [[package]] name = "h2" version = "0.3.21" @@ -965,6 +1019,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchit" version = "0.7.2" @@ -1036,6 +1099,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -1046,6 +1115,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1290,6 +1365,22 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.33" @@ -1329,6 +1420,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.2.16" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 7af11fe..f63deda 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -32,6 +32,13 @@ time = { version = "0.3.27", features = [ "serde", ] } futures-util = "0.3.28" -reqwest = { version = "0.11.20", features = ["json", "serde_json"] } -tokio-tungstenite = { version = "0.20.0", features = ["tokio-native-tls", "native-tls"] } +reqwest = { version = "0.11.20", features = [ + "json", + "serde_json", +] } +tokio-tungstenite = { version = "0.20.0", features = [ + "tokio-native-tls", + "native-tls", +] } http = "0.2.9" +governor = "0.6.0" diff --git a/backend/src/config.rs b/backend/src/config.rs index e3ef0e3..5b44f9d 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -1,3 +1,5 @@ +use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; +use http::HeaderMap; use reqwest::Client; use sqlx::{postgres::PgPoolOptions, PgPool}; use std::{env, sync::Arc}; @@ -5,22 +7,35 @@ use std::{env, sync::Arc}; pub struct AppConfig { pub alpaca_api_key: String, pub alpaca_api_secret: String, + pub alpaca_client: Client, + pub alpaca_rate_limit: DefaultDirectRateLimiter, pub postgres_pool: PgPool, - pub reqwest_client: Client, } const NUM_CLIENTS: usize = 10; impl AppConfig { pub async fn from_env() -> Result> { + let alpaca_api_key = env::var("ALPACA_API_KEY")?; + let alpaca_api_secret = env::var("ALPACA_API_SECRET")?; + let alpaca_rate_limit = env::var("ALPACA_RATE_LIMIT")?; + Ok(AppConfig { - alpaca_api_key: env::var("APCA_API_KEY_ID").unwrap(), - alpaca_api_secret: env::var("APCA_API_SECRET_KEY").unwrap(), + alpaca_api_key: alpaca_api_key.clone(), + alpaca_api_secret: alpaca_api_secret.clone(), + alpaca_client: Client::builder() + .default_headers({ + let mut headers = HeaderMap::new(); + headers.insert("APCA-API-KEY-ID", alpaca_api_key.parse()?); + headers.insert("APCA-API-SECRET-KEY", alpaca_api_secret.parse()?); + headers + }) + .build()?, + alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(alpaca_rate_limit.parse()?)), postgres_pool: PgPoolOptions::new() .max_connections(NUM_CLIENTS as u32) .connect(&env::var("DATABASE_URL")?) .await?, - reqwest_client: Client::new(), }) } diff --git a/backend/src/data/calendar.rs b/backend/src/data/calendar.rs new file mode 100644 index 0000000..c856dff --- /dev/null +++ b/backend/src/data/calendar.rs @@ -0,0 +1,41 @@ +use crate::{ + config::AppConfig, + database, + types::{api, CalendarDate}, +}; +use log::info; +use std::{error::Error, sync::Arc, time::Duration}; +use tokio::time::interval; + +const ALPACA_CALENDAR_API_URL: &str = "https://api.alpaca.markets/v2/calendar"; +const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 3); +const EARLIEST_DATE: &str = "1970-01-01"; +const LATEST_DATE: &str = "2029-12-31"; + +pub async fn run(app_config: Arc) -> Result<(), Box> { + let mut interval = interval(REFRESH_INTERVAL); + + loop { + interval.tick().await; + + info!("Refreshing calendar..."); + + app_config.alpaca_rate_limit.until_ready().await; + let calendar_dates = app_config + .alpaca_client + .get(ALPACA_CALENDAR_API_URL) + .query(&[("start", EARLIEST_DATE), ("end", LATEST_DATE)]) + .send() + .await? + .json::>() + .await? + .iter() + .map(CalendarDate::from) + .collect::>(); + + database::calendar::reset_calendar_dates(&app_config.postgres_pool, &calendar_dates) + .await?; + + info!("Refreshed calendar."); + } +} diff --git a/backend/src/data/live.rs b/backend/src/data/live.rs index c563895..ea5088c 100644 --- a/backend/src/data/live.rs +++ b/backend/src/data/live.rs @@ -16,20 +16,18 @@ use futures_util::{ }; use log::{debug, error, info, warn}; use serde_json::{from_str, to_string}; -use std::{error::Error, sync::Arc, time::Duration}; +use std::{error::Error, sync::Arc}; use tokio::{ net::TcpStream, spawn, sync::{broadcast::Receiver, RwLock}, - time::timeout, }; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2/iex"; const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; -const TIMEOUT_DURATION: Duration = Duration::from_millis(100); -pub async fn run_data_live( +pub async fn run( class: Class, app_config: Arc, asset_broadcast_receiver: Receiver, @@ -82,7 +80,6 @@ pub async fn run_data_live( } let sink = Arc::new(RwLock::new(sink)); - let stream = Arc::new(RwLock::new(stream)); info!("Running live data thread for {:?}.", class); @@ -101,13 +98,11 @@ pub async fn websocket_handler( app_config: Arc, class: Class, sink: Arc>, Message>>>, - stream: Arc>>>>, + mut stream: SplitStream>>, ) -> Result<(), Box> { loop { - let mut stream = stream.write().await; - - match timeout(TIMEOUT_DURATION, stream.next()).await { - Ok(Some(Ok(Message::Text(data)))) => match from_str::>(&data) { + match stream.next().await { + Some(Ok(Message::Text(data))) => match from_str::>(&data) { Ok(parsed_data) => { for message in parsed_data { match message { @@ -132,13 +127,9 @@ pub async fn websocket_handler( warn!("Unparsed incoming message: {:?}: {}", data, e); } }, - Ok(Some(Ok(Message::Ping(_)))) => { - sink.write().await.send(Message::Pong(vec![])).await? - } - Ok(unknown) => { - error!("Unknown incoming message: {:?}", unknown); - } - Err(_) => {} + Some(Ok(Message::Ping(_))) => sink.write().await.send(Message::Pong(vec![])).await?, + Some(unknown) => error!("Unknown incoming message: {:?}", unknown), + None => panic!(), } } } diff --git a/backend/src/data/mod.rs b/backend/src/data/mod.rs index e95c262..b9b5fb9 100644 --- a/backend/src/data/mod.rs +++ b/backend/src/data/mod.rs @@ -1 +1,2 @@ +pub mod calendar; pub mod live; diff --git a/backend/src/database/assets.rs b/backend/src/database/assets.rs index f7b3511..b93f5c7 100644 --- a/backend/src/database/assets.rs +++ b/backend/src/database/assets.rs @@ -9,9 +9,9 @@ pub async fn get_assets( Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"# ) - .fetch_all(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) } pub async fn get_assets_with_class( @@ -22,9 +22,9 @@ pub async fn get_assets_with_class( Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, &class as &Class ) - .fetch_all(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) } pub async fn get_asset( @@ -35,9 +35,9 @@ pub async fn get_asset( Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol ) - .fetch_optional(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_optional(postgres_pool) + .await + .map_err(|e| e.into()) } pub async fn add_asset( @@ -50,9 +50,9 @@ pub async fn add_asset( RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added ) - .fetch_one(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_one(postgres_pool) + .await + .map_err(|e| e.into()) } pub async fn update_asset_trading( @@ -66,9 +66,9 @@ pub async fn update_asset_trading( RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol ) - .fetch_optional(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_optional(postgres_pool) + .await + .map_err(|e| e.into()) } pub async fn delete_asset( @@ -81,7 +81,7 @@ pub async fn delete_asset( RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol ) - .fetch_optional(postgres_pool) - .await - .unwrap()) + .fetch_optional(postgres_pool) + .await + .unwrap()) } diff --git a/backend/src/database/bars.rs b/backend/src/database/bars.rs index 46784ef..8cdcbab 100644 --- a/backend/src/database/bars.rs +++ b/backend/src/database/bars.rs @@ -13,9 +13,9 @@ pub async fn add_bar( RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#, bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted ) - .fetch_one(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_one(postgres_pool) + .await + .map_err(|e| e.into()) } #[allow(dead_code)] @@ -52,7 +52,7 @@ pub async fn add_bars( RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#, ×tamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes, &num_trades, &volumes_weighted ) - .fetch_all(postgres_pool) - .await - .map_err(|e| e.into()) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) } diff --git a/backend/src/database/calendar.rs b/backend/src/database/calendar.rs new file mode 100644 index 0000000..b066438 --- /dev/null +++ b/backend/src/database/calendar.rs @@ -0,0 +1,54 @@ +use crate::types::CalendarDate; +use sqlx::{query_as, PgPool}; +use std::error::Error; + +pub async fn add_calendar_dates( + postgres_pool: &PgPool, + calendar_dates: &Vec, +) -> Result, Box> { + let mut dates = Vec::with_capacity(calendar_dates.len()); + let mut opens = Vec::with_capacity(calendar_dates.len()); + let mut closes = Vec::with_capacity(calendar_dates.len()); + + for calendar_date in calendar_dates { + dates.push(calendar_date.date); + opens.push(calendar_date.open); + closes.push(calendar_date.close); + } + + query_as!( + CalendarDate, + r#"INSERT INTO calendar (date, open, close) + SELECT * FROM UNNEST($1::date[], $2::time[], $3::time[]) + RETURNING date, open, close"#, + &dates, + &opens, + &closes + ) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) +} + +pub async fn delete_all_calendar_dates( + postgres_pool: &PgPool, +) -> Result, Box> { + query_as!( + CalendarDate, + "DELETE FROM calendar RETURNING date, open, close" + ) + .fetch_all(postgres_pool) + .await + .map_err(|e| e.into()) +} + +pub async fn reset_calendar_dates( + postgres_pool: &PgPool, + calendar_dates: &Vec, +) -> Result, Box> { + let transaction = postgres_pool.begin().await?; + delete_all_calendar_dates(postgres_pool).await?; + let calendar_dates = add_calendar_dates(postgres_pool, calendar_dates).await; + transaction.commit().await?; + calendar_dates +} diff --git a/backend/src/database/mod.rs b/backend/src/database/mod.rs index 5ac2df4..d760015 100644 --- a/backend/src/database/mod.rs +++ b/backend/src/database/mod.rs @@ -1,2 +1,3 @@ pub mod assets; pub mod bars; +pub mod calendar; diff --git a/backend/src/main.rs b/backend/src/main.rs index 0a751d4..0bef77b 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -5,9 +5,7 @@ mod routes; mod types; use config::AppConfig; -use data::live::run_data_live; use dotenv::dotenv; -use routes::run_api; use std::error::Error; use tokio::{spawn, sync::broadcast}; use types::{AssetBroadcastMessage, Class}; @@ -17,31 +15,32 @@ async fn main() -> Result<(), Box> { dotenv().ok(); log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); let app_config = AppConfig::arc_from_env().await.unwrap(); - let mut threads = Vec::new(); + threads.push(spawn(data::calendar::run(app_config.clone()))); + let (asset_broadcast_sender, _) = broadcast::channel::(100); - // Stock Live Data - threads.push(spawn(run_data_live( + threads.push(spawn(data::live::run( Class::UsEquity, app_config.clone(), asset_broadcast_sender.subscribe(), ))); - // Crypto Live Data - threads.push(spawn(run_data_live( + threads.push(spawn(data::live::run( Class::Crypto, app_config.clone(), asset_broadcast_sender.subscribe(), ))); - // REST API - threads.push(spawn(run_api(app_config.clone(), asset_broadcast_sender))); + threads.push(spawn(routes::run( + app_config.clone(), + asset_broadcast_sender, + ))); for thread in threads { thread.await??; } - Ok(()) + unreachable!() } diff --git a/backend/src/routes/assets.rs b/backend/src/routes/assets.rs index 50d3e18..af258c6 100644 --- a/backend/src/routes/assets.rs +++ b/backend/src/routes/assets.rs @@ -1,17 +1,15 @@ -use std::sync::Arc; - use crate::config::AppConfig; use crate::database; use crate::database::assets::update_asset_trading; use crate::types::api; use crate::types::{Asset, AssetBroadcastMessage, Status}; use axum::{extract::Path, http::StatusCode, Extension, Json}; -use http::Method; use log::info; use serde::Deserialize; +use std::sync::Arc; use tokio::sync::broadcast::Sender; -const ALPACA_API_URL: &str = "https://api.alpaca.markets/v2"; +const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets"; pub async fn get_assets( Extension(app_config): Extension>, @@ -56,14 +54,10 @@ pub async fn add_asset( return Err(StatusCode::CONFLICT); } + app_config.alpaca_rate_limit.until_ready().await; let asset = app_config - .reqwest_client - .request( - Method::GET, - &format!("{}/assets/{}", ALPACA_API_URL, request.symbol), - ) - .header("APCA-API-KEY-ID", &app_config.alpaca_api_key) - .header("APCA-API-SECRET-KEY", &app_config.alpaca_api_secret) + .alpaca_client + .get(&format!("{}/{}", ALPACA_ASSET_API_URL, request.symbol)) .send() .await .map_err(|e| match e.status() { diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index cb90879..b68b63a 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -9,7 +9,7 @@ use tokio::sync::broadcast::Sender; pub mod assets; -pub async fn run_api( +pub async fn run( app_config: Arc, asset_broadcast_sender: Sender, ) -> Result<(), Box> { @@ -26,5 +26,5 @@ pub async fn run_api( info!("Listening on {}...", addr); Server::bind(&addr).serve(app.into_make_service()).await?; - Ok(()) + unreachable!() } diff --git a/backend/src/types/api/incoming/calendar.rs b/backend/src/types/api/incoming/calendar.rs new file mode 100644 index 0000000..1f4e769 --- /dev/null +++ b/backend/src/types/api/incoming/calendar.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Deserializer}; +use time::{macros::format_description, Date, Time}; + +#[derive(Debug, PartialEq, Deserialize)] +pub struct CalendarDate { + #[serde(deserialize_with = "deserialize_date")] + pub date: Date, + #[serde(deserialize_with = "deserialize_time")] + pub open: Time, + #[serde(deserialize_with = "deserialize_time")] + pub close: Time, +} + +fn deserialize_date<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let date_str = String::deserialize(deserializer)?; + Date::parse(&date_str, format_description!("[year]-[month]-[day]")) + .map_err(serde::de::Error::custom) +} + +fn deserialize_time<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let time_str = String::deserialize(deserializer)?; + Time::parse(&time_str, format_description!("[hour]:[minute]")).map_err(serde::de::Error::custom) +} diff --git a/backend/src/types/api/incoming/mod.rs b/backend/src/types/api/incoming/mod.rs index 00c8a86..068f05a 100644 --- a/backend/src/types/api/incoming/mod.rs +++ b/backend/src/types/api/incoming/mod.rs @@ -1,3 +1,5 @@ pub mod asset; +pub mod calendar; pub use asset::*; +pub use calendar::*; diff --git a/backend/src/types/bar.rs b/backend/src/types/bar.rs index c89f529..452d0c2 100644 --- a/backend/src/types/bar.rs +++ b/backend/src/types/bar.rs @@ -1,4 +1,4 @@ -use super::websocket::incoming::BarMessage; +use super::websocket; use serde::{Deserialize, Serialize}; use sqlx::FromRow; use time::OffsetDateTime; @@ -16,8 +16,8 @@ pub struct Bar { pub volume_weighted: f64, } -impl From for Bar { - fn from(bar_message: BarMessage) -> Self { +impl From for Bar { + fn from(bar_message: websocket::incoming::BarMessage) -> Self { Self { timestamp: bar_message.timestamp, asset_symbol: bar_message.symbol, diff --git a/backend/src/types/calendar.rs b/backend/src/types/calendar.rs new file mode 100644 index 0000000..0e6dc73 --- /dev/null +++ b/backend/src/types/calendar.rs @@ -0,0 +1,18 @@ +use super::api; +use time::{Date, Time}; + +pub struct CalendarDate { + pub date: Date, + pub open: Time, + pub close: Time, +} + +impl From<&api::incoming::CalendarDate> for CalendarDate { + fn from(calendar: &api::incoming::CalendarDate) -> Self { + Self { + date: calendar.date, + open: calendar.open, + close: calendar.close, + } + } +} diff --git a/backend/src/types/mod.rs b/backend/src/types/mod.rs index d55847a..b9896d6 100644 --- a/backend/src/types/mod.rs +++ b/backend/src/types/mod.rs @@ -1,6 +1,7 @@ pub mod api; pub mod asset; pub mod bar; +pub mod calendar; pub mod class; pub mod exchange; pub mod status; @@ -8,6 +9,7 @@ pub mod websocket; pub use asset::*; pub use bar::*; +pub use calendar::*; pub use class::*; pub use exchange::*; pub use status::*; diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh index ec8c5de..985eac8 100644 --- a/support/timescaledb/999_init.sh +++ b/support/timescaledb/999_init.sh @@ -36,4 +36,26 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL ); SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2); + + CREATE TABLE calendar ( + date DATE NOT NULL PRIMARY KEY, + open TIME NOT NULL, + close TIME NOT NULL + ); + + CREATE VIEW bars_missing AS + WITH time_series AS ( + SELECT + asset_symbol, + generate_series(MIN(timestamp), NOW(), interval '1 minute')::TIMESTAMPTZ AS expected_time + FROM bars + GROUP BY asset_symbol + ) + SELECT + ts.asset_symbol, + ts.expected_time AS missing_time + FROM time_series ts + LEFT JOIN bars b + ON ts.asset_symbol = b.asset_symbol AND ts.expected_time = b.timestamp + WHERE b.timestamp IS NULL; EOSQL