Add stock market calendar

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-09-05 11:31:47 +03:00
parent c8df507d47
commit 548a8e42d5
21 changed files with 417 additions and 73 deletions

View File

@@ -0,0 +1,36 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO calendar (date, open, close)\n SELECT * FROM UNNEST($1::date[], $2::time[], $3::time[])\n RETURNING date, open, close",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "date",
"type_info": "Date"
},
{
"ordinal": 1,
"name": "open",
"type_info": "Time"
},
{
"ordinal": 2,
"name": "close",
"type_info": "Time"
}
],
"parameters": {
"Left": [
"DateArray",
"TimeArray",
"TimeArray"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60"
}

View File

@@ -0,0 +1,32 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM calendar RETURNING date, open, close",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "date",
"type_info": "Date"
},
{
"ordinal": 1,
"name": "open",
"type_info": "Time"
},
{
"ordinal": 2,
"name": "close",
"type_info": "Time"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
},
"hash": "b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8"
}

100
backend/Cargo.lock generated
View File

@@ -144,6 +144,7 @@ dependencies = [
"axum",
"dotenv",
"futures-util",
"governor",
"http",
"log",
"log4rs",
@@ -330,6 +331,19 @@ dependencies = [
"typenum",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.0",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
@@ -507,6 +521,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
@@ -574,12 +603,19 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -618,6 +654,24 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "governor"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "821239e5672ff23e2a7060901fa622950bbd80b649cdaadd78d1c1767ed14eb4"
dependencies = [
"cfg-if",
"dashmap",
"futures",
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot",
"quanta",
"rand",
"smallvec",
]
[[package]]
name = "h2"
version = "0.3.21"
@@ -965,6 +1019,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "mach2"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8"
dependencies = [
"libc",
]
[[package]]
name = "matchit"
version = "0.7.2"
@@ -1036,6 +1099,12 @@ dependencies = [
"tempfile",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.3"
@@ -1046,6 +1115,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
@@ -1290,6 +1365,22 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "quanta"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab"
dependencies = [
"crossbeam-utils",
"libc",
"mach2",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quote"
version = "1.0.33"
@@ -1329,6 +1420,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "raw-cpuid"
version = "10.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"

View File

@@ -32,6 +32,13 @@ time = { version = "0.3.27", features = [
"serde",
] }
futures-util = "0.3.28"
reqwest = { version = "0.11.20", features = ["json", "serde_json"] }
tokio-tungstenite = { version = "0.20.0", features = ["tokio-native-tls", "native-tls"] }
reqwest = { version = "0.11.20", features = [
"json",
"serde_json",
] }
tokio-tungstenite = { version = "0.20.0", features = [
"tokio-native-tls",
"native-tls",
] }
http = "0.2.9"
governor = "0.6.0"

View File

@@ -1,3 +1,5 @@
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use http::HeaderMap;
use reqwest::Client;
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{env, sync::Arc};
@@ -5,22 +7,35 @@ use std::{env, sync::Arc};
pub struct AppConfig {
pub alpaca_api_key: String,
pub alpaca_api_secret: String,
pub alpaca_client: Client,
pub alpaca_rate_limit: DefaultDirectRateLimiter,
pub postgres_pool: PgPool,
pub reqwest_client: Client,
}
const NUM_CLIENTS: usize = 10;
impl AppConfig {
pub async fn from_env() -> Result<Self, Box<dyn std::error::Error>> {
let alpaca_api_key = env::var("ALPACA_API_KEY")?;
let alpaca_api_secret = env::var("ALPACA_API_SECRET")?;
let alpaca_rate_limit = env::var("ALPACA_RATE_LIMIT")?;
Ok(AppConfig {
alpaca_api_key: env::var("APCA_API_KEY_ID").unwrap(),
alpaca_api_secret: env::var("APCA_API_SECRET_KEY").unwrap(),
alpaca_api_key: alpaca_api_key.clone(),
alpaca_api_secret: alpaca_api_secret.clone(),
alpaca_client: Client::builder()
.default_headers({
let mut headers = HeaderMap::new();
headers.insert("APCA-API-KEY-ID", alpaca_api_key.parse()?);
headers.insert("APCA-API-SECRET-KEY", alpaca_api_secret.parse()?);
headers
})
.build()?,
alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(alpaca_rate_limit.parse()?)),
postgres_pool: PgPoolOptions::new()
.max_connections(NUM_CLIENTS as u32)
.connect(&env::var("DATABASE_URL")?)
.await?,
reqwest_client: Client::new(),
})
}

View File

@@ -0,0 +1,41 @@
use crate::{
config::AppConfig,
database,
types::{api, CalendarDate},
};
use log::info;
use std::{error::Error, sync::Arc, time::Duration};
use tokio::time::interval;
const ALPACA_CALENDAR_API_URL: &str = "https://api.alpaca.markets/v2/calendar";
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 3);
const EARLIEST_DATE: &str = "1970-01-01";
const LATEST_DATE: &str = "2029-12-31";
pub async fn run(app_config: Arc<AppConfig>) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut interval = interval(REFRESH_INTERVAL);
loop {
interval.tick().await;
info!("Refreshing calendar...");
app_config.alpaca_rate_limit.until_ready().await;
let calendar_dates = app_config
.alpaca_client
.get(ALPACA_CALENDAR_API_URL)
.query(&[("start", EARLIEST_DATE), ("end", LATEST_DATE)])
.send()
.await?
.json::<Vec<api::incoming::CalendarDate>>()
.await?
.iter()
.map(CalendarDate::from)
.collect::<Vec<CalendarDate>>();
database::calendar::reset_calendar_dates(&app_config.postgres_pool, &calendar_dates)
.await?;
info!("Refreshed calendar.");
}
}

View File

@@ -16,20 +16,18 @@ use futures_util::{
};
use log::{debug, error, info, warn};
use serde_json::{from_str, to_string};
use std::{error::Error, sync::Arc, time::Duration};
use std::{error::Error, sync::Arc};
use tokio::{
net::TcpStream,
spawn,
sync::{broadcast::Receiver, RwLock},
time::timeout,
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2/iex";
const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
const TIMEOUT_DURATION: Duration = Duration::from_millis(100);
pub async fn run_data_live(
pub async fn run(
class: Class,
app_config: Arc<AppConfig>,
asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
@@ -82,7 +80,6 @@ pub async fn run_data_live(
}
let sink = Arc::new(RwLock::new(sink));
let stream = Arc::new(RwLock::new(stream));
info!("Running live data thread for {:?}.", class);
@@ -101,13 +98,11 @@ pub async fn websocket_handler(
app_config: Arc<AppConfig>,
class: Class,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
stream: Arc<RwLock<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let mut stream = stream.write().await;
match timeout(TIMEOUT_DURATION, stream.next()).await {
Ok(Some(Ok(Message::Text(data)))) => match from_str::<Vec<IncomingMessage>>(&data) {
match stream.next().await {
Some(Ok(Message::Text(data))) => match from_str::<Vec<IncomingMessage>>(&data) {
Ok(parsed_data) => {
for message in parsed_data {
match message {
@@ -132,13 +127,9 @@ pub async fn websocket_handler(
warn!("Unparsed incoming message: {:?}: {}", data, e);
}
},
Ok(Some(Ok(Message::Ping(_)))) => {
sink.write().await.send(Message::Pong(vec![])).await?
}
Ok(unknown) => {
error!("Unknown incoming message: {:?}", unknown);
}
Err(_) => {}
Some(Ok(Message::Ping(_))) => sink.write().await.send(Message::Pong(vec![])).await?,
Some(unknown) => error!("Unknown incoming message: {:?}", unknown),
None => panic!(),
}
}
}

View File

@@ -1 +1,2 @@
pub mod calendar;
pub mod live;

View File

@@ -9,9 +9,9 @@ pub async fn get_assets(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn get_assets_with_class(
@@ -22,9 +22,9 @@ pub async fn get_assets_with_class(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, &class as &Class
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn get_asset(
@@ -35,9 +35,9 @@ pub async fn get_asset(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol
)
.fetch_optional(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_optional(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn add_asset(
@@ -50,9 +50,9 @@ pub async fn add_asset(
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added
)
.fetch_one(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_one(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn update_asset_trading(
@@ -66,9 +66,9 @@ pub async fn update_asset_trading(
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
trading, symbol
)
.fetch_optional(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_optional(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn delete_asset(
@@ -81,7 +81,7 @@ pub async fn delete_asset(
RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
symbol
)
.fetch_optional(postgres_pool)
.await
.unwrap())
.fetch_optional(postgres_pool)
.await
.unwrap())
}

View File

@@ -13,9 +13,9 @@ pub async fn add_bar(
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.fetch_one(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_one(postgres_pool)
.await
.map_err(|e| e.into())
}
#[allow(dead_code)]
@@ -52,7 +52,7 @@ pub async fn add_bars(
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
&timestamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes, &num_trades, &volumes_weighted
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}

View File

@@ -0,0 +1,54 @@
use crate::types::CalendarDate;
use sqlx::{query_as, PgPool};
use std::error::Error;
pub async fn add_calendar_dates(
postgres_pool: &PgPool,
calendar_dates: &Vec<CalendarDate>,
) -> Result<Vec<CalendarDate>, Box<dyn Error + Send + Sync>> {
let mut dates = Vec::with_capacity(calendar_dates.len());
let mut opens = Vec::with_capacity(calendar_dates.len());
let mut closes = Vec::with_capacity(calendar_dates.len());
for calendar_date in calendar_dates {
dates.push(calendar_date.date);
opens.push(calendar_date.open);
closes.push(calendar_date.close);
}
query_as!(
CalendarDate,
r#"INSERT INTO calendar (date, open, close)
SELECT * FROM UNNEST($1::date[], $2::time[], $3::time[])
RETURNING date, open, close"#,
&dates,
&opens,
&closes
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn delete_all_calendar_dates(
postgres_pool: &PgPool,
) -> Result<Vec<CalendarDate>, Box<dyn Error + Send + Sync>> {
query_as!(
CalendarDate,
"DELETE FROM calendar RETURNING date, open, close"
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn reset_calendar_dates(
postgres_pool: &PgPool,
calendar_dates: &Vec<CalendarDate>,
) -> Result<Vec<CalendarDate>, Box<dyn Error + Send + Sync>> {
let transaction = postgres_pool.begin().await?;
delete_all_calendar_dates(postgres_pool).await?;
let calendar_dates = add_calendar_dates(postgres_pool, calendar_dates).await;
transaction.commit().await?;
calendar_dates
}

View File

@@ -1,2 +1,3 @@
pub mod assets;
pub mod bars;
pub mod calendar;

View File

@@ -5,9 +5,7 @@ mod routes;
mod types;
use config::AppConfig;
use data::live::run_data_live;
use dotenv::dotenv;
use routes::run_api;
use std::error::Error;
use tokio::{spawn, sync::broadcast};
use types::{AssetBroadcastMessage, Class};
@@ -17,31 +15,32 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
dotenv().ok();
log4rs::init_file("log4rs.yaml", Default::default()).unwrap();
let app_config = AppConfig::arc_from_env().await.unwrap();
let mut threads = Vec::new();
threads.push(spawn(data::calendar::run(app_config.clone())));
let (asset_broadcast_sender, _) = broadcast::channel::<AssetBroadcastMessage>(100);
// Stock Live Data
threads.push(spawn(run_data_live(
threads.push(spawn(data::live::run(
Class::UsEquity,
app_config.clone(),
asset_broadcast_sender.subscribe(),
)));
// Crypto Live Data
threads.push(spawn(run_data_live(
threads.push(spawn(data::live::run(
Class::Crypto,
app_config.clone(),
asset_broadcast_sender.subscribe(),
)));
// REST API
threads.push(spawn(run_api(app_config.clone(), asset_broadcast_sender)));
threads.push(spawn(routes::run(
app_config.clone(),
asset_broadcast_sender,
)));
for thread in threads {
thread.await??;
}
Ok(())
unreachable!()
}

View File

@@ -1,17 +1,15 @@
use std::sync::Arc;
use crate::config::AppConfig;
use crate::database;
use crate::database::assets::update_asset_trading;
use crate::types::api;
use crate::types::{Asset, AssetBroadcastMessage, Status};
use axum::{extract::Path, http::StatusCode, Extension, Json};
use http::Method;
use log::info;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
const ALPACA_API_URL: &str = "https://api.alpaca.markets/v2";
const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
pub async fn get_assets(
Extension(app_config): Extension<Arc<AppConfig>>,
@@ -56,14 +54,10 @@ pub async fn add_asset(
return Err(StatusCode::CONFLICT);
}
app_config.alpaca_rate_limit.until_ready().await;
let asset = app_config
.reqwest_client
.request(
Method::GET,
&format!("{}/assets/{}", ALPACA_API_URL, request.symbol),
)
.header("APCA-API-KEY-ID", &app_config.alpaca_api_key)
.header("APCA-API-SECRET-KEY", &app_config.alpaca_api_secret)
.alpaca_client
.get(&format!("{}/{}", ALPACA_ASSET_API_URL, request.symbol))
.send()
.await
.map_err(|e| match e.status() {

View File

@@ -9,7 +9,7 @@ use tokio::sync::broadcast::Sender;
pub mod assets;
pub async fn run_api(
pub async fn run(
app_config: Arc<AppConfig>,
asset_broadcast_sender: Sender<AssetBroadcastMessage>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
@@ -26,5 +26,5 @@ pub async fn run_api(
info!("Listening on {}...", addr);
Server::bind(&addr).serve(app.into_make_service()).await?;
Ok(())
unreachable!()
}

View File

@@ -0,0 +1,29 @@
use serde::{Deserialize, Deserializer};
use time::{macros::format_description, Date, Time};
#[derive(Debug, PartialEq, Deserialize)]
pub struct CalendarDate {
#[serde(deserialize_with = "deserialize_date")]
pub date: Date,
#[serde(deserialize_with = "deserialize_time")]
pub open: Time,
#[serde(deserialize_with = "deserialize_time")]
pub close: Time,
}
fn deserialize_date<'de, D>(deserializer: D) -> Result<Date, D::Error>
where
D: Deserializer<'de>,
{
let date_str = String::deserialize(deserializer)?;
Date::parse(&date_str, format_description!("[year]-[month]-[day]"))
.map_err(serde::de::Error::custom)
}
fn deserialize_time<'de, D>(deserializer: D) -> Result<Time, D::Error>
where
D: Deserializer<'de>,
{
let time_str = String::deserialize(deserializer)?;
Time::parse(&time_str, format_description!("[hour]:[minute]")).map_err(serde::de::Error::custom)
}

View File

@@ -1,3 +1,5 @@
pub mod asset;
pub mod calendar;
pub use asset::*;
pub use calendar::*;

View File

@@ -1,4 +1,4 @@
use super::websocket::incoming::BarMessage;
use super::websocket;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use time::OffsetDateTime;
@@ -16,8 +16,8 @@ pub struct Bar {
pub volume_weighted: f64,
}
impl From<BarMessage> for Bar {
fn from(bar_message: BarMessage) -> Self {
impl From<websocket::incoming::BarMessage> for Bar {
fn from(bar_message: websocket::incoming::BarMessage) -> Self {
Self {
timestamp: bar_message.timestamp,
asset_symbol: bar_message.symbol,

View File

@@ -0,0 +1,18 @@
use super::api;
use time::{Date, Time};
pub struct CalendarDate {
pub date: Date,
pub open: Time,
pub close: Time,
}
impl From<&api::incoming::CalendarDate> for CalendarDate {
fn from(calendar: &api::incoming::CalendarDate) -> Self {
Self {
date: calendar.date,
open: calendar.open,
close: calendar.close,
}
}
}

View File

@@ -1,6 +1,7 @@
pub mod api;
pub mod asset;
pub mod bar;
pub mod calendar;
pub mod class;
pub mod exchange;
pub mod status;
@@ -8,6 +9,7 @@ pub mod websocket;
pub use asset::*;
pub use bar::*;
pub use calendar::*;
pub use class::*;
pub use exchange::*;
pub use status::*;