Add shared lib

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-03-10 18:28:40 +00:00
parent a15fd2c3c9
commit 8c7ee3d12d
82 changed files with 175 additions and 135 deletions

View File

@@ -3,6 +3,14 @@ name = "qrust"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[lib]
name = "qrust"
path = "src/lib/mod.rs"
[[bin]]
name = "qrust"
path = "src/main.rs"
[profile.release] [profile.release]
panic = 'abort' panic = 'abort'
strip = true strip = true

View File

@@ -1,6 +1,6 @@
use crate::types::alpaca::shared::{Mode, Source};
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use qrust::types::alpaca::shared::{Mode, Source};
use reqwest::{ use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue}, header::{HeaderMap, HeaderName, HeaderValue},
Client, Client,
@@ -15,42 +15,23 @@ use rust_bert::{
use std::{env, num::NonZeroU32, path::PathBuf, sync::Arc}; use std::{env, num::NonZeroU32, path::PathBuf, sync::Arc};
use tokio::sync::{Mutex, Semaphore}; use tokio::sync::{Mutex, Semaphore};
pub const ALPACA_STOCK_DATA_API_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
pub const ALPACA_CRYPTO_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
pub const ALPACA_NEWS_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta1/news";
pub const ALPACA_STOCK_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
pub const ALPACA_CRYPTO_DATA_WEBSOCKET_URL: &str =
"wss://stream.data.alpaca.markets/v1beta3/crypto/us";
pub const ALPACA_NEWS_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news";
lazy_static! { lazy_static! {
pub static ref ALPACA_MODE: Mode = env::var("ALPACA_MODE") pub static ref ALPACA_MODE: Mode = env::var("ALPACA_MODE")
.expect("ALPACA_MODE must be set.") .expect("ALPACA_MODE must be set.")
.parse() .parse()
.expect("ALPACA_MODE must be 'live' or 'paper'"); .expect("ALPACA_MODE must be 'live' or 'paper'");
pub static ref ALPACA_API_BASE: String = match *ALPACA_MODE {
Mode::Live => String::from("api"),
Mode::Paper => String::from("paper-api"),
};
pub static ref ALPACA_SOURCE: Source = env::var("ALPACA_SOURCE") pub static ref ALPACA_SOURCE: Source = env::var("ALPACA_SOURCE")
.expect("ALPACA_SOURCE must be set.") .expect("ALPACA_SOURCE must be set.")
.parse() .parse()
.expect("ALPACA_SOURCE must be 'iex', 'sip', or 'otc'"); .expect("ALPACA_SOURCE must be 'iex', 'sip', or 'otc'");
pub static ref ALPACA_API_KEY: String = env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set."); pub static ref ALPACA_API_KEY: String =
pub static ref ALPACA_API_SECRET: String = env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set."); env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set.");
#[derive(Debug)] pub static ref ALPACA_API_SECRET: String =
pub static ref ALPACA_API_URL: String = format!( env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set.");
"https://{}.alpaca.markets/v2",
match *ALPACA_MODE {
Mode::Live => String::from("api"),
Mode::Paper => String::from("paper-api"),
}
);
#[derive(Debug)]
pub static ref ALPACA_WEBSOCKET_URL: String = format!(
"wss://{}.alpaca.markets/stream",
match *ALPACA_MODE {
Mode::Live => String::from("api"),
Mode::Paper => String::from("paper-api"),
}
);
pub static ref BERT_MAX_INPUTS: usize = env::var("BERT_MAX_INPUTS") pub static ref BERT_MAX_INPUTS: usize = env::var("BERT_MAX_INPUTS")
.expect("BERT_MAX_INPUTS must be set.") .expect("BERT_MAX_INPUTS must be set.")
.parse() .parse()

View File

@@ -1,9 +1,9 @@
use crate::{ use crate::{
config::{Config, ALPACA_MODE}, config::{Config, ALPACA_API_BASE},
database, database,
types::alpaca,
}; };
use log::{info, warn}; use log::{info, warn};
use qrust::types::alpaca;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::join; use tokio::join;
@@ -13,6 +13,7 @@ pub async fn check_account(config: &Arc<Config>) {
&config.alpaca_client, &config.alpaca_client,
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
None, None,
&ALPACA_API_BASE,
) )
.await .await
.unwrap(); .unwrap();
@@ -35,7 +36,7 @@ pub async fn check_account(config: &Arc<Config>) {
warn!( warn!(
"qrust active on {} account with {} {}, avoid transferring funds without shutting down.", "qrust active on {} account with {} {}, avoid transferring funds without shutting down.",
*ALPACA_MODE, account.currency, account.cash *ALPACA_API_BASE, account.currency, account.cash
); );
} }
@@ -54,6 +55,7 @@ pub async fn rehydrate_orders(config: &Arc<Config>) {
..Default::default() ..Default::default()
}, },
None, None,
&ALPACA_API_BASE,
) )
.await .await
.ok() .ok()
@@ -87,6 +89,7 @@ pub async fn rehydrate_positions(config: &Arc<Config>) {
&config.alpaca_client, &config.alpaca_client,
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
None, None,
&ALPACA_API_BASE,
) )
.await .await
.unwrap() .unwrap()

