diff --git a/src/config.rs b/src/config.rs index 8e2aa73..c8d9fe9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,6 @@ -use crate::types::alpaca::shared::Source; +use crate::types::alpaca::shared::{Mode, Source}; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; +use lazy_static::lazy_static; use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Client, @@ -14,20 +15,35 @@ use rust_bert::{ use std::{env, num::NonZeroU32, path::PathBuf, sync::Arc}; use tokio::sync::Mutex; -pub const ALPACA_ACCOUNT_API_URL: &str = "https://api.alpaca.markets/v2/account"; -pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets"; -pub const ALPACA_ORDER_API_URL: &str = "https://api.alpaca.markets/v2/orders"; -pub const ALPACA_POSITION_API_URL: &str = "https://api.alpaca.markets/v2/positions"; -pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock"; +lazy_static! { + pub static ref ALPACA_MODE: Mode = env::var("ALPACA_MODE") + .expect("ALPACA_MODE must be set.") + .parse() + .expect("ALPACA_MODE must be 'live' or 'paper'"); + static ref ALPACA_URL_SUBDOMAIN: String = match *ALPACA_MODE { + Mode::Live => String::from("api"), + Mode::Paper => String::from("paper-api"), + }; + #[derive(Debug)] + pub static ref ALPACA_API_URL: String = format!( + "https://{subdomain}.alpaca.markets/v2", + subdomain = *ALPACA_URL_SUBDOMAIN + ); + #[derive(Debug)] + pub static ref ALPACA_WEBSOCKET_URL: String = format!( + "wss://{subdomain}.alpaca.markets/stream", + subdomain = *ALPACA_URL_SUBDOMAIN + ); +} -pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; -pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; -pub const ALPACA_NEWS_DATA_URL: &str = "https://data.alpaca.markets/v1beta1/news"; +pub const ALPACA_STOCK_DATA_API_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; +pub const ALPACA_CRYPTO_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; +pub const ALPACA_NEWS_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta1/news"; -pub const ALPACA_TRADING_WEBSOCKET_URL: &str = "wss://api.alpaca.markets/stream"; -pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2"; -pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; -pub const ALPACA_NEWS_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news"; +pub const ALPACA_STOCK_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2"; +pub const ALPACA_CRYPTO_DATA_WEBSOCKET_URL: &str = + "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; +pub const ALPACA_NEWS_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news"; pub struct Config { pub alpaca_api_key: String, diff --git a/src/init.rs b/src/init.rs new file mode 100644 index 0000000..e8d8b55 --- /dev/null +++ b/src/init.rs @@ -0,0 +1,103 @@ +use crate::{ + config::{Config, ALPACA_MODE}, + database, + types::alpaca::{self, api, shared::Sort}, +}; +use log::{info, warn}; +use std::{collections::HashSet, sync::Arc}; +use time::OffsetDateTime; + +pub async fn check_account(config: &Arc) { + let account = alpaca::api::incoming::account::get(config, None) + .await + .unwrap(); + + assert!( + !(account.status != alpaca::api::incoming::account::Status::Active), + "Account status is not active: {:?}.", + account.status + ); + assert!( + !account.trade_suspend_by_user, + "Account trading is suspended by user." + ); + assert!(!account.trading_blocked, "Account trading is blocked."); + assert!(!account.blocked, "Account is blocked."); + + if account.cash == 0.0 { + warn!("Account cash is zero, qrust will not be able to trade."); + } + + warn!( + "qrust active on {} account with {} {}, avoid transferring funds without shutting down.", + *ALPACA_MODE, account.currency, account.cash + ); +} + +pub async fn rehydrate_orders(config: &Arc) { + info!("Rehydrating order data."); + + let mut orders = vec![]; + let mut after = OffsetDateTime::UNIX_EPOCH; + + while let Some(message) = api::incoming::order::get( + config, + &api::outgoing::order::Order { + status: Some(api::outgoing::order::Status::All), + limit: Some(500), + after: Some(after), + until: None, + direction: Some(Sort::Asc), + nested: Some(true), + symbols: None, + side: None, + }, + None, + ) + .await + .ok() + .filter(|message| !message.is_empty()) + { + orders.extend(message); + after = orders.last().unwrap().submitted_at; + } + + let orders = orders + .into_iter() + .flat_map(&api::incoming::order::Order::normalize) + .collect::>(); + + database::orders::upsert_batch(&config.clickhouse_client, &orders) + .await + .unwrap(); + + info!("Rehydrated order data."); +} + +pub async fn check_positions(config: &Arc) { + let positions_future = async { + alpaca::api::incoming::position::get(config, None) + .await + .unwrap() + }; + + let assets_future = async { + database::assets::select(&config.clickhouse_client) + .await + .unwrap() + .into_iter() + .map(|asset| asset.symbol) + .collect::>() + }; + + let (positions, assets) = tokio::join!(positions_future, assets_future); + + for position in positions { + if !assets.contains(&position.symbol) { + warn!( + "Position for unmonitored asset: {}, {} shares.", + position.symbol, position.qty + ); + } + } +} diff --git a/src/main.rs b/src/main.rs index 0bf5aeb..6d1a730 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ mod config; mod database; +mod init; mod routes; mod threads; mod types; @@ -33,8 +34,9 @@ async fn main() { ) .unwrap(); - threads::trading::check_account(&config).await; - threads::trading::check_positions(&config).await; + init::check_account(&config).await; + init::rehydrate_orders(&config).await; + init::check_positions(&config).await; spawn(threads::trading::run(config.clone())); diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs index 7d2947a..ee48354 100644 --- a/src/threads/data/backfill.rs +++ b/src/threads/data/backfill.rs @@ -1,6 +1,6 @@ use super::ThreadType; use crate::{ - config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL}, + config::{Config, ALPACA_CRYPTO_DATA_API_URL, ALPACA_STOCK_DATA_API_URL}, database, types::{ alpaca::{ @@ -77,6 +77,7 @@ pub async fn run(handler: Arc>, mut receiver: mpsc::Receiver) -> Box Box::new(BarHandler { config, - data_url: ALPACA_STOCK_DATA_URL, + data_url: ALPACA_STOCK_DATA_API_URL, api_query_constructor: us_equity_query_constructor, }), ThreadType::Bars(Class::Crypto) => Box::new(BarHandler { config, - data_url: ALPACA_CRYPTO_DATA_URL, + data_url: ALPACA_CRYPTO_DATA_API_URL, api_query_constructor: crypto_query_constructor, }), ThreadType::News => Box::new(NewsHandler { config }), diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 41557bc..8d78fc2 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -4,7 +4,8 @@ mod websocket; use super::clock; use crate::{ config::{ - Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_NEWS_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL, + Config, ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL, + ALPACA_STOCK_DATA_WEBSOCKET_URL, }, create_send_await, database, types::{alpaca, Asset, Class}, @@ -99,10 +100,13 @@ async fn init_thread( ) { let websocket_url = match thread_type { ThreadType::Bars(Class::UsEquity) => { - format!("{}/{}", ALPACA_STOCK_WEBSOCKET_URL, &config.alpaca_source) + format!( + "{}/{}", + ALPACA_STOCK_DATA_WEBSOCKET_URL, &config.alpaca_source + ) } - ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_WEBSOCKET_URL.into(), - ThreadType::News => ALPACA_NEWS_WEBSOCKET_URL.into(), + ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_WEBSOCKET_URL.into(), + ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(), }; let (websocket, _) = connect_async(websocket_url).await.unwrap(); diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs index fa725df..26bc702 100644 --- a/src/threads/data/websocket.rs +++ b/src/threads/data/websocket.rs @@ -186,9 +186,7 @@ async fn handle_websocket_message( ) { match message { tungstenite::Message::Text(message) => { - let message = from_str::>(&message); - - if let Ok(message) = message { + if let Ok(message) = from_str::>(&message) { for message in message { let handler = handler.clone(); let pending = pending.clone(); diff --git a/src/threads/trading/mod.rs b/src/threads/trading/mod.rs index 725c9a6..69db55e 100644 --- a/src/threads/trading/mod.rs +++ b/src/threads/trading/mod.rs @@ -1,78 +1,21 @@ -mod rehydrate; mod websocket; use crate::{ - config::{Config, ALPACA_TRADING_WEBSOCKET_URL}, - database, + config::{Config, ALPACA_WEBSOCKET_URL}, types::alpaca, }; use futures_util::StreamExt; -use log::warn; -use rehydrate::rehydrate; -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use tokio::spawn; use tokio_tungstenite::connect_async; pub async fn run(config: Arc) { - let (websocket, _) = connect_async(ALPACA_TRADING_WEBSOCKET_URL).await.unwrap(); + let (websocket, _) = connect_async(&*ALPACA_WEBSOCKET_URL).await.unwrap(); let (mut websocket_sink, mut websocket_stream) = websocket.split(); alpaca::websocket::trading::authenticate(&config, &mut websocket_sink, &mut websocket_stream) .await; alpaca::websocket::trading::subscribe(&mut websocket_sink, &mut websocket_stream).await; - rehydrate(&config).await; spawn(websocket::run(config, websocket_stream, websocket_sink)); } - -pub async fn check_account(config: &Arc) { - let account = alpaca::api::incoming::account::get(config, None) - .await - .unwrap(); - - assert!( - !(account.status != alpaca::api::incoming::account::Status::Active), - "Account status is not active: {:?}.", - account.status - ); - assert!( - !account.trade_suspend_by_user, - "Account trading is suspended by user." - ); - assert!(!account.trading_blocked, "Account trading is blocked."); - assert!(!account.blocked, "Account is blocked."); - - if account.cash == 0.0 { - warn!("Account cash is zero, qrust will not be able to trade."); - } - - warn!( - "qrust active with {}{}, avoid transferring funds without shutting down.", - account.cash, account.currency - ); -} - -pub async fn check_positions(config: &Arc) { - let positions_future = async { - alpaca::api::incoming::position::get(config, None) - .await - .unwrap() - }; - - let assets_future = async { - database::assets::select(&config.clickhouse_client) - .await - .unwrap() - .into_iter() - .map(|asset| asset.symbol) - .collect::>() - }; - - let (positions, assets) = tokio::join!(positions_future, assets_future); - - for position in positions { - if !assets.contains(&position.symbol) { - warn!("Position for unmonitored asset: {:?}", position.symbol); - } - } -} diff --git a/src/threads/trading/rehydrate.rs b/src/threads/trading/rehydrate.rs deleted file mode 100644 index 8fa4e34..0000000 --- a/src/threads/trading/rehydrate.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::{ - config::Config, - database, - types::alpaca::{api, shared::Sort}, -}; -use log::info; -use std::sync::Arc; -use time::OffsetDateTime; - -pub async fn rehydrate(config: &Arc) { - info!("Rehydrating trading data."); - - let mut orders = vec![]; - let mut after = OffsetDateTime::UNIX_EPOCH; - - while let Some(message) = api::incoming::order::get( - config, - &api::outgoing::order::Order { - status: Some(api::outgoing::order::Status::All), - limit: Some(500), - after: Some(after), - until: None, - direction: Some(Sort::Asc), - nested: Some(true), - symbols: None, - side: None, - }, - None, - ) - .await - .ok() - .filter(|message| !message.is_empty()) - { - orders.extend(message); - after = orders.last().unwrap().submitted_at; - } - - let orders = orders - .into_iter() - .flat_map(&api::incoming::order::Order::normalize) - .collect::>(); - - database::orders::upsert_batch(&config.clickhouse_client, &orders) - .await - .unwrap(); - - info!("Rehydrated trading data."); -} diff --git a/src/types/alpaca/api/incoming/account.rs b/src/types/alpaca/api/incoming/account.rs index 8c2eeec..fccb963 100644 --- a/src/types/alpaca/api/incoming/account.rs +++ b/src/types/alpaca/api/incoming/account.rs @@ -1,3 +1,4 @@ +use crate::config::{Config, ALPACA_API_URL}; use backoff::{future::retry_notify, ExponentialBackoff}; use log::warn; use reqwest::Error; @@ -9,8 +10,6 @@ use std::{sync::Arc, time::Duration}; use time::OffsetDateTime; use uuid::Uuid; -use crate::config::{Config, ALPACA_ACCOUNT_API_URL}; - #[derive(Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum Status { @@ -90,7 +89,7 @@ pub async fn get( config.alpaca_rate_limit.until_ready().await; config .alpaca_client - .get(ALPACA_ACCOUNT_API_URL) + .get(&format!("{}/account", *ALPACA_API_URL)) .send() .await? .error_for_status() diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index e29be72..93177f9 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -1,6 +1,6 @@ use super::position::Position; use crate::{ - config::{Config, ALPACA_ASSET_API_URL}, + config::{Config, ALPACA_API_URL}, types::{ self, alpaca::shared::asset::{Class, Exchange, Status}, @@ -57,7 +57,7 @@ pub async fn get_by_symbol( config.alpaca_rate_limit.until_ready().await; config .alpaca_client - .get(&format!("{ALPACA_ASSET_API_URL}/{symbol}")) + .get(&format!("{}/assets/{}", *ALPACA_API_URL, symbol)) .send() .await? .error_for_status() diff --git a/src/types/alpaca/api/incoming/clock.rs b/src/types/alpaca/api/incoming/clock.rs index 0ff4176..33ec1ec 100644 --- a/src/types/alpaca/api/incoming/clock.rs +++ b/src/types/alpaca/api/incoming/clock.rs @@ -1,4 +1,4 @@ -use crate::config::{Config, ALPACA_CLOCK_API_URL}; +use crate::config::{Config, ALPACA_API_URL}; use backoff::{future::retry_notify, ExponentialBackoff}; use log::warn; use reqwest::Error; @@ -27,7 +27,7 @@ pub async fn get( config.alpaca_rate_limit.until_ready().await; config .alpaca_client - .get(ALPACA_CLOCK_API_URL) + .get(&format!("{}/clock", *ALPACA_API_URL)) .send() .await? .error_for_status() diff --git a/src/types/alpaca/api/incoming/news.rs b/src/types/alpaca/api/incoming/news.rs index 042672f..548237d 100644 --- a/src/types/alpaca/api/incoming/news.rs +++ b/src/types/alpaca/api/incoming/news.rs @@ -1,5 +1,5 @@ use crate::{ - config::{Config, ALPACA_NEWS_DATA_URL}, + config::{Config, ALPACA_NEWS_DATA_API_URL}, types::{ self, alpaca::{api::outgoing, shared::news::normalize_html_content}, @@ -83,7 +83,7 @@ pub async fn get_historical( config.alpaca_rate_limit.until_ready().await; config .alpaca_client - .get(ALPACA_NEWS_DATA_URL) + .get(ALPACA_NEWS_DATA_API_URL) .query(query) .send() .await? diff --git a/src/types/alpaca/api/incoming/order.rs b/src/types/alpaca/api/incoming/order.rs index 1ee3d6d..65e2fd8 100644 --- a/src/types/alpaca/api/incoming/order.rs +++ b/src/types/alpaca/api/incoming/order.rs @@ -1,5 +1,5 @@ use crate::{ - config::{Config, ALPACA_ORDER_API_URL}, + config::{Config, ALPACA_API_URL}, types::alpaca::{api::outgoing, shared}, }; use backoff::{future::retry_notify, ExponentialBackoff}; @@ -20,7 +20,7 @@ pub async fn get( config.alpaca_rate_limit.until_ready().await; config .alpaca_client - .get(ALPACA_ORDER_API_URL) + .get(&format!("{}/orders", *ALPACA_API_URL)) .query(query) .send() .await? diff --git a/src/types/alpaca/api/incoming/position.rs b/src/types/alpaca/api/incoming/position.rs index 07dc820..b18f6a6 100644 --- a/src/types/alpaca/api/incoming/position.rs +++ b/src/types/alpaca/api/incoming/position.rs @@ -1,5 +1,5 @@ use crate::{ - config::{Config, ALPACA_POSITION_API_URL}, + config::{Config, ALPACA_API_URL}, types::alpaca::shared::{ self, asset::{Class, Exchange}, @@ -74,7 +74,7 @@ pub async fn get( config.alpaca_rate_limit.until_ready().await; config .alpaca_client - .get(ALPACA_POSITION_API_URL) + .get(&format!("{}/positions", *ALPACA_API_URL)) .send() .await? .error_for_status() @@ -108,7 +108,7 @@ pub async fn get_by_symbol( config.alpaca_rate_limit.until_ready().await; let response = config .alpaca_client - .get(&format!("{ALPACA_POSITION_API_URL}/{symbol}")) + .get(&format!("{}/positions/{}", *ALPACA_API_URL, symbol)) .send() .await?; diff --git a/src/types/alpaca/shared/mod.rs b/src/types/alpaca/shared/mod.rs index ccda22c..0415624 100644 --- a/src/types/alpaca/shared/mod.rs +++ b/src/types/alpaca/shared/mod.rs @@ -1,8 +1,10 @@ pub mod asset; +pub mod mode; pub mod news; pub mod order; pub mod sort; pub mod source; +pub use mode::Mode; pub use sort::Sort; pub use source::Source; diff --git a/src/types/alpaca/shared/mode.rs b/src/types/alpaca/shared/mode.rs new file mode 100644 index 0000000..bb0d336 --- /dev/null +++ b/src/types/alpaca/shared/mode.rs @@ -0,0 +1,33 @@ +use serde::{Deserialize, Serialize}; +use std::{ + fmt::{Display, Formatter}, + str::FromStr, +}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Mode { + Live, + Paper, +} + +impl FromStr for Mode { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "live" => Ok(Self::Live), + "paper" => Ok(Self::Paper), + _ => Err(format!("Unknown mode: {s}")), + } + } +} + +impl Display for Mode { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + Self::Live => write!(f, "live"), + Self::Paper => write!(f, "paper"), + } + } +}