Fix bars_validity for market close
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
@@ -2,23 +2,19 @@ use crate::types::alpaca::Source;
|
|||||||
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
|
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
|
||||||
use reqwest::{header::HeaderMap, Client};
|
use reqwest::{header::HeaderMap, Client};
|
||||||
use std::{env, num::NonZeroU32, sync::Arc};
|
use std::{env, num::NonZeroU32, sync::Arc};
|
||||||
use time::{format_description::FormatItem, macros::format_description};
|
|
||||||
use tokio::time::Duration;
|
|
||||||
|
|
||||||
pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
|
pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
|
||||||
|
pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock";
|
||||||
pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
|
pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
|
||||||
pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
|
pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
|
||||||
pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
|
pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
|
||||||
pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
|
pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
|
||||||
pub const ALPACA_TIMESTAMP_FORMAT: &[FormatItem] =
|
|
||||||
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
|
|
||||||
|
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub alpaca_api_key: String,
|
pub alpaca_api_key: String,
|
||||||
pub alpaca_api_secret: String,
|
pub alpaca_api_secret: String,
|
||||||
pub alpaca_client: Client,
|
pub alpaca_client: Client,
|
||||||
pub alpaca_rate_limit: DefaultDirectRateLimiter,
|
pub alpaca_rate_limit: DefaultDirectRateLimiter,
|
||||||
pub alpaca_historical_offset: Duration,
|
|
||||||
pub alpaca_source: Source,
|
pub alpaca_source: Source,
|
||||||
pub clickhouse_client: clickhouse::Client,
|
pub clickhouse_client: clickhouse::Client,
|
||||||
}
|
}
|
||||||
@@ -55,10 +51,6 @@ impl Config {
|
|||||||
Source::Iex => NonZeroU32::new(180).unwrap(),
|
Source::Iex => NonZeroU32::new(180).unwrap(),
|
||||||
Source::Sip => NonZeroU32::new(900).unwrap(),
|
Source::Sip => NonZeroU32::new(900).unwrap(),
|
||||||
})),
|
})),
|
||||||
alpaca_historical_offset: Duration::from_secs(match alpaca_source {
|
|
||||||
Source::Iex => 900,
|
|
||||||
Source::Sip => 0,
|
|
||||||
}),
|
|
||||||
alpaca_source,
|
alpaca_source,
|
||||||
clickhouse_client: clickhouse::Client::default()
|
clickhouse_client: clickhouse::Client::default()
|
||||||
.with_url(clickhouse_url)
|
.with_url(clickhouse_url)
|
||||||
|
@@ -1,16 +1,19 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
config::{
|
config::{
|
||||||
Config, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_DATA_URL,
|
Config, ALPACA_CLOCK_API_URL, ALPACA_CRYPTO_DATA_URL, ALPACA_CRYPTO_WEBSOCKET_URL,
|
||||||
ALPACA_STOCK_WEBSOCKET_URL, ALPACA_TIMESTAMP_FORMAT,
|
ALPACA_STOCK_DATA_URL, ALPACA_STOCK_WEBSOCKET_URL,
|
||||||
},
|
},
|
||||||
data::authenticate_websocket,
|
data::authenticate_websocket,
|
||||||
database,
|
database,
|
||||||
types::{
|
types::{
|
||||||
alpaca::{api::incoming, websocket},
|
alpaca::{
|
||||||
|
api::{incoming, outgoing},
|
||||||
|
websocket, Source,
|
||||||
|
},
|
||||||
asset::{self, Asset},
|
asset::{self, Asset},
|
||||||
Bar, BarValidity, BroadcastMessage, Class,
|
Bar, BarValidity, BroadcastMessage, Class,
|
||||||
},
|
},
|
||||||
utils::{duration_until, last_minute, next_minute, ONE_MINUTE},
|
utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE},
|
||||||
};
|
};
|
||||||
use core::panic;
|
use core::panic;
|
||||||
use futures_util::{
|
use futures_util::{
|
||||||
@@ -188,7 +191,11 @@ async fn websocket_handle_message(
|
|||||||
backfilled.write().await.insert(asset.symbol.clone(), false);
|
backfilled.write().await.insert(asset.symbol.clone(), false);
|
||||||
|
|
||||||
let bar_validity = BarValidity::none(asset.symbol.clone());
|
let bar_validity = BarValidity::none(asset.symbol.clone());
|
||||||
database::bars::upsert_validity(&app_config.clickhouse_client, &bar_validity).await;
|
database::bars::insert_validity_if_not_exists(
|
||||||
|
&app_config.clickhouse_client,
|
||||||
|
&bar_validity,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
spawn(backfill(
|
spawn(backfill(
|
||||||
app_config.clone(),
|
app_config.clone(),
|
||||||
@@ -240,14 +247,39 @@ pub async fn backfill(
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let fetch_from = bar_validity.time_last + ONE_MINUTE;
|
let fetch_from = bar_validity.time_last + ONE_MINUTE;
|
||||||
let fetch_until = last_minute();
|
let fetch_until = if app_config.alpaca_source == Source::Iex {
|
||||||
|
app_config.alpaca_rate_limit.until_ready().await;
|
||||||
|
let clock = app_config
|
||||||
|
.alpaca_client
|
||||||
|
.get(ALPACA_CLOCK_API_URL)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.json::<incoming::clock::Clock>()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
if clock.is_open {
|
||||||
|
last_minute()
|
||||||
|
} else {
|
||||||
|
clock.next_open
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
last_minute()
|
||||||
|
};
|
||||||
|
|
||||||
if fetch_from > fetch_until {
|
if fetch_from > fetch_until {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Queing historical data backfill for {}...", asset.symbol);
|
if app_config.alpaca_source == Source::Iex {
|
||||||
let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset;
|
let task_run_delay = duration_until(fetch_until + FIFTEEN_MINUTES + ONE_MINUTE);
|
||||||
sleep(duration_until(task_run_offsetdatetime)).await;
|
info!(
|
||||||
|
"Queing historical data backfill for {} in {:?}.",
|
||||||
|
asset.symbol, task_run_delay
|
||||||
|
);
|
||||||
|
sleep(task_run_delay).await;
|
||||||
|
}
|
||||||
|
|
||||||
info!("Running historical data backfill for {}...", asset.symbol);
|
info!("Running historical data backfill for {}...", asset.symbol);
|
||||||
|
|
||||||
@@ -255,59 +287,38 @@ pub async fn backfill(
|
|||||||
let mut next_page_token = None;
|
let mut next_page_token = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let request = app_config
|
app_config.alpaca_rate_limit.until_ready().await;
|
||||||
|
let message = app_config
|
||||||
.alpaca_client
|
.alpaca_client
|
||||||
.get(match asset.class {
|
.get(match asset.class {
|
||||||
Class::UsEquity => ALPACA_STOCK_DATA_URL,
|
Class::UsEquity => ALPACA_STOCK_DATA_URL,
|
||||||
Class::Crypto => ALPACA_CRYPTO_DATA_URL,
|
Class::Crypto => ALPACA_CRYPTO_DATA_URL,
|
||||||
})
|
})
|
||||||
.query(&[
|
.query(&outgoing::bar::Bar::new(
|
||||||
("symbols", &asset.symbol),
|
vec![asset.symbol.clone()],
|
||||||
("timeframe", &String::from("1Min")),
|
String::from("1Min"),
|
||||||
(
|
|
||||||
"start",
|
|
||||||
&fetch_from
|
|
||||||
.format(ALPACA_TIMESTAMP_FORMAT)
|
|
||||||
.unwrap()
|
|
||||||
.to_string(),
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"end",
|
|
||||||
&fetch_until
|
|
||||||
.format(ALPACA_TIMESTAMP_FORMAT)
|
|
||||||
.unwrap()
|
|
||||||
.to_string(),
|
|
||||||
),
|
|
||||||
("limit", &String::from("10000")),
|
|
||||||
("page_token", &next_page_token.clone().unwrap_or_default()),
|
|
||||||
]);
|
|
||||||
|
|
||||||
app_config.alpaca_rate_limit.until_ready().await;
|
|
||||||
|
|
||||||
let response = request.send().await.unwrap();
|
|
||||||
let response = if response.status() == reqwest::StatusCode::OK {
|
|
||||||
response.json::<incoming::bar::Message>().await.unwrap()
|
|
||||||
} else {
|
|
||||||
error!(
|
|
||||||
"Failed to backfill historical data for {} from {} to {}: {}",
|
|
||||||
asset.symbol,
|
|
||||||
fetch_from,
|
fetch_from,
|
||||||
fetch_until,
|
fetch_until,
|
||||||
response.text().await.unwrap()
|
10000,
|
||||||
);
|
next_page_token,
|
||||||
break;
|
))
|
||||||
};
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.json::<incoming::bar::Message>()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
response.bars.into_iter().for_each(|(symbol, bar_vec)| {
|
message.bars.into_iter().for_each(|(symbol, bar_vec)| {
|
||||||
bar_vec.unwrap_or_default().into_iter().for_each(|bar| {
|
bar_vec.unwrap_or_default().into_iter().for_each(|bar| {
|
||||||
bars.push(Bar::from((bar, symbol.clone())));
|
bars.push(Bar::from((bar, symbol.clone())));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
if response.next_page_token.is_none() {
|
if message.next_page_token.is_none() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
next_page_token = response.next_page_token;
|
next_page_token = message.next_page_token;
|
||||||
}
|
}
|
||||||
|
|
||||||
database::bars::upsert_batch(&app_config.clickhouse_client, &bars).await;
|
database::bars::upsert_batch(&app_config.clickhouse_client, &bars).await;
|
||||||
|
@@ -27,7 +27,7 @@ pub async fn select_where_symbol(clickhouse_client: &Client, symbol: &str) -> Op
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert(clickhouse_client: &Client, asset: &Asset) {
|
pub async fn upsert(clickhouse_client: &Client, asset: &Asset) {
|
||||||
let mut insert = clickhouse_client.insert("assets").unwrap();
|
let mut insert = clickhouse_client.insert("assets").unwrap();
|
||||||
insert.write(asset).await.unwrap();
|
insert.write(asset).await.unwrap();
|
||||||
insert.end().await.unwrap();
|
insert.end().await.unwrap();
|
||||||
|
@@ -42,6 +42,15 @@ pub async fn upsert_validity(clickhouse_client: &Client, bar_validity: &BarValid
|
|||||||
insert.end().await.unwrap();
|
insert.end().await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn insert_validity_if_not_exists(clickhouse_client: &Client, bar_validity: &BarValidity) {
|
||||||
|
if select_validity_where_symbol(clickhouse_client, &bar_validity.symbol)
|
||||||
|
.await
|
||||||
|
.is_none()
|
||||||
|
{
|
||||||
|
upsert_validity(clickhouse_client, bar_validity).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn delete_validity_where_symbol(clickhouse_client: &Client, symbol: &str) {
|
pub async fn delete_validity_where_symbol(clickhouse_client: &Client, symbol: &str) {
|
||||||
clickhouse_client
|
clickhouse_client
|
||||||
.query("DELETE FROM bars_validity WHERE symbol = ?")
|
.query("DELETE FROM bars_validity WHERE symbol = ?")
|
||||||
|
@@ -61,13 +61,12 @@ pub async fn add(
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let asset = asset.json::<incoming::asset::Asset>().await.unwrap();
|
let asset = asset.json::<incoming::asset::Asset>().await.unwrap();
|
||||||
|
|
||||||
if asset.status != Status::Active || !asset.tradable || !asset.fractionable {
|
if asset.status != Status::Active || !asset.tradable || !asset.fractionable {
|
||||||
return Err(StatusCode::FORBIDDEN);
|
return Err(StatusCode::FORBIDDEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
let asset = Asset::from(asset);
|
let asset = Asset::from(asset);
|
||||||
database::assets::insert(&app_config.clickhouse_client, &asset).await;
|
database::assets::upsert(&app_config.clickhouse_client, &asset).await;
|
||||||
|
|
||||||
broadcast_sender
|
broadcast_sender
|
||||||
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
|
.send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
|
||||||
|
13
src/types/alpaca/api/incoming/clock.rs
Normal file
13
src/types/alpaca/api/incoming/clock.rs
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct Clock {
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub timestamp: OffsetDateTime,
|
||||||
|
pub is_open: bool,
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub next_open: OffsetDateTime,
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub next_close: OffsetDateTime,
|
||||||
|
}
|
@@ -1,2 +1,3 @@
|
|||||||
pub mod asset;
|
pub mod asset;
|
||||||
pub mod bar;
|
pub mod bar;
|
||||||
|
pub mod clock;
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
pub mod incoming;
|
pub mod incoming;
|
||||||
|
pub mod outgoing;
|
||||||
|
|
||||||
macro_rules! impl_from_enum {
|
macro_rules! impl_from_enum {
|
||||||
($source:ty, $target:ty, $( $variant:ident ),* ) => {
|
($source:ty, $target:ty, $( $variant:ident ),* ) => {
|
||||||
|
35
src/types/alpaca/api/outgoing/bar.rs
Normal file
35
src/types/alpaca/api/outgoing/bar.rs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct Bar {
|
||||||
|
pub symbols: Vec<String>,
|
||||||
|
pub timeframe: String,
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub start: OffsetDateTime,
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub end: OffsetDateTime,
|
||||||
|
pub limit: i64,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub page_token: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Bar {
|
||||||
|
pub fn new(
|
||||||
|
symbols: Vec<String>,
|
||||||
|
timeframe: String,
|
||||||
|
start: OffsetDateTime,
|
||||||
|
end: OffsetDateTime,
|
||||||
|
limit: i64,
|
||||||
|
page_token: Option<String>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
symbols,
|
||||||
|
timeframe,
|
||||||
|
start,
|
||||||
|
end,
|
||||||
|
limit,
|
||||||
|
page_token,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
1
src/types/alpaca/api/outgoing/mod.rs
Normal file
1
src/types/alpaca/api/outgoing/mod.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
pub mod bar;
|
@@ -1,3 +1,3 @@
|
|||||||
pub mod time;
|
pub mod time;
|
||||||
|
|
||||||
pub use time::{duration_until, last_minute, next_minute, ONE_MINUTE};
|
pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE};
|
||||||
|
@@ -2,16 +2,13 @@ use std::time::Duration;
|
|||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
pub const ONE_MINUTE: Duration = Duration::from_secs(60);
|
pub const ONE_MINUTE: Duration = Duration::from_secs(60);
|
||||||
|
pub const FIFTEEN_MINUTES: Duration = Duration::from_secs(60 * 15);
|
||||||
|
|
||||||
pub fn last_minute() -> OffsetDateTime {
|
pub fn last_minute() -> OffsetDateTime {
|
||||||
let now_timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
let now_timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
||||||
OffsetDateTime::from_unix_timestamp(now_timestamp - now_timestamp % 60).unwrap()
|
OffsetDateTime::from_unix_timestamp(now_timestamp - now_timestamp % 60).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next_minute() -> OffsetDateTime {
|
|
||||||
last_minute() + ONE_MINUTE
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn duration_until(time: OffsetDateTime) -> Duration {
|
pub fn duration_until(time: OffsetDateTime) -> Duration {
|
||||||
let duration = time - OffsetDateTime::now_utc();
|
let duration = time - OffsetDateTime::now_utc();
|
||||||
|
|
||||||
|
@@ -28,7 +28,8 @@ CREATE TABLE IF NOT EXISTS qrust.bars (
|
|||||||
vwap Float64
|
vwap Float64
|
||||||
)
|
)
|
||||||
ENGINE = ReplacingMergeTree()
|
ENGINE = ReplacingMergeTree()
|
||||||
PRIMARY KEY (symbol, time);
|
PRIMARY KEY (symbol, time)
|
||||||
|
PARTITION BY toYYYYMM(time);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS qrust.bars_validity (
|
CREATE TABLE IF NOT EXISTS qrust.bars_validity (
|
||||||
symbol String,
|
symbol String,
|
||||||
|
Reference in New Issue
Block a user