3
src/lib/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod database;
pub mod types;
pub mod utils;

View File

@@ -1,4 +1,3 @@
use crate::config::ALPACA_API_URL;
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
use log::warn; use log::warn;
@@ -84,13 +83,14 @@ pub async fn get(
client: &Client, client: &Client,
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Account, Error> { ) -> Result<Account, Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/account", *ALPACA_API_URL)) .get(&format!("https://{}.alpaca.markets/v2/account", api_base))
.send() .send()
.await? .await?
.error_for_status() .error_for_status()

View File

@@ -1,13 +1,10 @@
use super::position::Position; use super::position::Position;
use crate::{ use crate::types::{
config::ALPACA_API_URL,
types::{
self, self,
alpaca::{ alpaca::{
api::outgoing, api::outgoing,
shared::asset::{Class, Exchange, Status}, shared::asset::{Class, Exchange, Status},
}, },
},
}; };
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -57,13 +54,14 @@ pub async fn get(
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
query: &outgoing::asset::Asset, query: &outgoing::asset::Asset,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Asset>, Error> { ) -> Result<Vec<Asset>, Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/assets", *ALPACA_API_URL)) .get(&format!("https://{}.alpaca.markets/v2/assets", api_base))
.query(query) .query(query)
.send() .send()
.await? .await?
@@ -96,13 +94,17 @@ pub async fn get_by_symbol(
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
symbol: &str, symbol: &str,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Asset, Error> { ) -> Result<Asset, Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/assets/{}", *ALPACA_API_URL, symbol)) .get(&format!(
"https://{}.alpaca.markets/v2/assets/{}",
api_base, symbol
))
.send() .send()
.await? .await?
.error_for_status() .error_for_status()
@@ -134,10 +136,11 @@ pub async fn get_by_symbols(
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
symbols: &[String], symbols: &[String],
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Asset>, Error> { ) -> Result<Vec<Asset>, Error> {
if symbols.len() < 2 { if symbols.len() < 2 {
let symbol = symbols.first().unwrap(); let symbol = symbols.first().unwrap();
let asset = get_by_symbol(client, rate_limiter, symbol, backoff).await?; let asset = get_by_symbol(client, rate_limiter, symbol, backoff, api_base).await?;
return Ok(vec![asset]); return Ok(vec![asset]);
} }
@@ -150,14 +153,20 @@ pub async fn get_by_symbols(
..Default::default() ..Default::default()
}; };
let us_equity_assets = get(client, rate_limiter, &us_equity_query, backoff_clone); let us_equity_assets = get(
client,
rate_limiter,
&us_equity_query,
backoff_clone,
api_base,
);
let crypto_query = outgoing::asset::Asset { let crypto_query = outgoing::asset::Asset {
class: Some(Class::Crypto), class: Some(Class::Crypto),
..Default::default() ..Default::default()
}; };
let crypto_assets = get(client, rate_limiter, &crypto_query, backoff); let crypto_assets = get(client, rate_limiter, &crypto_query, backoff, api_base);
let (us_equity_assets, crypto_assets) = try_join!(us_equity_assets, crypto_assets)?; let (us_equity_assets, crypto_assets) = try_join!(us_equity_assets, crypto_assets)?;

View File

@@ -7,6 +7,7 @@ use serde::Deserialize;
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
use time::OffsetDateTime; use time::OffsetDateTime;
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct Bar { pub struct Bar {
#[serde(rename = "t")] #[serde(rename = "t")]

View File

@@ -1,5 +1,4 @@
use crate::{ use crate::{
config::ALPACA_API_URL,
types::{self, alpaca::api::outgoing}, types::{self, alpaca::api::outgoing},
utils::{de, time::EST_OFFSET}, utils::{de, time::EST_OFFSET},
}; };
@@ -36,13 +35,14 @@ pub async fn get(
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
query: &outgoing::calendar::Calendar, query: &outgoing::calendar::Calendar,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Calendar>, Error> { ) -> Result<Vec<Calendar>, Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/calendar", *ALPACA_API_URL)) .get(&format!("https://{}.alpaca.markets/v2/calendar", api_base))
.query(query) .query(query)
.send() .send()
.await? .await?

View File

@@ -1,4 +1,3 @@
use crate::config::ALPACA_API_URL;
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
use log::warn; use log::warn;
@@ -22,13 +21,14 @@ pub async fn get(
client: &Client, client: &Client,
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Clock, Error> { ) -> Result<Clock, Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/clock", *ALPACA_API_URL)) .get(&format!("https://{}.alpaca.markets/v2/clock", api_base))
.send() .send()
.await? .await?
.error_for_status() .error_for_status()

View File

@@ -1,8 +1,10 @@
use crate::{ use crate::{
config::ALPACA_NEWS_DATA_API_URL,
types::{ types::{
self, self,
alpaca::{api::outgoing, shared::news::normalize_html_content}, alpaca::{
api::{outgoing, ALPACA_NEWS_DATA_API_URL},
shared::news::normalize_html_content,
},
}, },
utils::de, utils::de,
}; };

View File

@@ -1,27 +1,24 @@
use crate::{ use crate::types::alpaca::{api::outgoing, shared};
config::ALPACA_API_URL,
types::alpaca::{api::outgoing, shared},
};
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
use log::warn; use log::warn;
use reqwest::{Client, Error}; use reqwest::{Client, Error};
use std::time::Duration;
pub use shared::order::Order; pub use shared::order::Order;
use std::time::Duration;
pub async fn get( pub async fn get(
client: &Client, client: &Client,
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
query: &outgoing::order::Order, query: &outgoing::order::Order,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Order>, Error> { ) -> Result<Vec<Order>, Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/orders", *ALPACA_API_URL)) .get(&format!("https://{}.alpaca.markets/v2/orders", api_base))
.query(query) .query(query)
.send() .send()
.await? .await?

View File

@@ -1,5 +1,4 @@
use crate::{ use crate::{
config::ALPACA_API_URL,
types::alpaca::shared::{ types::alpaca::shared::{
self, self,
asset::{Class, Exchange}, asset::{Class, Exchange},
@@ -66,17 +65,20 @@ pub struct Position {
pub asset_marginable: bool, pub asset_marginable: bool,
} }
pub const ALPACA_API_URL_TEMPLATE: &str = "";
pub async fn get( pub async fn get(
client: &Client, client: &Client,
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Position>, reqwest::Error> { ) -> Result<Vec<Position>, reqwest::Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
client client
.get(&format!("{}/positions", *ALPACA_API_URL)) .get(&format!("https://{}.alpaca.markets/v2/positions", api_base))
.send() .send()
.await? .await?
.error_for_status() .error_for_status()
@@ -106,13 +108,17 @@ pub async fn get_by_symbol(
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
symbol: &str, symbol: &str,
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Option<Position>, reqwest::Error> { ) -> Result<Option<Position>, reqwest::Error> {
retry_notify( retry_notify(
backoff.unwrap_or_default(), backoff.unwrap_or_default(),
|| async { || async {
rate_limiter.until_ready().await; rate_limiter.until_ready().await;
let response = client let response = client
.get(&format!("{}/positions/{}", *ALPACA_API_URL, symbol)) .get(&format!(
"https://{}.alpaca.markets/v2/positions/{}",
api_base, symbol
))
.send() .send()
.await?; .await?;
@@ -149,16 +155,17 @@ pub async fn get_by_symbols(
rate_limiter: &DefaultDirectRateLimiter, rate_limiter: &DefaultDirectRateLimiter,
symbols: &[String], symbols: &[String],
backoff: Option<ExponentialBackoff>, backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Position>, reqwest::Error> { ) -> Result<Vec<Position>, reqwest::Error> {
if symbols.len() < 2 { if symbols.len() < 2 {
let symbol = symbols.first().unwrap(); let symbol = symbols.first().unwrap();
let position = get_by_symbol(client, rate_limiter, symbol, backoff).await?; let position = get_by_symbol(client, rate_limiter, symbol, backoff, api_base).await?;
return Ok(position.into_iter().collect()); return Ok(position.into_iter().collect());
} }
let symbols = symbols.iter().collect::<HashSet<_>>(); let symbols = symbols.iter().collect::<HashSet<_>>();
let positions = get(client, rate_limiter, backoff).await?; let positions = get(client, rate_limiter, backoff, api_base).await?;
Ok(positions Ok(positions
.into_iter() .into_iter()

View File

@@ -0,0 +1,6 @@
pub mod incoming;
pub mod outgoing;
pub const ALPACA_US_EQUITY_DATA_API_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
pub const ALPACA_CRYPTO_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
pub const ALPACA_NEWS_DATA_API_URL: &str = "https://data.alpaca.markets/v1beta1/news";

View File

@@ -1,5 +1,4 @@
use crate::{ use crate::{
config::ALPACA_SOURCE,
types::alpaca::shared::{Sort, Source}, types::alpaca::shared::{Sort, Source},
utils::{ser, ONE_MINUTE}, utils::{ser, ONE_MINUTE},
}; };
@@ -58,7 +57,7 @@ impl Default for UsEquity {
limit: Some(MAX_LIMIT), limit: Some(MAX_LIMIT),
adjustment: Some(Adjustment::All), adjustment: Some(Adjustment::All),
asof: None, asof: None,
feed: Some(*ALPACA_SOURCE), feed: Some(Source::Iex),
currency: None, currency: None,
page_token: None, page_token: None,
sort: Some(Sort::Asc), sort: Some(Sort::Asc),

View File

@@ -1,3 +1,4 @@
pub mod api; pub mod api;
pub mod shared; pub mod shared;
pub mod websocket; pub mod websocket;

View File

@@ -1,10 +1,7 @@
pub mod incoming; pub mod incoming;
pub mod outgoing; pub mod outgoing;
use crate::{ use crate::types::alpaca::websocket;
config::{ALPACA_API_KEY, ALPACA_API_SECRET},
types::alpaca::websocket,
};
use core::panic; use core::panic;
use futures_util::{ use futures_util::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
@@ -17,6 +14,8 @@ use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
pub async fn authenticate( pub async fn authenticate(
sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
api_key: String,
api_secret: String,
) { ) {
match stream.next().await.unwrap().unwrap() { match stream.next().await.unwrap().unwrap() {
Message::Text(data) Message::Text(data)
@@ -32,8 +31,8 @@ pub async fn authenticate(
sink.send(Message::Text( sink.send(Message::Text(
to_string(&websocket::data::outgoing::Message::Auth( to_string(&websocket::data::outgoing::Message::Auth(
websocket::auth::Message { websocket::auth::Message {
key: (*ALPACA_API_KEY).clone(), key: api_key,
secret: (*ALPACA_API_SECRET).clone(), secret: api_secret,
}, },
)) ))
.unwrap(), .unwrap(),

View File

@@ -0,0 +1,8 @@
pub mod auth;
pub mod data;
pub mod trading;
pub const ALPACA_US_EQUITY_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
pub const ALPACA_CRYPTO_DATA_WEBSOCKET_URL: &str =
"wss://stream.data.alpaca.markets/v1beta3/crypto/us";
pub const ALPACA_NEWS_DATA_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta1/news";

View File

@@ -1,10 +1,7 @@
pub mod incoming; pub mod incoming;
pub mod outgoing; pub mod outgoing;
use crate::{ use crate::types::alpaca::websocket;
config::{ALPACA_API_KEY, ALPACA_API_SECRET},
types::alpaca::websocket,
};
use core::panic; use core::panic;
use futures_util::{ use futures_util::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
@@ -17,12 +14,14 @@ use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
pub async fn authenticate( pub async fn authenticate(
sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
api_key: String,
api_secret: String,
) { ) {
sink.send(Message::Text( sink.send(Message::Text(
to_string(&websocket::trading::outgoing::Message::Auth( to_string(&websocket::trading::outgoing::Message::Auth(
websocket::auth::Message { websocket::auth::Message {
key: (*ALPACA_API_KEY).clone(), key: api_key,
secret: (*ALPACA_API_SECRET).clone(), secret: api_secret,
}, },
)) ))
.unwrap(), .unwrap(),

View File

@@ -3,16 +3,14 @@
#![feature(hash_extract_if)] #![feature(hash_extract_if)]
mod config; mod config;
mod database;
mod init; mod init;
mod routes; mod routes;
mod threads; mod threads;
mod types;
mod utils;
use config::Config; use config::Config;
use dotenv::dotenv; use dotenv::dotenv;
use log4rs::config::Deserializers; use log4rs::config::Deserializers;
use qrust::{create_send_await, database};
use tokio::{join, spawn, sync::mpsc, try_join}; use tokio::{join, spawn, sync::mpsc, try_join};
#[tokio::main] #[tokio::main]

View File

@@ -1,10 +1,10 @@
use crate::{ use crate::{
config::Config, config::{Config, ALPACA_API_BASE},
create_send_await, database, threads, create_send_await, database, threads,
types::{alpaca, Asset},
}; };
use axum::{extract::Path, Extension, Json}; use axum::{extract::Path, Extension, Json};
use http::StatusCode; use http::StatusCode;
use qrust::types::{alpaca, Asset};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
@@ -74,6 +74,7 @@ pub async fn add(
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
&request.symbols, &request.symbols,
None, None,
&ALPACA_API_BASE,
) )
.await .await
.map_err(|e| { .map_err(|e| {
@@ -147,6 +148,7 @@ pub async fn add_symbol(
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
&symbol, &symbol,
None, None,
&ALPACA_API_BASE,
) )
.await .await
.map_err(|e| { .map_err(|e| {

View File

@@ -1,10 +1,12 @@
use crate::{ use crate::{
config::Config, config::{Config, ALPACA_API_BASE},
database, database,
};
use log::info;
use qrust::{
types::{alpaca, Calendar}, types::{alpaca, Calendar},
utils::{backoff, duration_until}, utils::{backoff, duration_until},
}; };
use log::info;
use std::sync::Arc; use std::sync::Arc;
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::{join, sync::mpsc, time::sleep}; use tokio::{join, sync::mpsc, time::sleep};
@@ -42,6 +44,7 @@ pub async fn run(config: Arc<Config>, sender: mpsc::Sender<Message>) {
&config.alpaca_client, &config.alpaca_client,
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
Some(backoff::infinite()), Some(backoff::infinite()),
&ALPACA_API_BASE,
) )
.await .await
.unwrap() .unwrap()
@@ -53,6 +56,7 @@ pub async fn run(config: Arc<Config>, sender: mpsc::Sender<Message>) {
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
&alpaca::api::outgoing::calendar::Calendar::default(), &alpaca::api::outgoing::calendar::Calendar::default(),
Some(backoff::infinite()), Some(backoff::infinite()),
&ALPACA_API_BASE,
) )
.await .await
.unwrap() .unwrap()

View File

@@ -2,6 +2,11 @@ use super::Job;
use crate::{ use crate::{
config::{Config, ALPACA_SOURCE}, config::{Config, ALPACA_SOURCE},
database, database,
};
use async_trait::async_trait;
use itertools::{Either, Itertools};
use log::{error, info};
use qrust::{
types::{ types::{
alpaca::{ alpaca::{
self, self,
@@ -11,9 +16,6 @@ use crate::{
}, },
utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE},
}; };
use async_trait::async_trait;
use itertools::{Either, Itertools};
use log::{error, info};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::time::sleep; use tokio::time::sleep;
@@ -41,6 +43,7 @@ pub fn us_equity_query_constructor(
end: Some(fetch_to), end: Some(fetch_to),
page_token: next_page_token, page_token: next_page_token,
sort: Some(Sort::Asc), sort: Some(Sort::Asc),
feed: Some(*ALPACA_SOURCE),
..Default::default() ..Default::default()
}) })
} }

View File

@@ -2,14 +2,17 @@ mod bars;
mod news; mod news;
use super::ThreadType; use super::ThreadType;
use crate::{ use crate::config::Config;
config::{Config, ALPACA_CRYPTO_DATA_API_URL, ALPACA_STOCK_DATA_API_URL},
types::{Backfill, Class},
utils::{last_minute, ONE_SECOND},
};
use async_trait::async_trait; use async_trait::async_trait;
use itertools::Itertools; use itertools::Itertools;
use log::{info, warn}; use log::{info, warn};
use qrust::{
types::{
alpaca::api::{ALPACA_CRYPTO_DATA_API_URL, ALPACA_US_EQUITY_DATA_API_URL},
Backfill, Class,
},
utils::{last_minute, ONE_SECOND},
};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::{ use tokio::{
@@ -224,7 +227,7 @@ pub fn create_handler(thread_type: ThreadType, config: Arc<Config>) -> Box<dyn H
match thread_type { match thread_type {
ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler { ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler {
config, config,
data_url: ALPACA_STOCK_DATA_API_URL, data_url: ALPACA_US_EQUITY_DATA_API_URL,
api_query_constructor: bars::us_equity_query_constructor, api_query_constructor: bars::us_equity_query_constructor,
}), }),
ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler {

View File

@@ -2,6 +2,12 @@ use super::Job;
use crate::{ use crate::{
config::{Config, ALPACA_SOURCE, BERT_MAX_INPUTS}, config::{Config, ALPACA_SOURCE, BERT_MAX_INPUTS},
database, database,
};
use async_trait::async_trait;
use futures_util::future::join_all;
use itertools::{Either, Itertools};
use log::{error, info};
use qrust::{
types::{ types::{
alpaca::{ alpaca::{
self, self,
@@ -12,10 +18,6 @@ use crate::{
}, },
utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE},
}; };
use async_trait::async_trait;
use futures_util::future::join_all;
use itertools::{Either, Itertools};
use log::{error, info};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::{task::block_in_place, time::sleep}; use tokio::{task::block_in_place, time::sleep};

View File

@@ -3,16 +3,22 @@ mod websocket;
use super::clock; use super::clock;
use crate::{ use crate::{
config::{ config::{Config, ALPACA_API_BASE, ALPACA_API_KEY, ALPACA_API_SECRET, ALPACA_SOURCE},
Config, ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL, ALPACA_SOURCE,
ALPACA_STOCK_DATA_WEBSOCKET_URL,
},
create_send_await, database, create_send_await, database,
types::{alpaca, Asset, Class},
}; };
use futures_util::StreamExt; use futures_util::StreamExt;
use itertools::{Either, Itertools}; use itertools::{Either, Itertools};
use log::error; use log::error;
use qrust::types::{
alpaca::{
self,
websocket::{
ALPACA_CRYPTO_DATA_WEBSOCKET_URL, ALPACA_NEWS_DATA_WEBSOCKET_URL,
ALPACA_US_EQUITY_DATA_WEBSOCKET_URL,
},
},
Asset, Class,
};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::{ use tokio::{
join, select, spawn, join, select, spawn,
@@ -103,7 +109,7 @@ async fn init_thread(
) { ) {
let websocket_url = match thread_type { let websocket_url = match thread_type {
ThreadType::Bars(Class::UsEquity) => { ThreadType::Bars(Class::UsEquity) => {
format!("{}/{}", ALPACA_STOCK_DATA_WEBSOCKET_URL, *ALPACA_SOURCE) format!("{}/{}", ALPACA_US_EQUITY_DATA_WEBSOCKET_URL, *ALPACA_SOURCE)
} }
ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_WEBSOCKET_URL.into(), ThreadType::Bars(Class::Crypto) => ALPACA_CRYPTO_DATA_WEBSOCKET_URL.into(),
ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(), ThreadType::News => ALPACA_NEWS_DATA_WEBSOCKET_URL.into(),
@@ -111,7 +117,13 @@ async fn init_thread(
let (websocket, _) = connect_async(websocket_url).await.unwrap(); let (websocket, _) = connect_async(websocket_url).await.unwrap();
let (mut websocket_sink, mut websocket_stream) = websocket.split(); let (mut websocket_sink, mut websocket_stream) = websocket.split();
alpaca::websocket::data::authenticate(&mut websocket_sink, &mut websocket_stream).await; alpaca::websocket::data::authenticate(
&mut websocket_sink,
&mut websocket_stream,
(*ALPACA_API_KEY).to_string(),
(*ALPACA_API_SECRET).to_string(),
)
.await;
let (backfill_sender, backfill_receiver) = mpsc::channel(100); let (backfill_sender, backfill_receiver) = mpsc::channel(100);
spawn(backfill::run( spawn(backfill::run(
@@ -207,6 +219,7 @@ async fn handle_message(
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
&symbols, &symbols,
None, None,
&ALPACA_API_BASE,
) )
.await .await
.unwrap() .unwrap()
@@ -221,6 +234,7 @@ async fn handle_message(
&config.alpaca_rate_limiter, &config.alpaca_rate_limiter,
&symbols, &symbols,
None, None,
&ALPACA_API_BASE,
) )
.await .await
.unwrap() .unwrap()

View File

@@ -1,11 +1,8 @@
use super::Pending; use super::Pending;
use crate::{ use crate::{config::Config, database};
config::Config,
database,
types::{alpaca::websocket, Bar},
};
use async_trait::async_trait; use async_trait::async_trait;
use log::{debug, error, info}; use log::{debug, error, info};
use qrust::types::{alpaca::websocket, Bar};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;

View File

@@ -2,10 +2,7 @@ mod bars;
mod news; mod news;
use super::ThreadType; use super::ThreadType;
use crate::{ use crate::config::Config;
config::Config,
types::{alpaca::websocket, Class},
};
use async_trait::async_trait; use async_trait::async_trait;
use futures_util::{ use futures_util::{
future::join_all, future::join_all,
@@ -13,6 +10,7 @@ use futures_util::{
SinkExt, StreamExt, SinkExt, StreamExt,
}; };
use log::error; use log::error;
use qrust::types::{alpaca::websocket, Class};
use serde_json::{from_str, to_string}; use serde_json::{from_str, to_string};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::{ use tokio::{

View File

@@ -1,11 +1,8 @@
use super::Pending; use super::Pending;
use crate::{ use crate::{config::Config, database};
config::Config,
database,
types::{alpaca::websocket, news::Prediction, News},
};
use async_trait::async_trait; use async_trait::async_trait;
use log::{debug, error, info}; use log::{debug, error, info};
use qrust::types::{alpaca::websocket, news::Prediction, News};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::{sync::RwLock, task::block_in_place}; use tokio::{sync::RwLock, task::block_in_place};

View File

@@ -1,19 +1,26 @@
mod websocket; mod websocket;
use crate::{ use crate::config::{Config, ALPACA_API_BASE, ALPACA_API_KEY, ALPACA_API_SECRET};
config::{Config, ALPACA_WEBSOCKET_URL},
types::alpaca,
};
use futures_util::StreamExt; use futures_util::StreamExt;
use qrust::types::alpaca;
use std::sync::Arc; use std::sync::Arc;
use tokio::spawn; use tokio::spawn;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
pub async fn run(config: Arc<Config>) { pub async fn run(config: Arc<Config>) {
let (websocket, _) = connect_async(&*ALPACA_WEBSOCKET_URL).await.unwrap(); let (websocket, _) =
connect_async(&format!("wss://{}.alpaca.markets/stream", *ALPACA_API_BASE))
.await
.unwrap();
let (mut websocket_sink, mut websocket_stream) = websocket.split(); let (mut websocket_sink, mut websocket_stream) = websocket.split();
alpaca::websocket::trading::authenticate(&mut websocket_sink, &mut websocket_stream).await; alpaca::websocket::trading::authenticate(
&mut websocket_sink,
&mut websocket_stream,
(*ALPACA_API_KEY).to_string(),
(*ALPACA_API_SECRET).to_string(),
)
.await;
alpaca::websocket::trading::subscribe(&mut websocket_sink, &mut websocket_stream).await; alpaca::websocket::trading::subscribe(&mut websocket_sink, &mut websocket_stream).await;
spawn(websocket::run(config, websocket_stream)); spawn(websocket::run(config, websocket_stream));

View File

@@ -1,10 +1,7 @@
use crate::{ use crate::{config::Config, database};
config::Config,
database,
types::{alpaca::websocket, Order},
};
use futures_util::{stream::SplitStream, StreamExt}; use futures_util::{stream::SplitStream, StreamExt};
use log::{debug, error}; use log::{debug, error};
use qrust::types::{alpaca::websocket, Order};
use serde_json::from_str; use serde_json::from_str;
use std::sync::Arc; use std::sync::Arc;
use tokio::{net::TcpStream, spawn}; use tokio::{net::TcpStream, spawn};

View File

@@ -1,2 +0,0 @@
pub mod incoming;
pub mod outgoing;

View File

@@ -1,3 +0,0 @@
pub mod auth;
pub mod data;
pub mod trading;