diff --git a/src/init.rs b/src/init.rs index 9219fd0..dca3b2b 100644 --- a/src/init.rs +++ b/src/init.rs @@ -1,7 +1,7 @@ use crate::{ config::{Config, ALPACA_MODE}, database, - types::alpaca::{self, api, shared::Sort}, + types::alpaca::{self, shared::Sort}, }; use log::{info, warn}; use std::{collections::HashMap, sync::Arc}; @@ -45,11 +45,11 @@ pub async fn rehydrate_orders(config: &Arc) { let mut orders = vec![]; let mut after = OffsetDateTime::UNIX_EPOCH; - while let Some(message) = api::incoming::order::get( + while let Some(message) = alpaca::api::incoming::order::get( &config.alpaca_client, &config.alpaca_rate_limiter, - &api::outgoing::order::Order { - status: Some(api::outgoing::order::Status::All), + &alpaca::api::outgoing::order::Order { + status: Some(alpaca::api::outgoing::order::Status::All), limit: Some(500), after: Some(after), until: None, @@ -70,7 +70,7 @@ pub async fn rehydrate_orders(config: &Arc) { let orders = orders .into_iter() - .flat_map(&api::incoming::order::Order::normalize) + .flat_map(&alpaca::api::incoming::order::Order::normalize) .collect::>(); database::orders::upsert_batch(&config.clickhouse_client, &orders) diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs index c3caf27..9fb48eb 100644 --- a/src/threads/data/backfill.rs +++ b/src/threads/data/backfill.rs @@ -7,7 +7,7 @@ use crate::{ database, types::{ alpaca::{ - api, + self, shared::{Sort, Source}, }, news::Prediction, @@ -167,7 +167,7 @@ struct BarHandler { fetch_from: OffsetDateTime, fetch_to: OffsetDateTime, next_page_token: Option, - ) -> api::outgoing::bar::Bar, + ) -> alpaca::api::outgoing::bar::Bar, } fn us_equity_query_constructor( @@ -175,14 +175,14 @@ fn us_equity_query_constructor( fetch_from: OffsetDateTime, fetch_to: OffsetDateTime, next_page_token: Option, -) -> api::outgoing::bar::Bar { - api::outgoing::bar::Bar::UsEquity { +) -> alpaca::api::outgoing::bar::Bar { + alpaca::api::outgoing::bar::Bar::UsEquity { symbols: vec![symbol], timeframe: ONE_MINUTE, start: Some(fetch_from), end: Some(fetch_to), limit: Some(10000), - adjustment: None, + adjustment: Some(alpaca::api::outgoing::bar::Adjustment::All), asof: None, feed: Some(*ALPACA_SOURCE), currency: None, @@ -196,8 +196,8 @@ fn crypto_query_constructor( fetch_from: OffsetDateTime, fetch_to: OffsetDateTime, next_page_token: Option, -) -> api::outgoing::bar::Bar { - api::outgoing::bar::Bar::Crypto { +) -> alpaca::api::outgoing::bar::Bar { + alpaca::api::outgoing::bar::Bar::Crypto { symbols: vec![symbol], timeframe: ONE_MINUTE, start: Some(fetch_from), @@ -241,7 +241,7 @@ impl Handler for BarHandler { let mut next_page_token = None; loop { - let Ok(message) = api::incoming::bar::get_historical( + let Ok(message) = alpaca::api::incoming::bar::get_historical( &self.config.alpaca_client, &self.config.alpaca_rate_limiter, self.data_url, @@ -328,10 +328,10 @@ impl Handler for NewsHandler { let mut next_page_token = None; loop { - let Ok(message) = api::incoming::news::get_historical( + let Ok(message) = alpaca::api::incoming::news::get_historical( &self.config.alpaca_client, &self.config.alpaca_rate_limiter, - &api::outgoing::news::News { + &alpaca::api::outgoing::news::News { symbols: vec![symbol.clone()], start: Some(fetch_from), end: Some(fetch_to), diff --git a/src/types/alpaca/api/incoming/account.rs b/src/types/alpaca/api/incoming/account.rs index 580ba47..1463073 100644 --- a/src/types/alpaca/api/incoming/account.rs +++ b/src/types/alpaca/api/incoming/account.rs @@ -95,7 +95,9 @@ pub async fn get( .await? .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } _ => e.into(), })? .json::() diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index fb7367c..8df93f6 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -63,9 +63,11 @@ pub async fn get_by_symbol( .await? .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN | reqwest::StatusCode::NOT_FOUND) => { - backoff::Error::Permanent(e) - } + Some( + reqwest::StatusCode::BAD_REQUEST + | reqwest::StatusCode::FORBIDDEN + | reqwest::StatusCode::NOT_FOUND, + ) => backoff::Error::Permanent(e), _ => e.into(), })? .json::() diff --git a/src/types/alpaca/api/incoming/bar.rs b/src/types/alpaca/api/incoming/bar.rs index 4ad1182..8bf6f6a 100644 --- a/src/types/alpaca/api/incoming/bar.rs +++ b/src/types/alpaca/api/incoming/bar.rs @@ -68,7 +68,9 @@ pub async fn get_historical( .await? .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } _ => e.into(), })? .json::() diff --git a/src/types/alpaca/api/incoming/clock.rs b/src/types/alpaca/api/incoming/clock.rs index 23d1841..c23eabb 100644 --- a/src/types/alpaca/api/incoming/clock.rs +++ b/src/types/alpaca/api/incoming/clock.rs @@ -33,7 +33,9 @@ pub async fn get( .await? .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } _ => e.into(), })? .json::() diff --git a/src/types/alpaca/api/incoming/news.rs b/src/types/alpaca/api/incoming/news.rs index b682718..25e3e9b 100644 --- a/src/types/alpaca/api/incoming/news.rs +++ b/src/types/alpaca/api/incoming/news.rs @@ -88,7 +88,13 @@ pub async fn get_historical( .query(query) .send() .await? - .error_for_status()? + .error_for_status() + .map_err(|e| match e.status() { + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } + _ => e.into(), + })? .json::() .await .map_err(backoff::Error::Permanent) diff --git a/src/types/alpaca/api/incoming/order.rs b/src/types/alpaca/api/incoming/order.rs index 1967104..dc68eb8 100644 --- a/src/types/alpaca/api/incoming/order.rs +++ b/src/types/alpaca/api/incoming/order.rs @@ -27,7 +27,9 @@ pub async fn get( .await? .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } _ => e.into(), })? .json::>() diff --git a/src/types/alpaca/api/incoming/position.rs b/src/types/alpaca/api/incoming/position.rs index 5ba8c73..3371fdc 100644 --- a/src/types/alpaca/api/incoming/position.rs +++ b/src/types/alpaca/api/incoming/position.rs @@ -81,7 +81,9 @@ pub async fn get( .await? .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } _ => e.into(), })? .json::>() @@ -121,7 +123,9 @@ pub async fn get_by_symbol( response .error_for_status() .map_err(|e| match e.status() { - Some(reqwest::StatusCode::FORBIDDEN) => backoff::Error::Permanent(e), + Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { + backoff::Error::Permanent(e) + } _ => e.into(), })? .json::() diff --git a/src/types/alpaca/api/outgoing/bar.rs b/src/types/alpaca/api/outgoing/bar.rs index 51c2086..b8c81da 100644 --- a/src/types/alpaca/api/outgoing/bar.rs +++ b/src/types/alpaca/api/outgoing/bar.rs @@ -7,6 +7,7 @@ use std::time::Duration; use time::OffsetDateTime; #[derive(Serialize)] +#[serde(rename_all = "snake_case")] #[allow(dead_code)] pub enum Adjustment { Raw,