From 1707d74cf70f70474dda0b2cefd622ed174bf2c8 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Wed, 13 Mar 2024 19:45:06 +0000 Subject: [PATCH] Improve alpaca request error handling Signed-off-by: Nikolaos Karaolidis --- src/config.rs | 3 +-- src/lib/alpaca/account.rs | 13 +++++-------- src/lib/alpaca/assets.rs | 29 +++++++++-------------------- src/lib/alpaca/bars.rs | 13 +++++-------- src/lib/alpaca/calendar.rs | 13 +++++-------- src/lib/alpaca/clock.rs | 13 +++++-------- src/lib/alpaca/mod.rs | 24 ++++++++++++++++++++++++ src/lib/alpaca/news.rs | 13 +++++-------- src/lib/alpaca/orders.rs | 13 +++++-------- src/lib/alpaca/positions.rs | 25 +++++++++---------------- 10 files changed, 73 insertions(+), 86 deletions(-) diff --git a/src/config.rs b/src/config.rs index a6b5514..dcac036 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,7 +5,6 @@ use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Client, }; - use std::{env, num::NonZeroU32, sync::Arc}; use tokio::sync::Semaphore; @@ -67,7 +66,7 @@ impl Config { .unwrap(), alpaca_rate_limiter: RateLimiter::direct(Quota::per_minute(match *ALPACA_SOURCE { Source::Iex => unsafe { NonZeroU32::new_unchecked(200) }, - Source::Sip => unsafe { NonZeroU32::new_unchecked(10000) }, + Source::Sip => unsafe { NonZeroU32::new_unchecked(10_000) }, Source::Otc => unimplemented!("OTC rate limit not implemented."), })), clickhouse_client: clickhouse::Client::default() diff --git a/src/lib/alpaca/account.rs b/src/lib/alpaca/account.rs index c13a325..6d36936 100644 --- a/src/lib/alpaca/account.rs +++ b/src/lib/alpaca/account.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::incoming::account::Account; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -18,17 +19,13 @@ pub async fn get( client .get(&format!("https://{}.alpaca.markets/v2/account", api_base)) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/assets.rs b/src/lib/alpaca/assets.rs index e8b3e5f..8310ee3 100644 --- a/src/lib/alpaca/assets.rs +++ b/src/lib/alpaca/assets.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::{ incoming::asset::{Asset, Class}, outgoing, @@ -25,19 +26,13 @@ pub async fn get( .get(&format!("https://{}.alpaca.markets/v2/assets", api_base)) .query(query) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some( - reqwest::StatusCode::BAD_REQUEST - | reqwest::StatusCode::FORBIDDEN - | reqwest::StatusCode::NOT_FOUND, - ) => backoff::Error::Permanent(e), - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::>() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( @@ -67,19 +62,13 @@ pub async fn get_by_symbol( api_base, symbol )) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some( - reqwest::StatusCode::BAD_REQUEST - | reqwest::StatusCode::FORBIDDEN - | reqwest::StatusCode::NOT_FOUND, - ) => backoff::Error::Permanent(e), - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/bars.rs b/src/lib/alpaca/bars.rs index e3c896b..b5f72e9 100644 --- a/src/lib/alpaca/bars.rs +++ b/src/lib/alpaca/bars.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::{incoming::bar::Bar, outgoing}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -29,17 +30,13 @@ pub async fn get( .get(data_url) .query(query) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/calendar.rs b/src/lib/alpaca/calendar.rs index 8f4cff7..789b219 100644 --- a/src/lib/alpaca/calendar.rs +++ b/src/lib/alpaca/calendar.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::{incoming::calendar::Calendar, outgoing}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -20,17 +21,13 @@ pub async fn get( .get(&format!("https://{}.alpaca.markets/v2/calendar", api_base)) .query(query) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::>() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/clock.rs b/src/lib/alpaca/clock.rs index 4b79997..2423c43 100644 --- a/src/lib/alpaca/clock.rs +++ b/src/lib/alpaca/clock.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::incoming::clock::Clock; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -18,17 +19,13 @@ pub async fn get( client .get(&format!("https://{}.alpaca.markets/v2/clock", api_base)) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/mod.rs b/src/lib/alpaca/mod.rs index 90dd11b..65477a9 100644 --- a/src/lib/alpaca/mod.rs +++ b/src/lib/alpaca/mod.rs @@ -6,3 +6,27 @@ pub mod clock; pub mod news; pub mod orders; pub mod positions; + +use reqwest::StatusCode; + +pub fn status_error_to_backoff(err: reqwest::Error) -> backoff::Error { + match err.status() { + Some(StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND) | None => { + backoff::Error::Permanent(err) + } + _ => err.into(), + } +} + +pub fn error_to_backoff(err: reqwest::Error) -> backoff::Error { + if err.is_status() { + return status_error_to_backoff(err); + } + + if err.is_builder() || err.is_request() || err.is_redirect() || err.is_decode() || err.is_body() + { + return backoff::Error::Permanent(err); + } + + err.into() +} diff --git a/src/lib/alpaca/news.rs b/src/lib/alpaca/news.rs index 6464845..be1812b 100644 --- a/src/lib/alpaca/news.rs +++ b/src/lib/alpaca/news.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::{incoming::news::News, outgoing, ALPACA_NEWS_DATA_API_URL}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -28,17 +29,13 @@ pub async fn get( .get(ALPACA_NEWS_DATA_API_URL) .query(query) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/orders.rs b/src/lib/alpaca/orders.rs index 8c65738..04eccc3 100644 --- a/src/lib/alpaca/orders.rs +++ b/src/lib/alpaca/orders.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::{api::outgoing, shared::order}; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -22,17 +23,13 @@ pub async fn get( .get(&format!("https://{}.alpaca.markets/v2/orders", api_base)) .query(query) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::>() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( diff --git a/src/lib/alpaca/positions.rs b/src/lib/alpaca/positions.rs index 79b619d..6b1da18 100644 --- a/src/lib/alpaca/positions.rs +++ b/src/lib/alpaca/positions.rs @@ -1,3 +1,4 @@ +use super::{error_to_backoff, status_error_to_backoff}; use crate::types::alpaca::api::incoming::position::Position; use backoff::{future::retry_notify, ExponentialBackoff}; use governor::DefaultDirectRateLimiter; @@ -18,17 +19,13 @@ pub async fn get( client .get(&format!("https://{}.alpaca.markets/v2/positions", api_base)) .send() - .await? + .await + .map_err(error_to_backoff)? .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::>() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) }, |e, duration: Duration| { warn!( @@ -58,7 +55,8 @@ pub async fn get_by_symbol( api_base, symbol )) .send() - .await?; + .await + .map_err(error_to_backoff)?; if response.status() == reqwest::StatusCode::NOT_FOUND { return Ok(None); @@ -66,15 +64,10 @@ pub async fn get_by_symbol( response .error_for_status() - .map_err(|e| match e.status() { - Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => { - backoff::Error::Permanent(e) - } - _ => e.into(), - })? + .map_err(status_error_to_backoff)? .json::() .await - .map_err(backoff::Error::Permanent) + .map_err(error_to_backoff) .map(Some) }, |e, duration: Duration| {