diff --git a/Cargo.toml b/Cargo.toml index 130f8c3..1440a2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,14 @@ name = "qrust" version = "0.1.0" edition = "2021" +[lib] +name = "qrust" +path = "src/lib/mod.rs" + +[[bin]] +name = "qrust" +path = "src/main.rs" + [profile.release] panic = 'abort' strip = true diff --git a/src/config.rs b/src/config.rs index e77ad4c..f57e040 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,6 @@ -use crate::types::alpaca::shared::{Mode, Source}; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use lazy_static::lazy_static; +use qrust::types::alpaca::shared::{Mode, Source}; use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Client, @@ -15,42 +15,23 @@ use rust_bert::{ use std::{env, num::NonZeroU32, path::PathBuf, sync::Arc}; use tokio::sync::{Mutex, Semaphore}; -pub const ALPACA_STOCK_DATA_API_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; -pub const ALPACA_CRYPTO_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; -pub const ALPACA_NEWS_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta1/news"; - -pub const ALPACA_STOCK_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2"; -pub const ALPACA_CRYPTO_DATA_WEBSOCKET_URL: &str = - "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; -pub const ALPACA_NEWS_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news"; - lazy_static! { pub static ref ALPACA_MODE: Mode = env::var("ALPACA_MODE") .expect("ALPACA_MODE must be set.") .parse() .expect("ALPACA_MODE must be 'live' or 'paper'"); + pub static ref ALPACA_API_BASE: String = match *ALPACA_MODE { + Mode::Live => String::from("api"), + Mode::Paper => String::from("paper-api"), + }; pub static ref ALPACA_SOURCE: Source = env::var("ALPACA_SOURCE") .expect("ALPACA_SOURCE must be set.") .parse() .expect("ALPACA_SOURCE must be 'iex', 'sip', or 'otc'"); - pub static ref ALPACA_API_KEY: String = env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set."); - pub static ref ALPACA_API_SECRET: String = env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set."); - #[derive(Debug)] - pub static ref ALPACA_API_URL: String = format!( - "https://{}.alpaca.markets/v2", - match *ALPACA_MODE { - Mode::Live => String::from("api"), - Mode::Paper => String::from("paper-api"), - } - ); - #[derive(Debug)] - pub static ref ALPACA_WEBSOCKET_URL: String = format!( - "wss://{}.alpaca.markets/stream", - match *ALPACA_MODE { - Mode::Live => String::from("api"), - Mode::Paper => String::from("paper-api"), - } - ); + pub static ref ALPACA_API_KEY: String = + env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set."); + pub static ref ALPACA_API_SECRET: String = + env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set."); pub static ref BERT_MAX_INPUTS: usize = env::var("BERT_MAX_INPUTS") .expect("BERT_MAX_INPUTS must be set.") .parse() diff --git a/src/init.rs b/src/init.rs index 0a0c4c2..c6f654c 100644 --- a/src/init.rs +++ b/src/init.rs @@ -1,9 +1,9 @@ use crate::{ - config::{Config, ALPACA_MODE}, + config::{Config, ALPACA_API_BASE}, database, - types::alpaca, }; use log::{info, warn}; +use qrust::types::alpaca; use std::{collections::HashMap, sync::Arc}; use time::OffsetDateTime; use tokio::join; @@ -13,6 +13,7 @@ pub async fn check_account(config: &Arc) { &config.alpaca_client, &config.alpaca_rate_limiter, None, + &ALPACA_API_BASE, ) .await .unwrap(); @@ -35,7 +36,7 @@ pub async fn check_account(config: &Arc) { warn!( "qrust active on {} account with {} {}, avoid transferring funds without shutting down.", - *ALPACA_MODE, account.currency, account.cash + *ALPACA_API_BASE, account.currency, account.cash ); } @@ -54,6 +55,7 @@ pub async fn rehydrate_orders(config: &Arc) { ..Default::default() }, None, + &ALPACA_API_BASE, ) .await .ok() @@ -87,6 +89,7 @@ pub async fn rehydrate_positions(config: &Arc) { &config.alpaca_client, &config.alpaca_rate_limiter, None, + &ALPACA_API_BASE, ) .await .unwrap() diff --git a/src/database/assets.rs b/src/lib/database/assets.rs similarity index 100% rename from src/database/assets.rs rename to src/lib/database/assets.rs diff --git a/src/database/backfills_bars.rs b/src/lib/database/backfills_bars.rs similarity index 100% rename from src/database/backfills_bars.rs rename to src/lib/database/backfills_bars.rs diff --git a/src/database/backfills_news.rs b/src/lib/database/backfills_news.rs similarity index 100% rename from src/database/backfills_news.rs rename to src/lib/database/backfills_news.rs diff --git a/src/database/bars.rs b/src/lib/database/bars.rs similarity index 100% rename from src/database/bars.rs rename to src/lib/database/bars.rs diff --git a/src/database/calendar.rs b/src/lib/database/calendar.rs similarity index 100% rename from src/database/calendar.rs rename to src/lib/database/calendar.rs diff --git a/src/database/mod.rs b/src/lib/database/mod.rs similarity index 100% rename from src/database/mod.rs rename to src/lib/database/mod.rs diff --git a/src/database/news.rs b/src/lib/database/news.rs similarity index 100% rename from src/database/news.rs rename to src/lib/database/news.rs diff --git a/src/database/orders.rs b/src/lib/database/orders.rs similarity index 100% rename from src/database/orders.rs rename to src/lib/database/orders.rs diff --git a/src/lib/mod.rs b/src/lib/mod.rs new file mode 100644 index 0000000..284cd23 --- /dev/null +++ b/src/lib/mod.rs @@ -0,0 +1,3 @@ +pub mod database; +pub mod types; +pub mod utils; diff --git a/src/types/alpaca/api/incoming/account.rs b/src/lib/types/alpaca/api/incoming/account.rs similarity index 97% rename from src/types/alpaca/api/incoming/account.rs rename to src/lib/types/alpaca/api/incoming/account.rs index 7b9962d..831d55c 100644 --- a/src/types/alpaca/api/incoming/account.rs +++ b/src/lib/types/alpaca/api/incoming/account.rs @@ -1,4 +1,3 @@ -use crate::config::ALPACA_API_URL; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; use log::warn; @@ -84,13 +83,14 @@ pub async fn get( client: &Client, rate_limiter: &DefaultDirectRateLimiter, backoff: Option, + api_base: &str, ) -> Result { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/account", *ALPACA_API_URL)) + .get(&format!("https://{}.alpaca.markets/v2/account", api_base)) .send() .await? .error_for_status() diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/lib/types/alpaca/api/incoming/asset.rs similarity index 88% rename from src/types/alpaca/api/incoming/asset.rs rename to src/lib/types/alpaca/api/incoming/asset.rs index 5cd9f16..bfa4ab7 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/lib/types/alpaca/api/incoming/asset.rs @@ -1,12 +1,9 @@ use super::position::Position; -use crate::{ - config::ALPACA_API_URL, - types::{ - self, - alpaca::{ - api::outgoing, - shared::asset::{Class, Exchange, Status}, - }, +use crate::types::{ + self, + alpaca::{ + api::outgoing, + shared::asset::{Class, Exchange, Status}, }, }; use backoff::{future::retry_notify, ExponentialBackoff}; @@ -57,13 +54,14 @@ pub async fn get( rate_limiter: &DefaultDirectRateLimiter, query: &outgoing::asset::Asset, backoff: Option, + api_base: &str, ) -> Result, Error> { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/assets", *ALPACA_API_URL)) + .get(&format!("https://{}.alpaca.markets/v2/assets", api_base)) .query(query) .send() .await? @@ -96,13 +94,17 @@ pub async fn get_by_symbol( rate_limiter: &DefaultDirectRateLimiter, symbol: &str, backoff: Option, + api_base: &str, ) -> Result { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/assets/{}", *ALPACA_API_URL, symbol)) + .get(&format!( + "https://{}.alpaca.markets/v2/assets/{}", + api_base, symbol + )) .send() .await? .error_for_status() @@ -134,10 +136,11 @@ pub async fn get_by_symbols( rate_limiter: &DefaultDirectRateLimiter, symbols: &[String], backoff: Option, + api_base: &str, ) -> Result, Error> { if symbols.len() < 2 { let symbol = symbols.first().unwrap(); - let asset = get_by_symbol(client, rate_limiter, symbol, backoff).await?; + let asset = get_by_symbol(client, rate_limiter, symbol, backoff, api_base).await?; return Ok(vec![asset]); } @@ -150,14 +153,20 @@ pub async fn get_by_symbols( ..Default::default() }; - let us_equity_assets = get(client, rate_limiter, &us_equity_query, backoff_clone); + let us_equity_assets = get( + client, + rate_limiter, + &us_equity_query, + backoff_clone, + api_base, + ); let crypto_query = outgoing::asset::Asset { class: Some(Class::Crypto), ..Default::default() }; - let crypto_assets = get(client, rate_limiter, &crypto_query, backoff); + let crypto_assets = get(client, rate_limiter, &crypto_query, backoff, api_base); let (us_equity_assets, crypto_assets) = try_join!(us_equity_assets, crypto_assets)?; diff --git a/src/types/alpaca/api/incoming/bar.rs b/src/lib/types/alpaca/api/incoming/bar.rs similarity index 99% rename from src/types/alpaca/api/incoming/bar.rs rename to src/lib/types/alpaca/api/incoming/bar.rs index ece3265..e77afdf 100644 --- a/src/types/alpaca/api/incoming/bar.rs +++ b/src/lib/types/alpaca/api/incoming/bar.rs @@ -7,6 +7,7 @@ use serde::Deserialize; use std::{collections::HashMap, time::Duration}; use time::OffsetDateTime; + #[derive(Deserialize)] pub struct Bar { #[serde(rename = "t")] diff --git a/src/types/alpaca/api/incoming/calendar.rs b/src/lib/types/alpaca/api/incoming/calendar.rs similarity index 95% rename from src/types/alpaca/api/incoming/calendar.rs rename to src/lib/types/alpaca/api/incoming/calendar.rs index 80e6c80..65c540b 100644 --- a/src/types/alpaca/api/incoming/calendar.rs +++ b/src/lib/types/alpaca/api/incoming/calendar.rs @@ -1,5 +1,4 @@ use crate::{ - config::ALPACA_API_URL, types::{self, alpaca::api::outgoing}, utils::{de, time::EST_OFFSET}, }; @@ -36,13 +35,14 @@ pub async fn get( rate_limiter: &DefaultDirectRateLimiter, query: &outgoing::calendar::Calendar, backoff: Option, + api_base: &str, ) -> Result, Error> { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/calendar", *ALPACA_API_URL)) + .get(&format!("https://{}.alpaca.markets/v2/calendar", api_base)) .query(query) .send() .await? diff --git a/src/types/alpaca/api/incoming/clock.rs b/src/lib/types/alpaca/api/incoming/clock.rs similarity index 93% rename from src/types/alpaca/api/incoming/clock.rs rename to src/lib/types/alpaca/api/incoming/clock.rs index afc50ac..8543283 100644 --- a/src/types/alpaca/api/incoming/clock.rs +++ b/src/lib/types/alpaca/api/incoming/clock.rs @@ -1,4 +1,3 @@ -use crate::config::ALPACA_API_URL; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; use log::warn; @@ -22,13 +21,14 @@ pub async fn get( client: &Client, rate_limiter: &DefaultDirectRateLimiter, backoff: Option, + api_base: &str, ) -> Result { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/clock", *ALPACA_API_URL)) + .get(&format!("https://{}.alpaca.markets/v2/clock", api_base)) .send() .await? .error_for_status() diff --git a/src/types/alpaca/api/incoming/mod.rs b/src/lib/types/alpaca/api/incoming/mod.rs similarity index 100% rename from src/types/alpaca/api/incoming/mod.rs rename to src/lib/types/alpaca/api/incoming/mod.rs diff --git a/src/types/alpaca/api/incoming/news.rs b/src/lib/types/alpaca/api/incoming/news.rs similarity index 95% rename from src/types/alpaca/api/incoming/news.rs rename to src/lib/types/alpaca/api/incoming/news.rs index 61108f3..180c0e5 100644 --- a/src/types/alpaca/api/incoming/news.rs +++ b/src/lib/types/alpaca/api/incoming/news.rs @@ -1,8 +1,10 @@ use crate::{ - config::ALPACA_NEWS_DATA_API_URL, types::{ self, - alpaca::{api::outgoing, shared::news::normalize_html_content}, + alpaca::{ + api::{outgoing, ALPACA_NEWS_DATA_API_URL}, + shared::news::normalize_html_content, + }, }, utils::de, }; diff --git a/src/types/alpaca/api/incoming/order.rs b/src/lib/types/alpaca/api/incoming/order.rs similarity index 89% rename from src/types/alpaca/api/incoming/order.rs rename to src/lib/types/alpaca/api/incoming/order.rs index 4d17435..3f6dbf0 100644 --- a/src/types/alpaca/api/incoming/order.rs +++ b/src/lib/types/alpaca/api/incoming/order.rs @@ -1,27 +1,24 @@ -use crate::{ - config::ALPACA_API_URL, - types::alpaca::{api::outgoing, shared}, -}; +use crate::types::alpaca::{api::outgoing, shared}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; use log::warn; use reqwest::{Client, Error}; -use std::time::Duration; - pub use shared::order::Order; +use std::time::Duration; pub async fn get( client: &Client, rate_limiter: &DefaultDirectRateLimiter, query: &outgoing::order::Order, backoff: Option, + api_base: &str, ) -> Result, Error> { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/orders", *ALPACA_API_URL)) + .get(&format!("https://{}.alpaca.markets/v2/orders", api_base)) .query(query) .send() .await? diff --git a/src/types/alpaca/api/incoming/position.rs b/src/lib/types/alpaca/api/incoming/position.rs similarity index 91% rename from src/types/alpaca/api/incoming/position.rs rename to src/lib/types/alpaca/api/incoming/position.rs index 5b733fa..2342eb1 100644 --- a/src/types/alpaca/api/incoming/position.rs +++ b/src/lib/types/alpaca/api/incoming/position.rs @@ -1,5 +1,4 @@ use crate::{ - config::ALPACA_API_URL, types::alpaca::shared::{ self, asset::{Class, Exchange}, @@ -66,17 +65,20 @@ pub struct Position { pub asset_marginable: bool, } +pub const ALPACA_API_URL_TEMPLATE: &str = ""; + pub async fn get( client: &Client, rate_limiter: &DefaultDirectRateLimiter, backoff: Option, + api_base: &str, ) -> Result, reqwest::Error> { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; client - .get(&format!("{}/positions", *ALPACA_API_URL)) + .get(&format!("https://{}.alpaca.markets/v2/positions", api_base)) .send() .await? .error_for_status() @@ -106,13 +108,17 @@ pub async fn get_by_symbol( rate_limiter: &DefaultDirectRateLimiter, symbol: &str, backoff: Option, + api_base: &str, ) -> Result, reqwest::Error> { retry_notify( backoff.unwrap_or_default(), || async { rate_limiter.until_ready().await; let response = client - .get(&format!("{}/positions/{}", *ALPACA_API_URL, symbol)) + .get(&format!( + "https://{}.alpaca.markets/v2/positions/{}", + api_base, symbol + )) .send() .await?; @@ -149,16 +155,17 @@ pub async fn get_by_symbols( rate_limiter: &DefaultDirectRateLimiter, symbols: &[String], backoff: Option, + api_base: &str, ) -> Result, reqwest::Error> { if symbols.len() < 2 { let symbol = symbols.first().unwrap(); - let position = get_by_symbol(client, rate_limiter, symbol, backoff).await?; + let position = get_by_symbol(client, rate_limiter, symbol, backoff, api_base).await?; return Ok(position.into_iter().collect()); } let symbols = symbols.iter().collect::>(); - let positions = get(client, rate_limiter, backoff).await?; + let positions = get(client, rate_limiter, backoff, api_base).await?; Ok(positions .into_iter() diff --git a/src/lib/types/alpaca/api/mod.rs b/src/lib/types/alpaca/api/mod.rs new file mode 100644 index 0000000..7e597c0 --- /dev/null +++ b/src/lib/types/alpaca/api/mod.rs @@ -0,0 +1,6 @@ +pub mod incoming; +pub mod outgoing; + +pub const ALPACA_US_EQUITY_DATA_API_URL: &str = "https://data.alpaca.markets/v2/stocks/bars"; +pub const ALPACA_CRYPTO_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars"; +pub const ALPACA_NEWS_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta1/news"; diff --git a/src/types/alpaca/api/outgoing/asset.rs b/src/lib/types/alpaca/api/outgoing/asset.rs similarity index 100% rename from src/types/alpaca/api/outgoing/asset.rs rename to src/lib/types/alpaca/api/outgoing/asset.rs diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/lib/types/alpaca/api/outgoing/bar.rs similarity index 97% rename from src/types/alpaca/api/outgoing/bar.rs rename to src/lib/types/alpaca/api/outgoing/bar.rs index 4300292..09e851f 100644 --- a/src/types/alpaca/api/outgoing/bar.rs +++ b/src/lib/types/alpaca/api/outgoing/bar.rs @@ -1,5 +1,4 @@ use crate::{ - config::ALPACA_SOURCE, types::alpaca::shared::{Sort, Source}, utils::{ser, ONE_MINUTE}, }; @@ -58,7 +57,7 @@ impl Default for UsEquity { limit: Some(MAX_LIMIT), adjustment: Some(Adjustment::All), asof: None, - feed: Some(*ALPACA_SOURCE), + feed: Some(Source::Iex), currency: None, page_token: None, sort: Some(Sort::Asc), diff --git a/src/types/alpaca/api/outgoing/calendar.rs b/src/lib/types/alpaca/api/outgoing/calendar.rs similarity index 100% rename from src/types/alpaca/api/outgoing/calendar.rs rename to src/lib/types/alpaca/api/outgoing/calendar.rs diff --git a/src/types/alpaca/api/outgoing/mod.rs b/src/lib/types/alpaca/api/outgoing/mod.rs similarity index 100% rename from src/types/alpaca/api/outgoing/mod.rs rename to src/lib/types/alpaca/api/outgoing/mod.rs diff --git a/src/types/alpaca/api/outgoing/news.rs b/src/lib/types/alpaca/api/outgoing/news.rs similarity index 100% rename from src/types/alpaca/api/outgoing/news.rs rename to src/lib/types/alpaca/api/outgoing/news.rs diff --git a/src/types/alpaca/api/outgoing/order.rs b/src/lib/types/alpaca/api/outgoing/order.rs similarity index 100% rename from src/types/alpaca/api/outgoing/order.rs rename to src/lib/types/alpaca/api/outgoing/order.rs diff --git a/src/types/alpaca/mod.rs b/src/lib/types/alpaca/mod.rs similarity index 97% rename from src/types/alpaca/mod.rs rename to src/lib/types/alpaca/mod.rs index e4e0157..eaf5691 100644 --- a/src/types/alpaca/mod.rs +++ b/src/lib/types/alpaca/mod.rs @@ -1,3 +1,4 @@ pub mod api; pub mod shared; pub mod websocket; + diff --git a/src/types/alpaca/shared/asset.rs b/src/lib/types/alpaca/shared/asset.rs similarity index 100% rename from src/types/alpaca/shared/asset.rs rename to src/lib/types/alpaca/shared/asset.rs diff --git a/src/types/alpaca/shared/mod.rs b/src/lib/types/alpaca/shared/mod.rs similarity index 100% rename from src/types/alpaca/shared/mod.rs rename to src/lib/types/alpaca/shared/mod.rs diff --git a/src/types/alpaca/shared/mode.rs b/src/lib/types/alpaca/shared/mode.rs similarity index 100% rename from src/types/alpaca/shared/mode.rs rename to src/lib/types/alpaca/shared/mode.rs diff --git a/src/types/alpaca/shared/news.rs b/src/lib/types/alpaca/shared/news.rs similarity index 100% rename from src/types/alpaca/shared/news.rs rename to src/lib/types/alpaca/shared/news.rs diff --git a/src/types/alpaca/shared/order.rs b/src/lib/types/alpaca/shared/order.rs similarity index 100% rename from src/types/alpaca/shared/order.rs rename to src/lib/types/alpaca/shared/order.rs diff --git a/src/types/alpaca/shared/sort.rs b/src/lib/types/alpaca/shared/sort.rs similarity index 100% rename from src/types/alpaca/shared/sort.rs rename to src/lib/types/alpaca/shared/sort.rs diff --git a/src/types/alpaca/shared/source.rs b/src/lib/types/alpaca/shared/source.rs similarity index 100% rename from src/types/alpaca/shared/source.rs rename to src/lib/types/alpaca/shared/source.rs diff --git a/src/types/alpaca/websocket/auth.rs b/src/lib/types/alpaca/websocket/auth.rs similarity index 100% rename from src/types/alpaca/websocket/auth.rs rename to src/lib/types/alpaca/websocket/auth.rs diff --git a/src/types/alpaca/websocket/data/incoming/bar.rs b/src/lib/types/alpaca/websocket/data/incoming/bar.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/bar.rs rename to src/lib/types/alpaca/websocket/data/incoming/bar.rs diff --git a/src/types/alpaca/websocket/data/incoming/error.rs b/src/lib/types/alpaca/websocket/data/incoming/error.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/error.rs rename to src/lib/types/alpaca/websocket/data/incoming/error.rs diff --git a/src/types/alpaca/websocket/data/incoming/mod.rs b/src/lib/types/alpaca/websocket/data/incoming/mod.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/mod.rs rename to src/lib/types/alpaca/websocket/data/incoming/mod.rs diff --git a/src/types/alpaca/websocket/data/incoming/news.rs b/src/lib/types/alpaca/websocket/data/incoming/news.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/news.rs rename to src/lib/types/alpaca/websocket/data/incoming/news.rs diff --git a/src/types/alpaca/websocket/data/incoming/status.rs b/src/lib/types/alpaca/websocket/data/incoming/status.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/status.rs rename to src/lib/types/alpaca/websocket/data/incoming/status.rs diff --git a/src/types/alpaca/websocket/data/incoming/subscription.rs b/src/lib/types/alpaca/websocket/data/incoming/subscription.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/subscription.rs rename to src/lib/types/alpaca/websocket/data/incoming/subscription.rs diff --git a/src/types/alpaca/websocket/data/incoming/success.rs b/src/lib/types/alpaca/websocket/data/incoming/success.rs similarity index 100% rename from src/types/alpaca/websocket/data/incoming/success.rs rename to src/lib/types/alpaca/websocket/data/incoming/success.rs diff --git a/src/types/alpaca/websocket/data/mod.rs b/src/lib/types/alpaca/websocket/data/mod.rs similarity index 88% rename from src/types/alpaca/websocket/data/mod.rs rename to src/lib/types/alpaca/websocket/data/mod.rs index 7d9a289..ff64e5b 100644 --- a/src/types/alpaca/websocket/data/mod.rs +++ b/src/lib/types/alpaca/websocket/data/mod.rs @@ -1,10 +1,7 @@ pub mod incoming; pub mod outgoing; -use crate::{ - config::{ALPACA_API_KEY, ALPACA_API_SECRET}, - types::alpaca::websocket, -}; +use crate::types::alpaca::websocket; use core::panic; use futures_util::{ stream::{SplitSink, SplitStream}, @@ -17,6 +14,8 @@ use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; pub async fn authenticate( sink: &mut SplitSink>, Message>, stream: &mut SplitStream>>, + api_key: String, + api_secret: String, ) { match stream.next().await.unwrap().unwrap() { Message::Text(data) @@ -32,8 +31,8 @@ pub async fn authenticate( sink.send(Message::Text( to_string(&websocket::data::outgoing::Message::Auth( websocket::auth::Message { - key: (*ALPACA_API_KEY).clone(), - secret: (*ALPACA_API_SECRET).clone(), + key: api_key, + secret: api_secret, }, )) .unwrap(), diff --git a/src/types/alpaca/websocket/data/outgoing/mod.rs b/src/lib/types/alpaca/websocket/data/outgoing/mod.rs similarity index 100% rename from src/types/alpaca/websocket/data/outgoing/mod.rs rename to src/lib/types/alpaca/websocket/data/outgoing/mod.rs diff --git a/src/types/alpaca/websocket/data/outgoing/subscribe.rs b/src/lib/types/alpaca/websocket/data/outgoing/subscribe.rs similarity index 100% rename from src/types/alpaca/websocket/data/outgoing/subscribe.rs rename to src/lib/types/alpaca/websocket/data/outgoing/subscribe.rs diff --git a/src/lib/types/alpaca/websocket/mod.rs b/src/lib/types/alpaca/websocket/mod.rs new file mode 100644 index 0000000..e8094c7 --- /dev/null +++ b/src/lib/types/alpaca/websocket/mod.rs @@ -0,0 +1,8 @@ +pub mod auth; +pub mod data; +pub mod trading; + +pub const ALPACA_US_EQUITY_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2"; +pub const ALPACA_CRYPTO_DATA_WEBSOCKET_URL: &str = + "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; +pub const ALPACA_NEWS_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news"; diff --git a/src/types/alpaca/websocket/trading/incoming/auth.rs b/src/lib/types/alpaca/websocket/trading/incoming/auth.rs similarity index 100% rename from src/types/alpaca/websocket/trading/incoming/auth.rs rename to src/lib/types/alpaca/websocket/trading/incoming/auth.rs diff --git a/src/types/alpaca/websocket/trading/incoming/mod.rs b/src/lib/types/alpaca/websocket/trading/incoming/mod.rs similarity index 100% rename from src/types/alpaca/websocket/trading/incoming/mod.rs rename to src/lib/types/alpaca/websocket/trading/incoming/mod.rs diff --git a/src/types/alpaca/websocket/trading/incoming/order.rs b/src/lib/types/alpaca/websocket/trading/incoming/order.rs similarity index 100% rename from src/types/alpaca/websocket/trading/incoming/order.rs rename to src/lib/types/alpaca/websocket/trading/incoming/order.rs diff --git a/src/types/alpaca/websocket/trading/incoming/subscription.rs b/src/lib/types/alpaca/websocket/trading/incoming/subscription.rs similarity index 100% rename from src/types/alpaca/websocket/trading/incoming/subscription.rs rename to src/lib/types/alpaca/websocket/trading/incoming/subscription.rs diff --git a/src/types/alpaca/websocket/trading/mod.rs b/src/lib/types/alpaca/websocket/trading/mod.rs similarity index 92% rename from src/types/alpaca/websocket/trading/mod.rs rename to src/lib/types/alpaca/websocket/trading/mod.rs index 6de6dd6..3ae4227 100644 --- a/src/types/alpaca/websocket/trading/mod.rs +++ b/src/lib/types/alpaca/websocket/trading/mod.rs @@ -1,10 +1,7 @@ pub mod incoming; pub mod outgoing; -use crate::{ - config::{ALPACA_API_KEY, ALPACA_API_SECRET}, - types::alpaca::websocket, -}; +use crate::types::alpaca::websocket; use core::panic; use futures_util::{ stream::{SplitSink, SplitStream}, @@ -17,12 +14,14 @@ use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; pub async fn authenticate( sink: &mut SplitSink>, Message>, stream: &mut SplitStream>>, + api_key: String, + api_secret: String, ) { sink.send(Message::Text( to_string(&websocket::trading::outgoing::Message::Auth( websocket::auth::Message { - key: (*ALPACA_API_KEY).clone(), - secret: (*ALPACA_API_SECRET).clone(), + key: api_key, + secret: api_secret, }, )) .unwrap(), diff --git a/src/types/alpaca/websocket/trading/outgoing/mod.rs b/src/lib/types/alpaca/websocket/trading/outgoing/mod.rs similarity index 100% rename from src/types/alpaca/websocket/trading/outgoing/mod.rs rename to src/lib/types/alpaca/websocket/trading/outgoing/mod.rs diff --git a/src/types/alpaca/websocket/trading/outgoing/subscribe.rs b/src/lib/types/alpaca/websocket/trading/outgoing/subscribe.rs similarity index 100% rename from src/types/alpaca/websocket/trading/outgoing/subscribe.rs rename to src/lib/types/alpaca/websocket/trading/outgoing/subscribe.rs diff --git a/src/types/asset.rs b/src/lib/types/asset.rs similarity index 100% rename from src/types/asset.rs rename to src/lib/types/asset.rs diff --git a/src/types/backfill.rs b/src/lib/types/backfill.rs similarity index 100% rename from src/types/backfill.rs rename to src/lib/types/backfill.rs diff --git a/src/types/bar.rs b/src/lib/types/bar.rs similarity index 100% rename from src/types/bar.rs rename to src/lib/types/bar.rs diff --git a/src/types/calendar.rs b/src/lib/types/calendar.rs similarity index 100% rename from src/types/calendar.rs rename to src/lib/types/calendar.rs diff --git a/src/types/mod.rs b/src/lib/types/mod.rs similarity index 100% rename from src/types/mod.rs rename to src/lib/types/mod.rs diff --git a/src/types/news.rs b/src/lib/types/news.rs similarity index 100% rename from src/types/news.rs rename to src/lib/types/news.rs diff --git a/src/types/order.rs b/src/lib/types/order.rs similarity index 100% rename from src/types/order.rs rename to src/lib/types/order.rs diff --git a/src/utils/backoff.rs b/src/lib/utils/backoff.rs similarity index 100% rename from src/utils/backoff.rs rename to src/lib/utils/backoff.rs diff --git a/src/utils/de.rs b/src/lib/utils/de.rs similarity index 100% rename from src/utils/de.rs rename to src/lib/utils/de.rs diff --git a/src/utils/macros.rs b/src/lib/utils/macros.rs similarity index 100% rename from src/utils/macros.rs rename to src/lib/utils/macros.rs diff --git a/src/utils/mod.rs b/src/lib/utils/mod.rs similarity index 100% rename from src/utils/mod.rs rename to src/lib/utils/mod.rs diff --git a/src/utils/ser.rs b/src/lib/utils/ser.rs similarity index 100% rename from src/utils/ser.rs rename to src/lib/utils/ser.rs diff --git a/src/utils/time.rs b/src/lib/utils/time.rs similarity index 100% rename from src/utils/time.rs rename to src/lib/utils/time.rs diff --git a/src/main.rs b/src/main.rs index 41f6f58..b34c273 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,16 +3,14 @@ #![feature(hash_extract_if)] mod config; -mod database; mod init; mod routes; mod threads; -mod types; -mod utils; use config::Config; use dotenv::dotenv; use log4rs::config::Deserializers; +use qrust::{create_send_await, database}; use tokio::{join, spawn, sync::mpsc, try_join}; #[tokio::main] diff --git a/src/routes/assets.rs b/src/routes/assets.rs index 3e99ede..f24170b 100644 --- a/src/routes/assets.rs +++ b/src/routes/assets.rs @@ -1,10 +1,10 @@ use crate::{ - config::Config, + config::{Config, ALPACA_API_BASE}, create_send_await, database, threads, - types::{alpaca, Asset}, }; use axum::{extract::Path, Extension, Json}; use http::StatusCode; +use qrust::types::{alpaca, Asset}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -74,6 +74,7 @@ pub async fn add( &config.alpaca_rate_limiter, &request.symbols, None, + &ALPACA_API_BASE, ) .await .map_err(|e| { @@ -147,6 +148,7 @@ pub async fn add_symbol( &config.alpaca_rate_limiter, &symbol, None, + &ALPACA_API_BASE, ) .await .map_err(|e| { diff --git a/src/threads/clock.rs b/src/threads/clock.rs index 165131c..819d964 100644 --- a/src/threads/clock.rs +++ b/src/threads/clock.rs @@ -1,10 +1,12 @@ use crate::{ - config::Config, + config::{Config, ALPACA_API_BASE}, database, +}; +use log::info; +use qrust::{ types::{alpaca, Calendar}, utils::{backoff, duration_until}, }; -use log::info; use std::sync::Arc; use time::OffsetDateTime; use tokio::{join, sync::mpsc, time::sleep}; @@ -42,6 +44,7 @@ pub async fn run(config: Arc, sender: mpsc::Sender) { &config.alpaca_client, &config.alpaca_rate_limiter, Some(backoff::infinite()), + &ALPACA_API_BASE, ) .await .unwrap() @@ -53,6 +56,7 @@ pub async fn run(config: Arc, sender: mpsc::Sender) { &config.alpaca_rate_limiter, &alpaca::api::outgoing::calendar::Calendar::default(), Some(backoff::infinite()), + &ALPACA_API_BASE, ) .await .unwrap() diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs index aba5845..1c6f900 100644 --- a/src/threads/data/backfill/bars.rs +++ b/src/threads/data/backfill/bars.rs @@ -2,6 +2,11 @@ use super::Job; use crate::{ config::{Config, ALPACA_SOURCE}, database, +}; +use async_trait::async_trait; +use itertools::{Either, Itertools}; +use log::{error, info}; +use qrust::{ types::{ alpaca::{ self, @@ -11,9 +16,6 @@ use crate::{ }, utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, }; -use async_trait::async_trait; -use itertools::{Either, Itertools}; -use log::{error, info}; use std::{collections::HashMap, sync::Arc}; use time::OffsetDateTime; use tokio::time::sleep; @@ -41,6 +43,7 @@ pub fn us_equity_query_constructor( end: Some(fetch_to), page_token: next_page_token, sort: Some(Sort::Asc), + feed: Some(*ALPACA_SOURCE), ..Default::default() }) } diff --git a/src/threads/data/backfill/mod.rs b/src/threads/data/backfill/mod.rs index a483f40..ff32276 100644 --- a/src/threads/data/backfill/mod.rs +++ b/src/threads/data/backfill/mod.rs @@ -2,14 +2,17 @@ mod bars; mod news; use super::ThreadType; -use crate::{ - config::{Config, ALPACA_CRYPTO_DATA_API_URL, ALPACA_STOCK_DATA_API_URL}, - types::{Backfill, Class}, - utils::{last_minute, ONE_SECOND}, -}; +use crate::config::Config; use async_trait::async_trait; use itertools::Itertools; use log::{info, warn}; +use qrust::{ + types::{ + alpaca::api::{ALPACA_CRYPTO_DATA_API_URL, ALPACA_US_EQUITY_DATA_API_URL}, + Backfill, Class, + }, + utils::{last_minute, ONE_SECOND}, +}; use std::{collections::HashMap, sync::Arc}; use time::OffsetDateTime; use tokio::{ @@ -224,7 +227,7 @@ pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box Box::new(bars::Handler { config, - data_url: ALPACA_STOCK_DATA_API_URL, + data_url: ALPACA_US_EQUITY_DATA_API_URL, api_query_constructor: bars::us_equity_query_constructor, }), ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs index 34a7745..6aa1303 100644 --- a/src/threads/data/backfill/news.rs +++ b/src/threads/data/backfill/news.rs @@ -2,6 +2,12 @@ use super::Job; use crate::{ config::{Config, ALPACA_SOURCE, BERT_MAX_INPUTS}, database, +}; +use async_trait::async_trait; +use futures_util::future::join_all; +use itertools::{Either, Itertools}; +use log::{error, info}; +use qrust::{ types::{ alpaca::{ self, @@ -12,10 +18,6 @@ use crate::{ }, utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, }; -use async_trait::async_trait; -use futures_util::future::join_all; -use itertools::{Either, Itertools}; -use log::{error, info}; use std::{collections::HashMap, sync::Arc}; use tokio::{task::block_in_place, time::sleep}; diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 82b25d8..15fc7f7 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -3,16 +3,22 @@ mod websocket; use super::clock; use crate::{ - config::{ - Config, ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL, ALPACA_SOURCE, - ALPACA_STOCK_DATA_WEBSOCKET_URL, - }, + config::{Config, ALPACA_API_BASE, ALPACA_API_KEY, ALPACA_API_SECRET, ALPACA_SOURCE}, create_send_await, database, - types::{alpaca, Asset, Class}, }; use futures_util::StreamExt; use itertools::{Either, Itertools}; use log::error; +use qrust::types::{ + alpaca::{ + self, + websocket::{ + ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL, + ALPACA_US_EQUITY_DATA_WEBSOCKET_URL, + }, + }, + Asset, Class, +}; use std::{collections::HashMap, sync::Arc}; use tokio::{ join, select, spawn, @@ -103,7 +109,7 @@ async fn init_thread( ) { let websocket_url = match thread_type { ThreadType::Bars(Class::UsEquity) => { - format!("{}/{}", ALPACA_STOCK_DATA_WEBSOCKET_URL, *ALPACA_SOURCE) + format!("{}/{}", ALPACA_US_EQUITY_DATA_WEBSOCKET_URL, *ALPACA_SOURCE) } ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_WEBSOCKET_URL.into(), ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(), @@ -111,7 +117,13 @@ async fn init_thread( let (websocket, _) = connect_async(websocket_url).await.unwrap(); let (mut websocket_sink, mut websocket_stream) = websocket.split(); - alpaca::websocket::data::authenticate(&mut websocket_sink, &mut websocket_stream).await; + alpaca::websocket::data::authenticate( + &mut websocket_sink, + &mut websocket_stream, + (*ALPACA_API_KEY).to_string(), + (*ALPACA_API_SECRET).to_string(), + ) + .await; let (backfill_sender, backfill_receiver) = mpsc::channel(100); spawn(backfill::run( @@ -207,6 +219,7 @@ async fn handle_message( &config.alpaca_rate_limiter, &symbols, None, + &ALPACA_API_BASE, ) .await .unwrap() @@ -221,6 +234,7 @@ async fn handle_message( &config.alpaca_rate_limiter, &symbols, None, + &ALPACA_API_BASE, ) .await .unwrap() diff --git a/src/threads/data/websocket/bars.rs b/src/threads/data/websocket/bars.rs index 4413467..0a616be 100644 --- a/src/threads/data/websocket/bars.rs +++ b/src/threads/data/websocket/bars.rs @@ -1,11 +1,8 @@ use super::Pending; -use crate::{ - config::Config, - database, - types::{alpaca::websocket, Bar}, -}; +use crate::{config::Config, database}; use async_trait::async_trait; use log::{debug, error, info}; +use qrust::types::{alpaca::websocket, Bar}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; diff --git a/src/threads/data/websocket/mod.rs b/src/threads/data/websocket/mod.rs index 5181a0c..47e1cff 100644 --- a/src/threads/data/websocket/mod.rs +++ b/src/threads/data/websocket/mod.rs @@ -2,10 +2,7 @@ mod bars; mod news; use super::ThreadType; -use crate::{ - config::Config, - types::{alpaca::websocket, Class}, -}; +use crate::config::Config; use async_trait::async_trait; use futures_util::{ future::join_all, @@ -13,6 +10,7 @@ use futures_util::{ SinkExt, StreamExt, }; use log::error; +use qrust::types::{alpaca::websocket, Class}; use serde_json::{from_str, to_string}; use std::{collections::HashMap, sync::Arc}; use tokio::{ diff --git a/src/threads/data/websocket/news.rs b/src/threads/data/websocket/news.rs index 7512614..0975378 100644 --- a/src/threads/data/websocket/news.rs +++ b/src/threads/data/websocket/news.rs @@ -1,11 +1,8 @@ use super::Pending; -use crate::{ - config::Config, - database, - types::{alpaca::websocket, news::Prediction, News}, -}; +use crate::{config::Config, database}; use async_trait::async_trait; use log::{debug, error, info}; +use qrust::types::{alpaca::websocket, news::Prediction, News}; use std::{collections::HashMap, sync::Arc}; use tokio::{sync::RwLock, task::block_in_place}; diff --git a/src/threads/trading/mod.rs b/src/threads/trading/mod.rs index ad88d58..37279ad 100644 --- a/src/threads/trading/mod.rs +++ b/src/threads/trading/mod.rs @@ -1,19 +1,26 @@ mod websocket; -use crate::{ - config::{Config, ALPACA_WEBSOCKET_URL}, - types::alpaca, -}; +use crate::config::{Config, ALPACA_API_BASE, ALPACA_API_KEY, ALPACA_API_SECRET}; use futures_util::StreamExt; +use qrust::types::alpaca; use std::sync::Arc; use tokio::spawn; use tokio_tungstenite::connect_async; pub async fn run(config: Arc) { - let (websocket, _) = connect_async(&*ALPACA_WEBSOCKET_URL).await.unwrap(); + let (websocket, _) = + connect_async(&format!("wss://{}.alpaca.markets/stream", *ALPACA_API_BASE)) + .await + .unwrap(); let (mut websocket_sink, mut websocket_stream) = websocket.split(); - alpaca::websocket::trading::authenticate(&mut websocket_sink, &mut websocket_stream).await; + alpaca::websocket::trading::authenticate( + &mut websocket_sink, + &mut websocket_stream, + (*ALPACA_API_KEY).to_string(), + (*ALPACA_API_SECRET).to_string(), + ) + .await; alpaca::websocket::trading::subscribe(&mut websocket_sink, &mut websocket_stream).await; spawn(websocket::run(config, websocket_stream)); diff --git a/src/threads/trading/websocket.rs b/src/threads/trading/websocket.rs index 0f2de3f..aea886c 100644 --- a/src/threads/trading/websocket.rs +++ b/src/threads/trading/websocket.rs @@ -1,10 +1,7 @@ -use crate::{ - config::Config, - database, - types::{alpaca::websocket, Order}, -}; +use crate::{config::Config, database}; use futures_util::{stream::SplitStream, StreamExt}; use log::{debug, error}; +use qrust::types::{alpaca::websocket, Order}; use serde_json::from_str; use std::sync::Arc; use tokio::{net::TcpStream, spawn}; diff --git a/src/types/alpaca/api/mod.rs b/src/types/alpaca/api/mod.rs deleted file mode 100644 index 9aac270..0000000 --- a/src/types/alpaca/api/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod incoming; -pub mod outgoing; diff --git a/src/types/alpaca/websocket/mod.rs b/src/types/alpaca/websocket/mod.rs deleted file mode 100644 index e0a418d..0000000 --- a/src/types/alpaca/websocket/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod auth; -pub mod data; -pub mod trading;