Attempt to fix bugs related to empty vecs

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-03-14 22:38:20 +00:00
parent 8202255132
commit 10365745aa
25 changed files with 253 additions and 238 deletions

10
Cargo.lock generated
View File

@@ -1098,6 +1098,15 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nonempty"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "303e8749c804ccd6ca3b428de7fe0d86cb86bc7606bc15291f100fd487960bb8"
dependencies = [
"serde",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
@@ -1316,6 +1325,7 @@ dependencies = [
"lazy_static",
"log",
"log4rs",
"nonempty",
"regex",
"reqwest",
"serde",

View File

@@ -68,3 +68,6 @@ regex = "1.10.3"
async-trait = "0.1.77"
itertools = "0.12.1"
lazy_static = "1.4.0"
nonempty = { version = "0.10.0", features = [
"serialize",
] }

View File

@@ -44,21 +44,25 @@ pub async fn rehydrate_orders(config: &Arc<Config>) {
let mut orders = vec![];
let mut after = OffsetDateTime::UNIX_EPOCH;
while let Some(message) = alpaca::orders::get(
&config.alpaca_client,
&config.alpaca_rate_limiter,
&types::alpaca::api::outgoing::order::Order {
status: Some(types::alpaca::api::outgoing::order::Status::All),
after: Some(after),
..Default::default()
},
None,
&ALPACA_API_BASE,
)
.await
.ok()
.filter(|message| !message.is_empty())
{
loop {
let message = alpaca::orders::get(
&config.alpaca_client,
&config.alpaca_rate_limiter,
&types::alpaca::api::outgoing::order::Order {
status: Some(types::alpaca::api::outgoing::order::Status::All),
after: Some(after),
..Default::default()
},
None,
&ALPACA_API_BASE,
)
.await
.unwrap();
if message.is_empty() {
break;
}
orders.extend(message);
after = orders.last().unwrap().submitted_at;
}

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::incoming::account::Account;
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -22,7 +22,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Account>()
.await
.map_err(error_to_backoff)

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::{
incoming::asset::{Asset, Class},
outgoing,
@@ -29,7 +29,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Vec<Asset>>()
.await
.map_err(error_to_backoff)
@@ -65,7 +65,7 @@ pub async fn get_by_symbol(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Asset>()
.await
.map_err(error_to_backoff)
@@ -88,6 +88,10 @@ pub async fn get_by_symbols(
backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Asset>, Error> {
if symbols.is_empty() {
return Ok(vec![]);
}
if symbols.len() == 1 {
let asset = get_by_symbol(client, rate_limiter, &symbols[0], backoff, api_base).await?;
return Ok(vec![asset]);

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::{incoming::bar::Bar, outgoing};
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -33,7 +33,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Message>()
.await
.map_err(error_to_backoff)

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::{incoming::calendar::Calendar, outgoing};
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -24,7 +24,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Vec<Calendar>>()
.await
.map_err(error_to_backoff)

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::incoming::clock::Clock;
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -22,7 +22,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Clock>()
.await
.map_err(error_to_backoff)

View File

@@ -9,18 +9,13 @@ pub mod positions;
use reqwest::StatusCode;
pub fn status_error_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
match err.status() {
Some(StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND) | None => {
backoff::Error::Permanent(err)
}
_ => err.into(),
}
}
pub fn error_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
if err.is_status() {
return status_error_to_backoff(err);
return match err.status() {
Some(StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND)
| None => backoff::Error::Permanent(err),
_ => err.into(),
};
}
if err.is_builder() || err.is_request() || err.is_redirect() || err.is_decode() || err.is_body()

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::{incoming::news::News, outgoing, ALPACA_NEWS_DATA_API_URL};
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -32,7 +32,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Message>()
.await
.map_err(error_to_backoff)

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::{api::outgoing, shared::order};
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -26,7 +26,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Vec<Order>>()
.await
.map_err(error_to_backoff)

View File

@@ -1,4 +1,4 @@
use super::{error_to_backoff, status_error_to_backoff};
use super::error_to_backoff;
use crate::types::alpaca::api::incoming::position::Position;
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
@@ -22,7 +22,7 @@ pub async fn get(
.await
.map_err(error_to_backoff)?
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Vec<Position>>()
.await
.map_err(error_to_backoff)
@@ -64,7 +64,7 @@ pub async fn get_by_symbol(
response
.error_for_status()
.map_err(status_error_to_backoff)?
.map_err(error_to_backoff)?
.json::<Position>()
.await
.map_err(error_to_backoff)
@@ -88,6 +88,10 @@ pub async fn get_by_symbols(
backoff: Option<ExponentialBackoff>,
api_base: &str,
) -> Result<Vec<Position>, reqwest::Error> {
if symbols.is_empty() {
return Ok(vec![]);
}
if symbols.len() == 1 {
let position = get_by_symbol(client, rate_limiter, &symbols[0], backoff, api_base).await?;
return Ok(position.into_iter().collect());

View File

@@ -6,14 +6,14 @@ use tokio::{sync::Semaphore, try_join};
optimize!("calendar");
pub async fn upsert_batch_and_delete<'a, T>(
pub async fn upsert_batch_and_delete<'a, I>(
client: &Client,
concurrency_limiter: &Arc<Semaphore>,
records: T,
records: I,
) -> Result<(), Error>
where
T: IntoIterator<Item = &'a Calendar> + Send + Sync + Clone,
T::IntoIter: Send,
I: IntoIterator<Item = &'a Calendar> + Send + Sync + Clone,
I::IntoIter: Send,
{
let upsert_future = async {
let mut insert = client.insert("calendar")?;

View File

@@ -92,14 +92,14 @@ macro_rules! upsert {
#[macro_export]
macro_rules! upsert_batch {
($record:ty, $table_name:expr) => {
pub async fn upsert_batch<'a, T>(
pub async fn upsert_batch<'a, I>(
client: &clickhouse::Client,
concurrency_limiter: &std::sync::Arc<tokio::sync::Semaphore>,
records: T,
records: I,
) -> Result<(), clickhouse::error::Error>
where
T: IntoIterator<Item = &'a $record> + Send + Sync,
T::IntoIter: Send,
I: IntoIterator<Item = &'a $record> + Send + Sync,
I::IntoIter: Send,
{
let _ = concurrency_limiter.acquire().await.unwrap();
let mut insert = client.insert($table_name)?;

View File

@@ -1,4 +1,5 @@
use crate::utils::ser;
use nonempty::NonEmpty;
use serde::Serialize;
#[derive(Serialize)]
@@ -6,14 +7,14 @@ use serde::Serialize;
pub enum Market {
#[serde(rename_all = "camelCase")]
UsEquity {
bars: Vec<String>,
updated_bars: Vec<String>,
statuses: Vec<String>,
bars: NonEmpty<String>,
updated_bars: NonEmpty<String>,
statuses: NonEmpty<String>,
},
#[serde(rename_all = "camelCase")]
Crypto {
bars: Vec<String>,
updated_bars: Vec<String>,
bars: NonEmpty<String>,
updated_bars: NonEmpty<String>,
},
}
@@ -23,12 +24,12 @@ pub enum Message {
Market(Market),
News {
#[serde(serialize_with = "ser::remove_slash_from_symbols")]
news: Vec<String>,
news: NonEmpty<String>,
},
}
impl Message {
pub fn new_market_us_equity(symbols: Vec<String>) -> Self {
pub fn new_market_us_equity(symbols: NonEmpty<String>) -> Self {
Self::Market(Market::UsEquity {
bars: symbols.clone(),
updated_bars: symbols.clone(),
@@ -36,14 +37,14 @@ impl Message {
})
}
pub fn new_market_crypto(symbols: Vec<String>) -> Self {
pub fn new_market_crypto(symbols: NonEmpty<String>) -> Self {
Self::Market(Market::Crypto {
bars: symbols.clone(),
updated_bars: symbols,
})
}
pub fn new_news(symbols: Vec<String>) -> Self {
pub fn new_news(symbols: NonEmpty<String>) -> Self {
Self::News { news: symbols }
}
}

View File

@@ -58,12 +58,13 @@ where
}
}
pub fn remove_slash_from_symbols<S>(pairs: &[String], serializer: S) -> Result<S::Ok, S::Error>
pub fn remove_slash_from_symbols<'a, S, I>(pairs: I, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
I: IntoIterator<Item = &'a String>,
{
let symbols = pairs
.iter()
.into_iter()
.map(|pair| remove_slash(pair))
.collect::<Vec<_>>();

View File

@@ -14,6 +14,7 @@ use config::{
use dotenv::dotenv;
use log::info;
use log4rs::config::Deserializers;
use nonempty::NonEmpty;
use qrust::{create_send_await, database};
use tokio::{join, spawn, sync::mpsc, try_join};
@@ -101,12 +102,14 @@ async fn main() {
spawn(threads::clock::run(config.clone(), clock_sender));
create_send_await!(
data_sender,
threads::data::Message::new,
threads::data::Action::Enable,
assets
);
if let Some(assets) = NonEmpty::from_vec(assets) {
create_send_await!(
data_sender,
threads::data::Message::new,
threads::data::Action::Enable,
assets
);
}
routes::run(config, data_sender).await;
}

View File

@@ -4,6 +4,7 @@ use crate::{
};
use axum::{extract::Path, Extension, Json};
use http::StatusCode;
use nonempty::{nonempty, NonEmpty};
use qrust::{
alpaca,
types::{self, Asset},
@@ -113,15 +114,17 @@ pub async fn add(
},
);
create_send_await!(
data_sender,
threads::data::Message::new,
threads::data::Action::Add,
assets.clone()
);
if let Some(assets) = NonEmpty::from_vec(assets.clone()) {
create_send_await!(
data_sender,
threads::data::Message::new,
threads::data::Action::Add,
assets
);
}
Ok((
StatusCode::CREATED,
StatusCode::OK,
Json(AddAssetsResponse {
added: assets.into_iter().map(|asset| asset.0).collect(),
skipped,
@@ -173,7 +176,7 @@ pub async fn add_symbol(
data_sender,
threads::data::Message::new,
threads::data::Action::Add,
vec![(asset.symbol, asset.class.into())]
nonempty![(asset.symbol, asset.class.into())]
);
Ok(StatusCode::CREATED)
@@ -197,7 +200,7 @@ pub async fn delete(
data_sender,
threads::data::Message::new,
threads::data::Action::Remove,
vec![(asset.symbol, asset.class)]
nonempty![(asset.symbol, asset.class)]
);
Ok(StatusCode::NO_CONTENT)

View File

@@ -6,6 +6,7 @@ use crate::{
};
use async_trait::async_trait;
use log::{error, info};
use nonempty::NonEmpty;
use qrust::{
alpaca,
types::{
@@ -98,27 +99,27 @@ impl super::Handler for Handler {
.await
}
async fn queue_backfill(&self, jobs: &HashMap<String, Job>) {
if jobs.is_empty() || *ALPACA_SOURCE == Source::Sip {
async fn queue_backfill(&self, jobs: &NonEmpty<Job>) {
if *ALPACA_SOURCE == Source::Sip {
return;
}
let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to;
let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE);
let symbols = jobs.keys().collect::<Vec<_>>();
let symbols = jobs.iter().map(|job| &job.symbol).collect::<Vec<_>>();
info!("Queing bar backfill for {:?} in {:?}.", symbols, run_delay);
sleep(run_delay).await;
}
async fn backfill(&self, jobs: HashMap<String, Job>) {
if jobs.is_empty() {
return;
}
let symbols = jobs.keys().cloned().collect::<Vec<_>>();
let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap();
let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
async fn backfill(&self, jobs: NonEmpty<Job>) {
let symbols = Vec::from(jobs.clone().map(|job| job.symbol));
let fetch_from = jobs.minimum_by_key(|job| job.fetch_from).fetch_from;
let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to;
let freshness = jobs
.into_iter()
.map(|job| (job.symbol, job.fresh))
.collect::<HashMap<_, _>>();
let mut bars = Vec::with_capacity(*CLICKHOUSE_BATCH_BARS_SIZE);
let mut last_times = HashMap::new();
@@ -173,7 +174,7 @@ impl super::Handler for Handler {
let backfilled = last_times
.drain()
.map(|(symbol, time)| Backfill {
fresh: jobs[&symbol].fresh,
fresh: freshness[&symbol],
symbol,
time,
})

View File

@@ -4,6 +4,7 @@ pub mod news;
use async_trait::async_trait;
use itertools::Itertools;
use log::{info, warn};
use nonempty::{nonempty, NonEmpty};
use qrust::{
types::Backfill,
utils::{last_minute, ONE_SECOND},
@@ -25,12 +26,12 @@ pub enum Action {
pub struct Message {
pub action: Action,
pub symbols: Vec<String>,
pub symbols: NonEmpty<String>,
pub response: oneshot::Sender<()>,
}
impl Message {
pub fn new(action: Action, symbols: Vec<String>) -> (Self, oneshot::Receiver<()>) {
pub fn new(action: Action, symbols: NonEmpty<String>) -> (Self, oneshot::Receiver<()>) {
let (sender, receiver) = oneshot::channel::<()>();
(
Self {
@@ -45,6 +46,7 @@ impl Message {
#[derive(Clone)]
pub struct Job {
pub symbol: String,
pub fetch_from: OffsetDateTime,
pub fetch_to: OffsetDateTime,
pub fresh: bool,
@@ -58,8 +60,8 @@ pub trait Handler: Send + Sync {
) -> Result<Vec<Backfill>, clickhouse::error::Error>;
async fn delete_backfills(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>;
async fn delete_data(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>;
async fn queue_backfill(&self, jobs: &HashMap<String, Job>);
async fn backfill(&self, jobs: HashMap<String, Job>);
async fn queue_backfill(&self, jobs: &NonEmpty<Job>);
async fn backfill(&self, jobs: NonEmpty<Job>);
fn max_limit(&self) -> i64;
fn log_string(&self) -> &'static str;
}
@@ -108,7 +110,7 @@ pub async fn run(handler: Arc<Box<dyn Handler>>, mut receiver: mpsc::Receiver<Me
loop {
let message = receiver.recv().await.unwrap();
spawn(handle_backfill_message(
spawn(handle_message(
handler.clone(),
backfill_jobs.clone(),
message,
@@ -116,13 +118,14 @@ pub async fn run(handler: Arc<Box<dyn Handler>>, mut receiver: mpsc::Receiver<Me
}
}
async fn handle_backfill_message(
async fn handle_message(
handler: Arc<Box<dyn Handler>>,
backfill_jobs: Arc<Mutex<Jobs>>,
message: Message,
) {
let backfill_jobs_clone = backfill_jobs.clone();
let mut backfill_jobs = backfill_jobs.lock().await;
let symbols = Vec::from(message.symbols);
match message.action {
Action::Backfill => {
@@ -130,16 +133,16 @@ async fn handle_backfill_message(
let max_limit = handler.max_limit();
let backfills = handler
.select_latest_backfills(&message.symbols)
.select_latest_backfills(&symbols)
.await
.unwrap()
.into_iter()
.map(|backfill| (backfill.symbol.clone(), backfill))
.collect::<HashMap<_, _>>();
let mut jobs = Vec::with_capacity(message.symbols.len());
let mut jobs = Vec::with_capacity(symbols.len());
for symbol in message.symbols {
for symbol in symbols {
if backfill_jobs.contains_key(&symbol) {
warn!(
"Backfill for {} {} is already running, skipping.",
@@ -148,11 +151,11 @@ async fn handle_backfill_message(
continue;
}
let fetch_from = backfills
.get(&symbol)
.map_or(OffsetDateTime::UNIX_EPOCH, |backfill| {
backfill.time + ONE_SECOND
});
let backfill = backfills.get(&symbol);
let fetch_from = backfill.map_or(OffsetDateTime::UNIX_EPOCH, |backfill| {
backfill.time + ONE_SECOND
});
let fetch_to = last_minute();
@@ -161,50 +164,42 @@ async fn handle_backfill_message(
return;
}
let fresh = backfills
.get(&symbol)
.map_or(false, |backfill| backfill.fresh);
let fresh = backfill.map_or(false, |backfill| backfill.fresh);
jobs.push((
jobs.push(Job {
symbol,
Job {
fetch_from,
fetch_to,
fresh,
},
));
}
if jobs.is_empty() {
return;
fetch_from,
fetch_to,
fresh,
});
}
let jobs = jobs
.into_iter()
.sorted_unstable_by_key(|job| job.1.fetch_from)
.sorted_unstable_by_key(|job| job.fetch_from)
.collect::<Vec<_>>();
let mut job_groups = vec![HashMap::new()];
let mut job_groups: Vec<NonEmpty<Job>> = vec![];
let mut current_minutes = 0;
for job in jobs {
let minutes = (job.1.fetch_to - job.1.fetch_from).whole_minutes();
let minutes = (job.fetch_to - job.fetch_from).whole_minutes();
if job_groups.last().unwrap().is_empty() || (current_minutes + minutes) <= max_limit
{
if job_groups.last().is_some() && current_minutes + minutes <= max_limit {
let job_group = job_groups.last_mut().unwrap();
job_group.insert(job.0, job.1);
job_group.push(job);
current_minutes += minutes;
} else {
let mut job_group = HashMap::new();
job_group.insert(job.0, job.1);
job_groups.push(job_group);
job_groups.push(nonempty![job]);
current_minutes = minutes;
}
}
for job_group in job_groups {
let symbols = job_group.keys().cloned().collect::<Vec<_>>();
let symbols = job_group
.iter()
.map(|job| job.symbol.clone())
.collect::<Vec<_>>();
let handler = handler.clone();
let symbols_clone = symbols.clone();
@@ -220,7 +215,7 @@ async fn handle_backfill_message(
}
}
Action::Purge => {
for symbol in &message.symbols {
for symbol in &symbols {
if let Some(job) = backfill_jobs.remove(symbol) {
job.abort();
let _ = job.await;
@@ -228,8 +223,8 @@ async fn handle_backfill_message(
}
try_join!(
handler.delete_backfills(&message.symbols),
handler.delete_data(&message.symbols)
handler.delete_backfills(&symbols),
handler.delete_data(&symbols)
)
.unwrap();
}

View File

@@ -5,6 +5,7 @@ use crate::{
};
use async_trait::async_trait;
use log::{error, info};
use nonempty::NonEmpty;
use qrust::{
alpaca,
types::{
@@ -56,14 +57,14 @@ impl super::Handler for Handler {
.await
}
async fn queue_backfill(&self, jobs: &HashMap<String, Job>) {
if jobs.is_empty() || *ALPACA_SOURCE == Source::Sip {
async fn queue_backfill(&self, jobs: &NonEmpty<Job>) {
if *ALPACA_SOURCE == Source::Sip {
return;
}
let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to;
let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE);
let symbols = jobs.keys().cloned().collect::<Vec<_>>();
let symbols = jobs.iter().map(|job| &job.symbol).collect::<Vec<_>>();
info!("Queing news backfill for {:?} in {:?}.", symbols, run_delay);
sleep(run_delay).await;
@@ -71,15 +72,15 @@ impl super::Handler for Handler {
#[allow(clippy::too_many_lines)]
#[allow(clippy::iter_with_drain)]
async fn backfill(&self, jobs: HashMap<String, Job>) {
if jobs.is_empty() {
return;
}
let symbols = jobs.keys().cloned().collect::<Vec<_>>();
async fn backfill(&self, jobs: NonEmpty<Job>) {
let symbols = Vec::from(jobs.clone().map(|job| job.symbol));
let symbols_set = symbols.clone().into_iter().collect::<HashSet<_>>();
let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap();
let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap();
let fetch_from = jobs.minimum_by_key(|job| job.fetch_from).fetch_from;
let fetch_to = jobs.maximum_by_key(|job| job.fetch_to).fetch_to;
let freshness = jobs
.into_iter()
.map(|job| (job.symbol, job.fresh))
.collect::<HashMap<_, _>>();
let mut news = Vec::with_capacity(*CLICKHOUSE_BATCH_NEWS_SIZE);
let mut last_times = HashMap::new();
@@ -137,7 +138,7 @@ impl super::Handler for Handler {
let backfilled = last_times
.drain()
.map(|(symbol, time)| Backfill {
fresh: jobs[&symbol].fresh,
fresh: freshness[&symbol],
symbol,
time,
})

View File

@@ -8,6 +8,7 @@ use crate::{
};
use itertools::{Either, Itertools};
use log::error;
use nonempty::NonEmpty;
use qrust::{
alpaca,
types::{
@@ -35,12 +36,12 @@ pub enum Action {
pub struct Message {
pub action: Action,
pub assets: Vec<(String, Class)>,
pub assets: NonEmpty<(String, Class)>,
pub response: oneshot::Sender<()>,
}
impl Message {
pub fn new(action: Action, assets: Vec<(String, Class)>) -> (Self, oneshot::Receiver<()>) {
pub fn new(action: Action, assets: NonEmpty<(String, Class)>) -> (Self, oneshot::Receiver<()>) {
let (sender, receiver) = oneshot::channel();
(
Self {
@@ -150,11 +151,6 @@ async fn handle_message(
news_backfill_sender: mpsc::Sender<backfill::Message>,
message: Message,
) {
if message.assets.is_empty() {
message.response.send(()).unwrap();
return;
}
let (us_equity_symbols, crypto_symbols): (Vec<_>, Vec<_>) = message
.assets
.clone()
@@ -164,36 +160,28 @@ async fn handle_message(
Class::Crypto => Either::Right(asset.0),
});
let symbols = message
.assets
.into_iter()
.map(|(symbol, _)| symbol)
.collect::<Vec<_>>();
let symbols = message.assets.map(|(symbol, _)| symbol);
let bars_us_equity_future = async {
if us_equity_symbols.is_empty() {
return;
if let Some(us_equity_symbols) = NonEmpty::from_vec(us_equity_symbols.clone()) {
create_send_await!(
bars_us_equity_websocket_sender,
websocket::Message::new,
message.action.into(),
us_equity_symbols
);
}
create_send_await!(
bars_us_equity_websocket_sender,
websocket::Message::new,
message.action.into(),
us_equity_symbols.clone()
);
};
let bars_crypto_future = async {
if crypto_symbols.is_empty() {
return;
if let Some(crypto_symbols) = NonEmpty::from_vec(crypto_symbols.clone()) {
create_send_await!(
bars_crypto_websocket_sender,
websocket::Message::new,
message.action.into(),
crypto_symbols
);
}
create_send_await!(
bars_crypto_websocket_sender,
websocket::Message::new,
message.action.into(),
crypto_symbols.clone()
);
};
let news_future = async {
@@ -207,8 +195,15 @@ async fn handle_message(
join!(bars_us_equity_future, bars_crypto_future, news_future);
if message.action == Action::Disable {
message.response.send(()).unwrap();
return;
}
match message.action {
Action::Add => {
Action::Add | Action::Enable => {
let symbols = Vec::from(symbols.clone());
let assets = async {
alpaca::assets::get_by_symbols(
&config.alpaca_client,
@@ -264,51 +259,42 @@ async fn handle_message(
database::assets::delete_where_symbols(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
&symbols,
&Vec::from(symbols.clone()),
)
.await
.unwrap();
}
_ => {}
}
if message.action == Action::Disable {
message.response.send(()).unwrap();
return;
Action::Disable => unreachable!(),
}
let bars_us_equity_future = async {
if us_equity_symbols.is_empty() {
return;
if let Some(us_equity_symbols) = NonEmpty::from_vec(us_equity_symbols) {
create_send_await!(
bars_us_equity_backfill_sender,
backfill::Message::new,
match message.action {
Action::Add | Action::Enable => backfill::Action::Backfill,
Action::Remove => backfill::Action::Purge,
Action::Disable => unreachable!(),
},
us_equity_symbols
);
}
create_send_await!(
bars_us_equity_backfill_sender,
backfill::Message::new,
match message.action {
Action::Add | Action::Enable => backfill::Action::Backfill,
Action::Remove => backfill::Action::Purge,
Action::Disable => unreachable!(),
},
us_equity_symbols
);
};
let bars_crypto_future = async {
if crypto_symbols.is_empty() {
return;
if let Some(crypto_symbols) = NonEmpty::from_vec(crypto_symbols) {
create_send_await!(
bars_crypto_backfill_sender,
backfill::Message::new,
match message.action {
Action::Add | Action::Enable => backfill::Action::Backfill,
Action::Remove => backfill::Action::Purge,
Action::Disable => unreachable!(),
},
crypto_symbols
);
}
create_send_await!(
bars_crypto_backfill_sender,
backfill::Message::new,
match message.action {
Action::Add | Action::Enable => backfill::Action::Backfill,
Action::Remove => backfill::Action::Purge,
Action::Disable => unreachable!(),
},
crypto_symbols
);
};
let news_future = async {
@@ -363,30 +349,36 @@ async fn handle_clock_message(
.collect::<Vec<_>>();
let bars_us_equity_future = async {
create_send_await!(
bars_us_equity_backfill_sender,
backfill::Message::new,
backfill::Action::Backfill,
us_equity_symbols
);
if let Some(us_equity_symbols) = NonEmpty::from_vec(us_equity_symbols) {
create_send_await!(
bars_us_equity_backfill_sender,
backfill::Message::new,
backfill::Action::Backfill,
us_equity_symbols
);
}
};
let bars_crypto_future = async {
create_send_await!(
bars_crypto_backfill_sender,
backfill::Message::new,
backfill::Action::Backfill,
crypto_symbols
);
if let Some(crypto_symbols) = NonEmpty::from_vec(crypto_symbols) {
create_send_await!(
bars_crypto_backfill_sender,
backfill::Message::new,
backfill::Action::Backfill,
crypto_symbols
);
}
};
let news_future = async {
create_send_await!(
news_backfill_sender,
backfill::Message::new,
backfill::Action::Backfill,
symbols
);
if let Some(symbols) = NonEmpty::from_vec(symbols) {
create_send_await!(
news_backfill_sender,
backfill::Message::new,
backfill::Action::Backfill,
symbols
);
}
};
join!(bars_us_equity_future, bars_crypto_future, news_future);

View File

@@ -7,6 +7,7 @@ use crate::{
use async_trait::async_trait;
use clickhouse::inserter::Inserter;
use log::{debug, error, info};
use nonempty::NonEmpty;
use qrust::{
types::{alpaca::websocket, Bar, Class},
utils::ONE_SECOND,
@@ -21,14 +22,14 @@ pub struct Handler {
pub config: Arc<Config>,
pub inserter: Arc<Mutex<Inserter<Bar>>>,
pub subscription_message_constructor:
fn(Vec<String>) -> websocket::data::outgoing::subscribe::Message,
fn(NonEmpty<String>) -> websocket::data::outgoing::subscribe::Message,
}
#[async_trait]
impl super::Handler for Handler {
fn create_subscription_message(
&self,
symbols: Vec<String>,
symbols: NonEmpty<String>,
) -> websocket::data::outgoing::subscribe::Message {
(self.subscription_message_constructor)(symbols)
}

View File

@@ -7,6 +7,7 @@ use backoff::{future::retry_notify, ExponentialBackoff};
use clickhouse::{inserter::Inserter, Row};
use futures_util::{future::join_all, SinkExt, StreamExt};
use log::error;
use nonempty::NonEmpty;
use qrust::types::alpaca::{self, websocket};
use serde::Serialize;
use serde_json::{from_str, to_string};
@@ -38,12 +39,12 @@ impl From<super::Action> for Option<Action> {
pub struct Message {
pub action: Option<Action>,
pub symbols: Vec<String>,
pub symbols: NonEmpty<String>,
pub response: oneshot::Sender<()>,
}
impl Message {
pub fn new(action: Option<Action>, symbols: Vec<String>) -> (Self, oneshot::Receiver<()>) {
pub fn new(action: Option<Action>, symbols: NonEmpty<String>) -> (Self, oneshot::Receiver<()>) {
let (sender, receiver) = oneshot::channel();
(
Self {
@@ -66,7 +67,7 @@ pub struct State {
pub trait Handler: Send + Sync + 'static {
fn create_subscription_message(
&self,
symbols: Vec<String>,
symbols: NonEmpty<String>,
) -> websocket::data::outgoing::subscribe::Message;
async fn handle_websocket_message(
&self,
@@ -206,7 +207,7 @@ async fn run_connection(
drop(state);
if !pending_subscriptions.is_empty() {
if let Some(pending_subscriptions) = NonEmpty::from_vec(pending_subscriptions) {
if let Err(err) = sink
.send(tungstenite::Message::Text(
to_string(&websocket::data::outgoing::Message::Subscribe(
@@ -277,11 +278,6 @@ async fn handle_message(
sink_sender: mpsc::Sender<tungstenite::Message>,
message: Message,
) {
if message.symbols.is_empty() {
message.response.send(()).unwrap();
return;
}
match message.action {
Some(Action::Subscribe) => {
let (pending_subscriptions, receivers) = message

View File

@@ -3,6 +3,7 @@ use crate::config::{Config, CLICKHOUSE_BATCH_NEWS_SIZE};
use async_trait::async_trait;
use clickhouse::inserter::Inserter;
use log::{debug, error, info};
use nonempty::NonEmpty;
use qrust::{
types::{alpaca::websocket, News},
utils::ONE_SECOND,
@@ -19,7 +20,7 @@ pub struct Handler {
impl super::Handler for Handler {
fn create_subscription_message(
&self,
symbols: Vec<String>,
symbols: NonEmpty<String>,
) -> websocket::data::outgoing::subscribe::Message {
websocket::data::outgoing::subscribe::Message::new_news(symbols)
}