Improve alpaca request error handling

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-03-13 19:45:06 +00:00
parent f3f9c6336b
commit 1707d74cf7
10 changed files with 73 additions and 86 deletions

View File

@@ -5,7 +5,6 @@ use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue}, header::{HeaderMap, HeaderName, HeaderValue},
Client, Client,
}; };
use std::{env, num::NonZeroU32, sync::Arc}; use std::{env, num::NonZeroU32, sync::Arc};
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
@@ -67,7 +66,7 @@ impl Config {
.unwrap(), .unwrap(),
alpaca_rate_limiter: RateLimiter::direct(Quota::per_minute(match *ALPACA_SOURCE { alpaca_rate_limiter: RateLimiter::direct(Quota::per_minute(match *ALPACA_SOURCE {
Source::Iex => unsafe { NonZeroU32::new_unchecked(200) }, Source::Iex => unsafe { NonZeroU32::new_unchecked(200) },
Source::Sip => unsafe { NonZeroU32::new_unchecked(10000) }, Source::Sip => unsafe { NonZeroU32::new_unchecked(10_000) },
Source::Otc => unimplemented!("OTC rate limit not implemented."), Source::Otc => unimplemented!("OTC rate limit not implemented."),
})), })),
clickhouse_client: clickhouse::Client::default() clickhouse_client: clickhouse::Client::default()

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::incoming::account::Account; use crate::types::alpaca::api::incoming::account::Account;
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -18,17 +19,13 @@ pub async fn get(
client client
.get(&format!("https://{}.alpaca.markets/v2/account", api_base)) .get(&format!("https://{}.alpaca.markets/v2/account", api_base))
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Account>() .json::<Account>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::{ use crate::types::alpaca::api::{
incoming::asset::{Asset, Class}, incoming::asset::{Asset, Class},
outgoing, outgoing,
@@ -25,19 +26,13 @@ pub async fn get(
.get(&format!("https://{}.alpaca.markets/v2/assets", api_base)) .get(&format!("https://{}.alpaca.markets/v2/assets", api_base))
.query(query) .query(query)
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(
reqwest::StatusCode::BAD_REQUEST
| reqwest::StatusCode::FORBIDDEN
| reqwest::StatusCode::NOT_FOUND,
) => backoff::Error::Permanent(e),
_ => e.into(),
})?
.json::<Vec<Asset>>() .json::<Vec<Asset>>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(
@@ -67,19 +62,13 @@ pub async fn get_by_symbol(
api_base, symbol api_base, symbol
)) ))
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(
reqwest::StatusCode::BAD_REQUEST
| reqwest::StatusCode::FORBIDDEN
| reqwest::StatusCode::NOT_FOUND,
) => backoff::Error::Permanent(e),
_ => e.into(),
})?
.json::<Asset>() .json::<Asset>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::{incoming::bar::Bar, outgoing}; use crate::types::alpaca::api::{incoming::bar::Bar, outgoing};
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -29,17 +30,13 @@ pub async fn get(
.get(data_url) .get(data_url)
.query(query) .query(query)
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Message>() .json::<Message>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::{incoming::calendar::Calendar, outgoing}; use crate::types::alpaca::api::{incoming::calendar::Calendar, outgoing};
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -20,17 +21,13 @@ pub async fn get(
.get(&format!("https://{}.alpaca.markets/v2/calendar", api_base)) .get(&format!("https://{}.alpaca.markets/v2/calendar", api_base))
.query(query) .query(query)
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Vec<Calendar>>() .json::<Vec<Calendar>>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::incoming::clock::Clock; use crate::types::alpaca::api::incoming::clock::Clock;
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -18,17 +19,13 @@ pub async fn get(
client client
.get(&format!("https://{}.alpaca.markets/v2/clock", api_base)) .get(&format!("https://{}.alpaca.markets/v2/clock", api_base))
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Clock>() .json::<Clock>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -6,3 +6,27 @@ pub mod clock;
pub mod news; pub mod news;
pub mod orders; pub mod orders;
pub mod positions; pub mod positions;
use reqwest::StatusCode;
pub fn status_error_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
match err.status() {
Some(StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND) | None => {
backoff::Error::Permanent(err)
}
_ => err.into(),
}
}
pub fn error_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
if err.is_status() {
return status_error_to_backoff(err);
}
if err.is_builder() || err.is_request() || err.is_redirect() || err.is_decode() || err.is_body()
{
return backoff::Error::Permanent(err);
}
err.into()
}

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::{incoming::news::News, outgoing, ALPACA_NEWS_DATA_API_URL}; use crate::types::alpaca::api::{incoming::news::News, outgoing, ALPACA_NEWS_DATA_API_URL};
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -28,17 +29,13 @@ pub async fn get(
.get(ALPACA_NEWS_DATA_API_URL) .get(ALPACA_NEWS_DATA_API_URL)
.query(query) .query(query)
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Message>() .json::<Message>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::{api::outgoing, shared::order}; use crate::types::alpaca::{api::outgoing, shared::order};
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -22,17 +23,13 @@ pub async fn get(
.get(&format!("https://{}.alpaca.markets/v2/orders", api_base)) .get(&format!("https://{}.alpaca.markets/v2/orders", api_base))
.query(query) .query(query)
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Vec<Order>>() .json::<Vec<Order>>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(

View File

@@ -1,3 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use crate::types::alpaca::api::incoming::position::Position; use crate::types::alpaca::api::incoming::position::Position;
use backoff::{future::retry_notify, ExponentialBackoff}; use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter; use governor::DefaultDirectRateLimiter;
@@ -18,17 +19,13 @@ pub async fn get(
client client
.get(&format!("https://{}.alpaca.markets/v2/positions", api_base)) .get(&format!("https://{}.alpaca.markets/v2/positions", api_base))
.send() .send()
.await? .await
.map_err(error_to_backoff)?
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Vec<Position>>() .json::<Vec<Position>>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
}, },
|e, duration: Duration| { |e, duration: Duration| {
warn!( warn!(
@@ -58,7 +55,8 @@ pub async fn get_by_symbol(
api_base, symbol api_base, symbol
)) ))
.send() .send()
.await?; .await
.map_err(error_to_backoff)?;
if response.status() == reqwest::StatusCode::NOT_FOUND { if response.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None); return Ok(None);
@@ -66,15 +64,10 @@ pub async fn get_by_symbol(
response response
.error_for_status() .error_for_status()
.map_err(|e| match e.status() { .map_err(status_error_to_backoff)?
Some(reqwest::StatusCode::BAD_REQUEST | reqwest::StatusCode::FORBIDDEN) => {
backoff::Error::Permanent(e)
}
_ => e.into(),
})?
.json::<Position>() .json::<Position>()
.await .await
.map_err(backoff::Error::Permanent) .map_err(error_to_backoff)
.map(Some) .map(Some)
}, },
|e, duration: Duration| { |e, duration: Duration| {