From a84daea61cf9f29660ab8d628a028af771c03024 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Thu, 22 Feb 2024 12:35:01 +0000 Subject: [PATCH] Add local market calendar storage Signed-off-by: Nikolaos Karaolidis --- src/database/calendar.rs | 38 +++++++++++++++ src/database/mod.rs | 4 +- src/threads/clock.rs | 47 +++++++++++++++---- src/types/alpaca/api/incoming/calendar.rs | 18 ++++++- src/types/alpaca/api/outgoing/calendar.rs | 11 +++++ src/types/calendar.rs | 13 +++++ src/types/mod.rs | 2 + src/utils/time.rs | 1 + .../docker-entrypoint-initdb.d/0000_init.sql | 8 ++++ 9 files changed, 129 insertions(+), 13 deletions(-) create mode 100644 src/database/calendar.rs create mode 100644 src/types/calendar.rs diff --git a/src/database/calendar.rs b/src/database/calendar.rs new file mode 100644 index 0000000..20024ca --- /dev/null +++ b/src/database/calendar.rs @@ -0,0 +1,38 @@ +use crate::{optimize, types::Calendar}; +use clickhouse::error::Error; +use tokio::try_join; + +optimize!("calendar"); + +pub async fn upsert_batch_and_delete<'a, T>( + client: &clickhouse::Client, + records: T, +) -> Result<(), Error> +where + T: IntoIterator + Send + Sync + Clone, + T::IntoIter: Send, +{ + let upsert_future = async { + let mut insert = client.insert("calendar")?; + for record in records.clone() { + insert.write(record).await?; + } + insert.end().await + }; + + let delete_future = async { + let dates = records + .clone() + .into_iter() + .map(|r| r.date) + .collect::>(); + + client + .query("DELETE FROM calendar WHERE date NOT IN ?") + .bind(dates) + .execute() + .await + }; + + try_join!(upsert_future, delete_future).map(|_| ()) +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 5255369..9523349 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -2,6 +2,7 @@ pub mod assets; pub mod backfills_bars; pub mod backfills_news; pub mod bars; +pub mod calendar; pub mod news; pub mod orders; @@ -144,7 +145,8 @@ pub async fn optimize_all(clickhouse_client: &Client) -> Result<(), Error> { news::optimize(clickhouse_client), backfills_bars::optimize(clickhouse_client), backfills_news::optimize(clickhouse_client), - orders::optimize(clickhouse_client) + orders::optimize(clickhouse_client), + calendar::optimize(clickhouse_client) ) .map(|_| ()) } diff --git a/src/threads/clock.rs b/src/threads/clock.rs index c849028..e1405d2 100644 --- a/src/threads/clock.rs +++ b/src/threads/clock.rs @@ -1,12 +1,13 @@ use crate::{ config::Config, - types::alpaca, + database, + types::{alpaca, Calendar}, utils::{backoff, duration_until}, }; use log::info; use std::sync::Arc; use time::OffsetDateTime; -use tokio::{sync::mpsc, time::sleep}; +use tokio::{join, sync::mpsc, time::sleep}; pub enum Status { Open, @@ -36,13 +37,31 @@ impl From for Message { pub async fn run(config: Arc, sender: mpsc::Sender) { loop { - let clock = alpaca::api::incoming::clock::get( - &config.alpaca_client, - &config.alpaca_rate_limiter, - Some(backoff::infinite()), - ) - .await - .unwrap(); + let clock_future = async { + alpaca::api::incoming::clock::get( + &config.alpaca_client, + &config.alpaca_rate_limiter, + Some(backoff::infinite()), + ) + .await + .unwrap() + }; + + let calendar_future = async { + alpaca::api::incoming::calendar::get( + &config.alpaca_client, + &config.alpaca_rate_limiter, + &alpaca::api::outgoing::calendar::Calendar::default(), + Some(backoff::infinite()), + ) + .await + .unwrap() + .into_iter() + .map(Calendar::from) + .collect::>() + }; + + let (clock, calendar) = join!(clock_future, calendar_future); let sleep_until = duration_until(if clock.is_open { info!("Market is open, will close at {}.", clock.next_close); @@ -52,7 +71,15 @@ pub async fn run(config: Arc, sender: mpsc::Sender) { clock.next_open }); - sleep(sleep_until).await; + let sleep_future = sleep(sleep_until); + + let calendar_future = async { + database::calendar::upsert_batch_and_delete(&config.clickhouse_client, &calendar) + .await + .unwrap(); + }; + + join!(sleep_future, calendar_future); sender.send(clock.into()).await.unwrap(); } } diff --git a/src/types/alpaca/api/incoming/calendar.rs b/src/types/alpaca/api/incoming/calendar.rs index da7c119..8c8118a 100644 --- a/src/types/alpaca/api/incoming/calendar.rs +++ b/src/types/alpaca/api/incoming/calendar.rs @@ -1,11 +1,15 @@ -use crate::{config::ALPACA_API_URL, types::alpaca::api::outgoing, utils::de}; +use crate::{ + config::ALPACA_API_URL, + types::{self, alpaca::api::outgoing}, + utils::de, +}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; use log::warn; use reqwest::{Client, Error}; use serde::Deserialize; use std::time::Duration; -use time::{Date, Time}; +use time::{Date, OffsetDateTime, Time}; #[derive(Deserialize)] pub struct Calendar { @@ -17,6 +21,16 @@ pub struct Calendar { pub settlement_date: Date, } +impl From for types::Calendar { + fn from(calendar: Calendar) -> Self { + Self { + date: calendar.date, + open: OffsetDateTime::new_utc(calendar.date, calendar.open), + close: OffsetDateTime::new_utc(calendar.date, calendar.close), + } + } +} + pub async fn get( alpaca_client: &Client, alpaca_rate_limiter: &DefaultDirectRateLimiter, diff --git a/src/types/alpaca/api/outgoing/calendar.rs b/src/types/alpaca/api/outgoing/calendar.rs index 9ffef0b..a129c30 100644 --- a/src/types/alpaca/api/outgoing/calendar.rs +++ b/src/types/alpaca/api/outgoing/calendar.rs @@ -1,3 +1,4 @@ +use crate::utils::time::MAX_TIMESTAMP; use serde::Serialize; use time::OffsetDateTime; @@ -18,3 +19,13 @@ pub struct Calendar { #[serde(rename = "date")] pub date_type: DateType, } + +impl Default for Calendar { + fn default() -> Self { + Self { + start: OffsetDateTime::UNIX_EPOCH, + end: OffsetDateTime::from_unix_timestamp(MAX_TIMESTAMP).unwrap(), + date_type: DateType::Trading, + } + } +} diff --git a/src/types/calendar.rs b/src/types/calendar.rs new file mode 100644 index 0000000..235cf84 --- /dev/null +++ b/src/types/calendar.rs @@ -0,0 +1,13 @@ +use clickhouse::Row; +use serde::{Deserialize, Serialize}; +use time::{Date, OffsetDateTime}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)] +pub struct Calendar { + #[serde(with = "clickhouse::serde::time::date")] + pub date: Date, + #[serde(with = "clickhouse::serde::time::datetime")] + pub open: OffsetDateTime, + #[serde(with = "clickhouse::serde::time::datetime")] + pub close: OffsetDateTime, +} diff --git a/src/types/mod.rs b/src/types/mod.rs index 9791365..b06f1ba 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -2,11 +2,13 @@ pub mod alpaca; pub mod asset; pub mod backfill; pub mod bar; +pub mod calendar; pub mod news; pub mod order; pub use asset::{Asset, Class, Exchange}; pub use backfill::Backfill; pub use bar::Bar; +pub use calendar::Calendar; pub use news::News; pub use order::Order; diff --git a/src/utils/time.rs b/src/utils/time.rs index 9be0e7e..faeb22f 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -4,6 +4,7 @@ use time::OffsetDateTime; pub const ONE_SECOND: Duration = Duration::from_secs(1); pub const ONE_MINUTE: Duration = Duration::from_secs(60); pub const FIFTEEN_MINUTES: Duration = Duration::from_secs(60 * 15); +pub const MAX_TIMESTAMP: i64 = 253_402_300_799; pub fn last_minute() -> OffsetDateTime { let now_timestamp = OffsetDateTime::now_utc().unix_timestamp(); diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index c7b8f6d..8739862 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -120,3 +120,11 @@ CREATE TABLE IF NOT EXISTS qrust.orders ( ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(time_submitted) PRIMARY KEY id; + +CREATE TABLE IF NOT EXISTS qrust.calendar ( + date Date, + open DateTime, + close DateTime +) +ENGINE = ReplacingMergeTree() +PRIMARY KEY date;