From 3ee72a0e1bf8d376eff040b40e97bd97ee99979d Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Tue, 16 Jan 2024 15:46:22 +0000 Subject: [PATCH] Fix bars_validity for market close Signed-off-by: Nikolaos Karaolidis --- src/config.rs | 10 +- src/data/market.rs | 105 ++++++++++-------- src/database/assets.rs | 2 +- src/database/bars.rs | 9 ++ src/routes/assets.rs | 3 +- src/types/alpaca/api/incoming/clock.rs | 13 +++ src/types/alpaca/api/incoming/mod.rs | 1 + src/types/alpaca/api/mod.rs | 1 + src/types/alpaca/api/outgoing/bar.rs | 35 ++++++ src/types/alpaca/api/outgoing/mod.rs | 1 + src/utils/mod.rs | 2 +- src/utils/time.rs | 5 +- .../docker-entrypoint-initdb.d/0000_init.sql | 3 +- 13 files changed, 125 insertions(+), 65 deletions(-) create mode 100644 src/types/alpaca/api/incoming/clock.rs create mode 100644 src/types/alpaca/api/outgoing/bar.rs create mode 100644 src/types/alpaca/api/outgoing/mod.rs diff --git a/src/config.rs b/src/config.rs index d3ad4aa..356937c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,23 +2,19 @@ use crate::types::alpaca::Source; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use reqwest::{header::HeaderMap, Client}; use std::{env, num::NonZeroU32, sync::Arc}; -use time::{format_description::FormatItem, macros::format_description}; -use tokio::time::Duration; pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets"; +pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock"; pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2"; pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; -pub const ALPACA_TIMESTAMP_FORMAT: &[FormatItem] = - format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z"); pub struct Config { pub alpaca_api_key: String, pub alpaca_api_secret: String, pub alpaca_client: Client, pub alpaca_rate_limit: DefaultDirectRateLimiter, - pub alpaca_historical_offset: Duration, pub alpaca_source: Source, pub clickhouse_client: clickhouse::Client, } @@ -55,10 +51,6 @@ impl Config { Source::Iex => NonZeroU32::new(180).unwrap(), Source::Sip => NonZeroU32::new(900).unwrap(), })), - alpaca_historical_offset: Duration::from_secs(match alpaca_source { - Source::Iex => 900, - Source::Sip => 0, - }), alpaca_source, clickhouse_client: clickhouse::Client::default() .with_url(clickhouse_url) diff --git a/src/data/market.rs b/src/data/market.rs index 20f2ad1..9f3c476 100644 --- a/src/data/market.rs +++ b/src/data/market.rs @@ -1,16 +1,19 @@ use crate::{ config::{ - Config, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_DATA_URL, - ALPACA_STOCK_WEBSOCKET_URL, ALPACA_TIMESTAMP_FORMAT, + Config, ALPACA_CLOCK_API_URL, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, + ALPACA_STOCK_DATA_URL, ALPACA_STOCK_WEBSOCKET_URL, }, data::authenticate_websocket, database, types::{ - alpaca::{api::incoming, websocket}, + alpaca::{ + api::{incoming, outgoing}, + websocket, Source, + }, asset::{self, Asset}, Bar, BarValidity, BroadcastMessage, Class, }, - utils::{duration_until, last_minute, next_minute, ONE_MINUTE}, + utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, }; use core::panic; use futures_util::{ @@ -188,7 +191,11 @@ async fn websocket_handle_message( backfilled.write().await.insert(asset.symbol.clone(), false); let bar_validity = BarValidity::none(asset.symbol.clone()); - database::bars::upsert_validity(&app_config.clickhouse_client, &bar_validity).await; + database::bars::insert_validity_if_not_exists( + &app_config.clickhouse_client, + &bar_validity, + ) + .await; spawn(backfill( app_config.clone(), @@ -240,14 +247,39 @@ pub async fn backfill( .unwrap(); let fetch_from = bar_validity.time_last + ONE_MINUTE; - let fetch_until = last_minute(); + let fetch_until = if app_config.alpaca_source == Source::Iex { + app_config.alpaca_rate_limit.until_ready().await; + let clock = app_config + .alpaca_client + .get(ALPACA_CLOCK_API_URL) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + if clock.is_open { + last_minute() + } else { + clock.next_open + } + } else { + last_minute() + }; + if fetch_from > fetch_until { return; } - info!("Queing historical data backfill for {}...", asset.symbol); - let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset; - sleep(duration_until(task_run_offsetdatetime)).await; + if app_config.alpaca_source == Source::Iex { + let task_run_delay = duration_until(fetch_until + FIFTEEN_MINUTES + ONE_MINUTE); + info!( + "Queing historical data backfill for {} in {:?}.", + asset.symbol, task_run_delay + ); + sleep(task_run_delay).await; + } info!("Running historical data backfill for {}...", asset.symbol); @@ -255,59 +287,38 @@ pub async fn backfill( let mut next_page_token = None; loop { - let request = app_config + app_config.alpaca_rate_limit.until_ready().await; + let message = app_config .alpaca_client .get(match asset.class { Class::UsEquity => ALPACA_STOCK_DATA_URL, Class::Crypto => ALPACA_CRYPTO_DATA_URL, }) - .query(&[ - ("symbols", &asset.symbol), - ("timeframe", &String::from("1Min")), - ( - "start", - &fetch_from - .format(ALPACA_TIMESTAMP_FORMAT) - .unwrap() - .to_string(), - ), - ( - "end", - &fetch_until - .format(ALPACA_TIMESTAMP_FORMAT) - .unwrap() - .to_string(), - ), - ("limit", &String::from("10000")), - ("page_token", &next_page_token.clone().unwrap_or_default()), - ]); - - app_config.alpaca_rate_limit.until_ready().await; - - let response = request.send().await.unwrap(); - let response = if response.status() == reqwest::StatusCode::OK { - response.json::().await.unwrap() - } else { - error!( - "Failed to backfill historical data for {} from {} to {}: {}", - asset.symbol, + .query(&outgoing::bar::Bar::new( + vec![asset.symbol.clone()], + String::from("1Min"), fetch_from, fetch_until, - response.text().await.unwrap() - ); - break; - }; + 10000, + next_page_token, + )) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); - response.bars.into_iter().for_each(|(symbol, bar_vec)| { + message.bars.into_iter().for_each(|(symbol, bar_vec)| { bar_vec.unwrap_or_default().into_iter().for_each(|bar| { bars.push(Bar::from((bar, symbol.clone()))); }); }); - if response.next_page_token.is_none() { + if message.next_page_token.is_none() { break; } - next_page_token = response.next_page_token; + next_page_token = message.next_page_token; } database::bars::upsert_batch(&app_config.clickhouse_client, &bars).await; diff --git a/src/database/assets.rs b/src/database/assets.rs index 73609e9..9d39b41 100644 --- a/src/database/assets.rs +++ b/src/database/assets.rs @@ -27,7 +27,7 @@ pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &str) -> Op .unwrap() } -pub async fn insert(clickhouse_client: &Client, asset: &Asset) { +pub async fn upsert(clickhouse_client: &Client, asset: &Asset) { let mut insert = clickhouse_client.insert("assets").unwrap(); insert.write(asset).await.unwrap(); insert.end().await.unwrap(); diff --git a/src/database/bars.rs b/src/database/bars.rs index f2f8afc..fcf8866 100644 --- a/src/database/bars.rs +++ b/src/database/bars.rs @@ -42,6 +42,15 @@ pub async fn upsert_validity(clickhouse_client: &Client, bar_validity: &BarValid insert.end().await.unwrap(); } +pub async fn insert_validity_if_not_exists(clickhouse_client: &Client, bar_validity: &BarValidity) { + if select_validity_where_symbol(clickhouse_client, &bar_validity.symbol) + .await + .is_none() + { + upsert_validity(clickhouse_client, bar_validity).await; + } +} + pub async fn delete_validity_where_symbol(clickhouse_client: &Client, symbol: &str) { clickhouse_client .query("DELETE FROM bars_validity WHERE symbol = ?") diff --git a/src/routes/assets.rs b/src/routes/assets.rs index 3c84da3..f12f245 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -61,13 +61,12 @@ pub async fn add( .unwrap(); let asset = asset.json::().await.unwrap(); - if asset.status != Status::Active || !asset.tradable || !asset.fractionable { return Err(StatusCode::FORBIDDEN); } let asset = Asset::from(asset); - database::assets::insert(&app_config.clickhouse_client, &asset).await; + database::assets::upsert(&app_config.clickhouse_client, &asset).await; broadcast_sender .send(BroadcastMessage::Asset(asset::BroadcastMessage::Added( diff --git a/src/types/alpaca/api/incoming/clock.rs b/src/types/alpaca/api/incoming/clock.rs new file mode 100644 index 0000000..51bafce --- /dev/null +++ b/src/types/alpaca/api/incoming/clock.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Clock { + #[serde(with = "time::serde::rfc3339")] + pub timestamp: OffsetDateTime, + pub is_open: bool, + #[serde(with = "time::serde::rfc3339")] + pub next_open: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub next_close: OffsetDateTime, +} diff --git a/src/types/alpaca/api/incoming/mod.rs b/src/types/alpaca/api/incoming/mod.rs index c01abb8..adac42e 100644 --- a/src/types/alpaca/api/incoming/mod.rs +++ b/src/types/alpaca/api/incoming/mod.rs @@ -1,2 +1,3 @@ pub mod asset; pub mod bar; +pub mod clock; diff --git a/src/types/alpaca/api/mod.rs b/src/types/alpaca/api/mod.rs index 1b61618..0687bc2 100644 --- a/src/types/alpaca/api/mod.rs +++ b/src/types/alpaca/api/mod.rs @@ -1,4 +1,5 @@ pub mod incoming; +pub mod outgoing; macro_rules! impl_from_enum { ($source:ty, $target:ty, $( $variant:ident ),* ) => { diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/types/alpaca/api/outgoing/bar.rs new file mode 100644 index 0000000..3a94ccd --- /dev/null +++ b/src/types/alpaca/api/outgoing/bar.rs @@ -0,0 +1,35 @@ +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Bar { + pub symbols: Vec, + pub timeframe: String, + #[serde(with = "time::serde::rfc3339")] + pub start: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub end: OffsetDateTime, + pub limit: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub page_token: Option, +} + +impl Bar { + pub fn new( + symbols: Vec, + timeframe: String, + start: OffsetDateTime, + end: OffsetDateTime, + limit: i64, + page_token: Option, + ) -> Self { + Self { + symbols, + timeframe, + start, + end, + limit, + page_token, + } + } +} diff --git a/src/types/alpaca/api/outgoing/mod.rs b/src/types/alpaca/api/outgoing/mod.rs new file mode 100644 index 0000000..46f285c --- /dev/null +++ b/src/types/alpaca/api/outgoing/mod.rs @@ -0,0 +1 @@ +pub mod bar; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index bfa47ff..bd9b56b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,3 @@ pub mod time; -pub use time::{duration_until, last_minute, next_minute, ONE_MINUTE}; +pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}; diff --git a/src/utils/time.rs b/src/utils/time.rs index 9813731..fa0c2ae 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -2,16 +2,13 @@ use std::time::Duration; use time::OffsetDateTime; pub const ONE_MINUTE: Duration = Duration::from_secs(60); +pub const FIFTEEN_MINUTES: Duration = Duration::from_secs(60 * 15); pub fn last_minute() -> OffsetDateTime { let now_timestamp = OffsetDateTime::now_utc().unix_timestamp(); OffsetDateTime::from_unix_timestamp(now_timestamp - now_timestamp % 60).unwrap() } -pub fn next_minute() -> OffsetDateTime { - last_minute() + ONE_MINUTE -} - pub fn duration_until(time: OffsetDateTime) -> Duration { let duration = time - OffsetDateTime::now_utc(); diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index cf76efc..818617f 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -28,7 +28,8 @@ CREATE TABLE IF NOT EXISTS qrust.bars ( vwap Float64 ) ENGINE = ReplacingMergeTree() -PRIMARY KEY (symbol, time); +PRIMARY KEY (symbol, time) +PARTITION BY toYYYYMM(time); CREATE TABLE IF NOT EXISTS qrust.bars_validity ( symbol String,