Compare commits

10 Commits

Author SHA1 Message Date
90b7f10a77 Update and fix bugs
It's good to be back

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-05-10 17:49:16 +01:00
d7e9350257 Add initial ML implementation
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-29 13:40:49 +00:00
f715881b07 Remove unsafe blocks
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-26 09:51:52 +00:00
d0ad9f65b1 Remove vwap
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-26 09:51:16 +00:00
ce8c4db422 Remove local XKCD image
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-26 09:50:39 +00:00
46508d1b4f Update reqwest
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-20 20:00:35 +00:00
2ad42c5462 Foldify for loops
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-20 19:43:48 +00:00
733e6373e9 Add tests
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-20 19:43:47 +00:00
d072b849c0 Reorganize crate source code
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-20 19:43:46 +00:00
718e794f51 Reorder Bar struct
Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
2024-03-19 15:48:45 +00:00
114 changed files with 5106 additions and 392 deletions

View File

@@ -22,15 +22,15 @@ build:
cache:
<<: *global_cache
script:
- cargo +nightly build
- cargo +nightly build --workspace
# test:
# image: registry.karaolidis.com/karaolidis/qrust/rust
# stage: test
# cache:
# <<: *global_cache
# script:
# - cargo +nightly test
test:
image: registry.karaolidis.com/karaolidis/qrust/rust
stage: test
cache:
<<: *global_cache
script:
- cargo +nightly test --workspace
lint:
image: registry.karaolidis.com/karaolidis/qrust/rust
@@ -39,7 +39,7 @@ lint:
<<: *global_cache
script:
- cargo +nightly fmt --all -- --check
- cargo +nightly clippy --all-targets --all-features
- cargo +nightly clippy --workspace --all-targets --all-features
depcheck:
image: registry.karaolidis.com/karaolidis/qrust/rust
@@ -48,7 +48,7 @@ depcheck:
<<: *global_cache
script:
- cargo +nightly outdated
- cargo +nightly udeps --all-targets
- cargo +nightly udeps --workspace --all-targets
build-release:
image: registry.karaolidis.com/karaolidis/qrust/rust

2921
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,11 +5,15 @@ edition = "2021"
[lib]
name = "qrust"
path = "src/lib/mod.rs"
path = "src/lib/qrust/mod.rs"
[[bin]]
name = "qrust"
path = "src/main.rs"
path = "src/bin/qrust/mod.rs"
[[bin]]
name = "trainer"
path = "src/bin/trainer/mod.rs"
[profile.release]
panic = 'abort'
@@ -20,9 +24,9 @@ codegen-units = 1
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = "0.7.4"
axum = "0.7.5"
dotenv = "0.15.0"
tokio = { version = "1.32.0", features = [
tokio = { version = "1.37.0", features = [
"macros",
"rt-multi-thread",
] }
@@ -30,30 +34,29 @@ tokio-tungstenite = { version = "0.21.0", features = [
"tokio-native-tls",
"native-tls",
] }
log = "0.4.20"
log4rs = "1.2.0"
serde = "1.0.188"
serde_json = "1.0.105"
serde_repr = "0.1.18"
serde_with = "3.6.1"
serde-aux = "4.4.0"
futures-util = "0.3.28"
reqwest = { version = "0.11.20", features = [
log = "0.4.21"
log4rs = "1.3.0"
serde = "1.0.201"
serde_json = "1.0.117"
serde_repr = "0.1.19"
serde_with = "3.8.1"
serde-aux = "4.5.0"
futures-util = "0.3.30"
reqwest = { version = "0.12.4", features = [
"json",
"serde_json",
] }
http = "1.0.0"
governor = "0.6.0"
http = "1.1.0"
governor = "0.6.3"
clickhouse = { version = "0.11.6", features = [
"watch",
"time",
"uuid",
] }
uuid = { version = "1.6.1", features = [
uuid = { version = "1.8.0", features = [
"serde",
"v4",
] }
time = { version = "0.3.31", features = [
time = { version = "0.3.36", features = [
"serde",
"serde-well-known",
"serde-human-readable",
@@ -64,10 +67,22 @@ time = { version = "0.3.31", features = [
backoff = { version = "0.4.0", features = [
"tokio",
] }
regex = "1.10.3"
async-trait = "0.1.77"
regex = "1.10.4"
async-trait = "0.1.80"
itertools = "0.12.1"
lazy_static = "1.4.0"
nonempty = { version = "0.10.0", features = [
"serialize",
] }
rand = "0.8.5"
rayon = "1.10.0"
burn = { version = "0.13.2", features = [
"wgpu",
"cuda",
"tui",
"metrics",
"train",
] }
[dev-dependencies]
serde_test = "1.0.176"

View File

@@ -1,5 +1,5 @@
# qrust
![XKCD - Engineer Syllogism](./static/engineer-syllogism.png)
![XKCD - Engineer Syllogism](https://imgs.xkcd.com/comics/engineer_syllogism.png)
`qrust` (/kɹʌst/, QuantitativeRust) is an algorithmic trading library written in Rust.

View File

@@ -65,8 +65,8 @@ impl Config {
.build()
.unwrap(),
alpaca_rate_limiter: RateLimiter::direct(Quota::per_minute(match *ALPACA_SOURCE {
Source::Iex => unsafe { NonZeroU32::new_unchecked(200) },
Source::Sip => unsafe { NonZeroU32::new_unchecked(10_000) },
Source::Iex => NonZeroU32::new(200).unwrap(),
Source::Sip => NonZeroU32::new(10_000).unwrap(),
Source::Otc => unimplemented!("OTC rate limit not implemented."),
})),
clickhouse_client: clickhouse::Client::default()

View File

@@ -81,12 +81,7 @@ pub async fn add(
&ALPACA_API_BASE,
)
.await
.map_err(|e| {
e.status()
.map_or(StatusCode::INTERNAL_SERVER_ERROR, |status| {
StatusCode::from_u16(status.as_u16()).unwrap()
})
})?
.map_err(|e| e.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR))?
.into_iter()
.map(|asset| (asset.symbol.clone(), asset))
.collect::<HashMap<_, _>>();
@@ -158,12 +153,7 @@ pub async fn add_symbol(
&ALPACA_API_BASE,
)
.await
.map_err(|e| {
e.status()
.map_or(StatusCode::INTERNAL_SERVER_ERROR, |status| {
StatusCode::from_u16(status.as_u16()).unwrap()
})
})?;
.map_err(|e| e.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR))?;
if asset.status != types::alpaca::api::incoming::asset::Status::Active
|| !asset.tradable

View File

@@ -9,9 +9,9 @@ use qrust::{
utils::{backoff, duration_until},
};
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::{join, sync::mpsc, time::sleep};
#[derive(PartialEq, Eq)]
pub enum Status {
Open,
Closed,
@@ -19,21 +19,16 @@ pub enum Status {
pub struct Message {
pub status: Status,
pub next_switch: OffsetDateTime,
}
impl From<types::alpaca::api::incoming::clock::Clock> for Message {
fn from(clock: types::alpaca::api::incoming::clock::Clock) -> Self {
if clock.is_open {
Self {
status: Status::Open,
next_switch: clock.next_close,
}
} else {
Self {
status: Status::Closed,
next_switch: clock.next_open,
}
Self {
status: if clock.is_open {
Status::Open
} else {
Status::Closed
},
}
}
}

View File

@@ -178,26 +178,25 @@ async fn handle_message(
});
}
let jobs = jobs
let mut current_minutes = 0;
let job_groups = jobs
.into_iter()
.sorted_unstable_by_key(|job| job.fetch_from)
.collect::<Vec<_>>();
.fold(Vec::<NonEmpty<Job>>::new(), |mut job_groups, job| {
let minutes = (job.fetch_to - job.fetch_from).whole_minutes();
let mut job_groups: Vec<NonEmpty<Job>> = vec![];
let mut current_minutes = 0;
if let Some(job_group) = job_groups.last_mut() {
if current_minutes + minutes <= max_limit {
job_group.push(job);
current_minutes += minutes;
return job_groups;
}
}
for job in jobs {
let minutes = (job.fetch_to - job.fetch_from).whole_minutes();
if job_groups.last().is_some() && current_minutes + minutes <= max_limit {
let job_group = job_groups.last_mut().unwrap();
job_group.push(job);
current_minutes += minutes;
} else {
job_groups.push(nonempty![job]);
current_minutes = minutes;
}
}
job_groups
});
for job_group in job_groups {
let symbols = job_group

View File

@@ -86,12 +86,13 @@ pub async fn run(
message,
));
}
Some(_) = clock_receiver.recv() => {
Some(message) = clock_receiver.recv() => {
spawn(handle_clock_message(
config.clone(),
bars_us_equity_backfill_sender.clone(),
bars_crypto_backfill_sender.clone(),
news_backfill_sender.clone(),
message,
));
}
else => panic!("Communication channel unexpectedly closed.")
@@ -125,7 +126,7 @@ fn init_thread(
let websocket_handler = match thread_type {
ThreadType::Bars(_) => websocket::bars::create_handler(config, thread_type),
ThreadType::News => websocket::news::create_handler(config),
ThreadType::News => websocket::news::create_handler(&config),
};
let (websocket_sender, websocket_receiver) = mpsc::channel(100);
@@ -236,16 +237,18 @@ async fn handle_message(
let (mut assets, mut positions) = join!(assets, positions);
let mut batch = Vec::with_capacity(symbols.len());
for symbol in &symbols {
if let Some(asset) = assets.remove(symbol) {
let position = positions.remove(symbol);
batch.push(Asset::from((asset, position)));
} else {
error!("Failed to find asset for symbol: {}.", symbol);
}
}
let batch =
symbols
.iter()
.fold(Vec::with_capacity(symbols.len()), |mut batch, symbol| {
if let Some(asset) = assets.remove(symbol) {
let position = positions.remove(symbol);
batch.push(Asset::from((asset, position)));
} else {
error!("Failed to find asset for symbol: {}.", symbol);
}
batch
});
database::assets::upsert_batch(
&config.clickhouse_client,
@@ -320,13 +323,16 @@ async fn handle_clock_message(
bars_us_equity_backfill_sender: mpsc::Sender<backfill::Message>,
bars_crypto_backfill_sender: mpsc::Sender<backfill::Message>,
news_backfill_sender: mpsc::Sender<backfill::Message>,
message: clock::Message,
) {
database::cleanup_all(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap();
if message.status == clock::Status::Closed {
database::cleanup_all(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap();
}
let assets = database::assets::select(
&config.clickhouse_client,

View File

@@ -8,11 +8,13 @@ use qrust::{
types::{alpaca::websocket, News},
utils::ONE_SECOND,
};
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::{Mutex, RwLock};
pub struct Handler {
pub config: Arc<Config>,
pub inserter: Arc<Mutex<Inserter<News>>>,
}
@@ -38,6 +40,7 @@ impl super::Handler for Handler {
unreachable!()
};
let symbols = symbols.into_iter().collect::<HashSet<_>>();
let mut state = state.write().await;
let newly_subscribed = state
@@ -102,7 +105,7 @@ impl super::Handler for Handler {
}
}
pub fn create_handler(config: Arc<Config>) -> Box<dyn super::Handler> {
pub fn create_handler(config: &Arc<Config>) -> Box<dyn super::Handler> {
let inserter = Arc::new(Mutex::new(
config
.clickhouse_client
@@ -112,5 +115,5 @@ pub fn create_handler(config: Arc<Config>) -> Box<dyn super::Handler> {
.with_max_entries((*CLICKHOUSE_BATCH_NEWS_SIZE).try_into().unwrap()),
));
Box::new(Handler { config, inserter })
Box::new(Handler { inserter })
}

133
src/bin/trainer/mod.rs Normal file
View File

@@ -0,0 +1,133 @@
#![warn(clippy::all, clippy::pedantic, clippy::nursery)]
use burn::{
config::Config,
data::{
dataloader::{DataLoaderBuilder, Dataset},
dataset::transform::{PartialDataset, ShuffledDataset},
},
module::Module,
optim::AdamConfig,
record::CompactRecorder,
tensor::backend::AutodiffBackend,
train::LearnerBuilder,
};
use dotenv::dotenv;
use log::info;
use qrust::{
database,
ml::{
BarWindow, BarWindowBatcher, ModelConfig, MultipleSymbolDataset, MyAutodiffBackend, DEVICE,
},
types::Bar,
};
use std::{env, fs, path::Path, sync::Arc};
use tokio::sync::Semaphore;
/// Hyper-parameters for one training run. `#[config(default = …)]` supplies
/// defaults for everything except `model` and `optimizer`, so construction is
/// `TrainingConfig::new(model_config, optimizer_config)`.
#[derive(Config)]
pub struct TrainingConfig {
    pub model: ModelConfig,
    pub optimizer: AdamConfig,
    #[config(default = 100)]
    pub epochs: usize,
    #[config(default = 256)]
    pub batch_size: usize,
    // Dataloader worker count, not a GPU/device count.
    #[config(default = 16)]
    pub num_workers: usize,
    // Seeds both the backend RNG and the dataset shuffle for reproducibility.
    #[config(default = 0)]
    pub seed: u64,
    // Fraction of the shuffled dataset held out for validation.
    #[config(default = 0.2)]
    pub valid_pct: f64,
    #[config(default = 1.0e-4)]
    pub learning_rate: f64,
}
/// Trainer entry point: loads environment config, pulls hourly bars from
/// ClickHouse, and trains the model, writing artifacts next to this file.
#[tokio::main]
async fn main() {
    // Load .env first so the CLICKHOUSE_* lookups below can come from it.
    dotenv().ok();
    // `file!()` is a compile-time path relative to the crate root; artifacts
    // land in a sibling `artifacts/` directory of this source file.
    let dir = Path::new(file!()).parent().unwrap();
    let model_config = ModelConfig::new();
    let optimizer = AdamConfig::new();
    let training_config = TrainingConfig::new(model_config, optimizer);
    // Connection is configured entirely from the environment; a missing
    // variable aborts with an explanatory panic.
    let clickhouse_client = clickhouse::Client::default()
        .with_url(env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set."))
        .with_user(env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set."))
        .with_password(env::var("CLICKHOUSE_PASSWORD").expect("CLICKHOUSE_PASSWORD must be set."))
        .with_database(env::var("CLICKHOUSE_DB").expect("CLICKHOUSE_DB must be set."));
    // MAX_PERMITS => effectively unlimited concurrency for this binary; the
    // semaphore exists only to satisfy the shared `database` API.
    let clickhouse_concurrency_limiter = Arc::new(Semaphore::new(Semaphore::MAX_PERMITS));
    let bars = database::ta::select(&clickhouse_client, &clickhouse_concurrency_limiter)
        .await
        .unwrap();
    // NOTE(review): no logger appears to be initialized in this binary before
    // `info!`, so this line may be a no-op — confirm log4rs setup.
    info!("Loaded {} bars.", bars.len());
    train::<MyAutodiffBackend>(
        bars,
        &training_config,
        dir.join("artifacts").to_str().unwrap(),
        &DEVICE,
    );
}
/// Trains the model on `bars`, checkpointing into `dir` and saving the final
/// weights there. The dataset is shuffled once (seeded) and then split
/// head/tail into train (`1 - valid_pct`) and validation (`valid_pct`).
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_precision_loss)]
fn train<B: AutodiffBackend<FloatElem = f32, IntElem = i32>>(
    bars: Vec<Bar>,
    config: &TrainingConfig,
    dir: &str,
    device: &B::Device,
) {
    B::seed(config.seed);
    fs::create_dir_all(dir).unwrap();
    // Shuffle once so the positional split below is a random split.
    let dataset = MultipleSymbolDataset::new(bars);
    let dataset = ShuffledDataset::with_seed(dataset, config.seed);
    let dataset = Arc::new(dataset);
    // Index of the first validation item.
    let split = (dataset.len() as f64 * (1.0 - config.valid_pct)) as usize;
    let train: PartialDataset<Arc<ShuffledDataset<MultipleSymbolDataset, BarWindow>>, BarWindow> =
        PartialDataset::new(dataset.clone(), 0, split);
    let batcher_train = BarWindowBatcher::<B> {
        device: device.clone(),
    };
    let dataloader_train = DataLoaderBuilder::new(batcher_train)
        .batch_size(config.batch_size)
        .num_workers(config.num_workers)
        .build(train);
    let valid: PartialDataset<Arc<ShuffledDataset<MultipleSymbolDataset, BarWindow>>, BarWindow> =
        PartialDataset::new(dataset.clone(), split, dataset.len());
    // Validation uses the inner (non-autodiff) backend — no gradients needed.
    let batcher_valid = BarWindowBatcher::<B::InnerBackend> {
        device: device.clone(),
    };
    let dataloader_valid = DataLoaderBuilder::new(batcher_valid)
        .batch_size(config.batch_size)
        .num_workers(config.num_workers)
        .build(valid);
    let learner = LearnerBuilder::new(dir)
        .with_file_checkpointer(CompactRecorder::new())
        .devices(vec![device.clone()])
        .num_epochs(config.epochs)
        .build(
            config.model.init::<B>(device),
            config.optimizer.init(),
            config.learning_rate,
        );
    let trained = learner.fit(dataloader_train, dataloader_valid);
    trained.save_file(dir, &CompactRecorder::new()).unwrap();
}

View File

@@ -7,7 +7,7 @@ pub mod news;
pub mod orders;
pub mod positions;
use reqwest::StatusCode;
use http::StatusCode;
pub fn error_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
if err.is_status() {

View File

@@ -2,6 +2,7 @@ use super::error_to_backoff;
use crate::types::alpaca::api::incoming::position::Position;
use backoff::{future::retry_notify, ExponentialBackoff};
use governor::DefaultDirectRateLimiter;
use http::StatusCode;
use log::warn;
use reqwest::Client;
use std::{collections::HashSet, time::Duration};
@@ -58,7 +59,7 @@ pub async fn get_by_symbol(
.await
.map_err(error_to_backoff)?;
if response.status() == reqwest::StatusCode::NOT_FOUND {
if response.status() == StatusCode::NOT_FOUND {
return Ok(None);
}

View File

@@ -5,6 +5,7 @@ pub mod bars;
pub mod calendar;
pub mod news;
pub mod orders;
pub mod ta;
use clickhouse::{error::Error, Client};
use tokio::try_join;

View File

@@ -0,0 +1,30 @@
use crate::types::Bar;
use clickhouse::{error::Error, Client};
use std::sync::Arc;
use tokio::sync::Semaphore;
/// Fetches all bars aggregated to hourly resolution, ordered by symbol then
/// time, using ClickHouse `FINAL` to collapse not-yet-merged duplicate rows.
///
/// # Errors
/// Returns any ClickHouse query or deserialization error.
///
/// # Panics
/// Panics if the semaphore has been closed.
pub async fn select(
    client: &Client,
    concurrency_limiter: &Arc<Semaphore>,
) -> Result<Vec<Bar>, Error> {
    // Hold the permit for the duration of the query. The previous
    // `let _ = …acquire()…` dropped the `SemaphorePermit` immediately,
    // releasing it before the query ran, so the limiter never limited
    // anything; `_permit` keeps the guard alive until the function returns.
    let _permit = concurrency_limiter.acquire().await.unwrap();

    client
        .query(
            "
            SELECT symbol,
                toStartOfHour(bars.time) AS time,
                any(bars.open) AS open,
                max(bars.high) AS high,
                min(bars.low) AS low,
                anyLast(bars.close) AS close,
                sum(bars.volume) AS volume,
                sum(bars.trades) AS trades
            FROM bars FINAL
            GROUP BY ALL
            ORDER BY symbol,
                time
            ",
        )
        .fetch_all::<Bar>()
        .await
}

View File

@@ -0,0 +1,75 @@
use super::BarWindow;
use burn::{
data::dataloader::batcher::Batcher,
tensor::{self, backend::Backend, Tensor},
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
/// Converts `BarWindow` items into batched tensors on `device`.
#[derive(Clone, Debug)]
pub struct BarWindowBatcher<B: Backend> {
    pub device: B::Device,
}

/// One mini-batch: categorical hour/day index tensors, the numerical feature
/// window, and the regression target.
#[derive(Clone, Debug)]
pub struct BarWindowBatch<B: Backend> {
    pub hour_tensor: Tensor<B, 2, tensor::Int>,
    pub day_tensor: Tensor<B, 2, tensor::Int>,
    pub numerical_tensor: Tensor<B, 3>,
    pub target_tensor: Tensor<B, 2>,
}
impl<B: Backend<FloatElem = f32, IntElem = i32>> Batcher<BarWindow, BarWindowBatch<B>>
    for BarWindowBatcher<B>
{
    /// Builds per-sample tensors in parallel with rayon, then stacks them
    /// along a new leading batch dimension.
    fn batch(&self, items: Vec<BarWindow>) -> BarWindowBatch<B> {
        let batch_size = items.len();
        // Parallel fold accumulates per-worker vectors of sample tensors;
        // reduce concatenates the per-worker partials into single vectors.
        // NOTE(review): each rayon worker reserves `batch_size` slots, which
        // over-allocates when the work is split across threads — harmless but
        // worth confirming it was intentional.
        let (hour_tensors, day_tensors, numerical_tensors, target_tensors) = items
            .into_par_iter()
            .fold(
                || {
                    (
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                    )
                },
                |(mut hour_tensors, mut day_tensors, mut numerical_tensors, mut target_tensors),
                 item| {
                    hour_tensors.push(Tensor::from_data(item.hours, &self.device));
                    day_tensors.push(Tensor::from_data(item.days, &self.device));
                    numerical_tensors.push(Tensor::from_data(item.numerical, &self.device));
                    target_tensors.push(Tensor::from_data(item.target, &self.device));
                    (hour_tensors, day_tensors, numerical_tensors, target_tensors)
                },
            )
            .reduce(
                || {
                    (
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                    )
                },
                |(mut hour_tensors, mut day_tensors, mut numerical_tensors, mut target_tensors),
                 item| {
                    hour_tensors.extend(item.0);
                    day_tensors.extend(item.1);
                    numerical_tensors.extend(item.2);
                    target_tensors.extend(item.3);
                    (hour_tensors, day_tensors, numerical_tensors, target_tensors)
                },
            );
        // Stack per-sample tensors into [batch, …] tensors on the device.
        BarWindowBatch {
            hour_tensor: Tensor::stack(hour_tensors, 0).to_device(&self.device),
            day_tensor: Tensor::stack(day_tensors, 0).to_device(&self.device),
            numerical_tensor: Tensor::stack(numerical_tensors, 0).to_device(&self.device),
            target_tensor: Tensor::stack(target_tensors, 0).to_device(&self.device),
        }
    }
}

219
src/lib/qrust/ml/dataset.rs Normal file
View File

@@ -0,0 +1,219 @@
use crate::types::{
ta::{calculate_indicators, IndicatedBar, HEAD_SIZE, NUMERICAL_FIELD_COUNT},
Bar,
};
use burn::{
data::dataset::{transform::ComposedDataset, Dataset},
tensor::Data,
};
/// Number of consecutive bars per training sample.
pub const WINDOW_SIZE: usize = 48;

/// One training sample: `WINDOW_SIZE` bars of categorical (hour/day) and
/// numerical features plus a single regression target.
#[derive(Clone, Debug)]
pub struct BarWindow {
    pub hours: Data<i32, 1>,
    pub days: Data<i32, 1>,
    pub numerical: Data<f32, 2>,
    pub target: Data<f32, 1>,
}

/// Column-oriented feature/target series for one symbol; fixed-size windows
/// are sliced out of these parallel vectors on demand in `get`.
#[derive(Clone, Debug)]
struct SingleSymbolDataset {
    hours: Vec<i32>,
    days: Vec<i32>,
    numerical: Vec<[f32; NUMERICAL_FIELD_COUNT]>,
    targets: Vec<f32>,
}
impl SingleSymbolDataset {
    /// Builds per-bar feature rows from indicator-annotated bars of a single
    /// symbol, pairing each bar's features with the NEXT bar's close-percent
    /// change as the regression target. The first `HEAD_SIZE - 1` pairs are
    /// skipped so every emitted row has fully warmed-up indicators.
    ///
    /// # Panics
    /// Panics if `bars` mixes more than one symbol.
    #[allow(clippy::cast_possible_truncation)]
    pub fn new(bars: Vec<IndicatedBar>) -> Self {
        if !bars.is_empty() {
            let symbol = &bars[0].symbol;
            assert!(bars.iter().all(|bar| bar.symbol == *symbol));
        }

        // `saturating_sub` guards the empty case: the previous
        // `bars.len() - 1` underflowed usize when no bars were supplied
        // (panic in debug builds, absurd allocation request in release).
        let capacity = bars.len().saturating_sub(1);

        let (hours, days, numerical, targets) = bars.windows(2).skip(HEAD_SIZE - 1).fold(
            (
                Vec::with_capacity(capacity),
                Vec::with_capacity(capacity),
                Vec::with_capacity(capacity),
                Vec::with_capacity(capacity),
            ),
            |(mut hours, mut days, mut numerical, mut targets), bar| {
                hours.push(i32::from(bar[0].hour));
                days.push(i32::from(bar[0].day));
                // `.min(f32::MAX)` clamps the +inf an overflowing f64->f32
                // cast produces on the unbounded percent-change fields.
                // NOTE(review): assumes pct fields are never NaN — confirm.
                numerical.push([
                    bar[0].open as f32,
                    (bar[0].open_pct as f32).min(f32::MAX),
                    bar[0].high as f32,
                    (bar[0].high_pct as f32).min(f32::MAX),
                    bar[0].low as f32,
                    (bar[0].low_pct as f32).min(f32::MAX),
                    bar[0].close as f32,
                    (bar[0].close_pct as f32).min(f32::MAX),
                    bar[0].volume as f32,
                    (bar[0].volume_pct as f32).min(f32::MAX),
                    bar[0].trades as f32,
                    (bar[0].trades_pct as f32).min(f32::MAX),
                    bar[0].sma_3 as f32,
                    bar[0].sma_6 as f32,
                    bar[0].sma_12 as f32,
                    bar[0].sma_24 as f32,
                    bar[0].sma_48 as f32,
                    bar[0].sma_72 as f32,
                    bar[0].ema_3 as f32,
                    bar[0].ema_6 as f32,
                    bar[0].ema_12 as f32,
                    bar[0].ema_24 as f32,
                    bar[0].ema_48 as f32,
                    bar[0].ema_72 as f32,
                    bar[0].macd as f32,
                    bar[0].macd_signal as f32,
                    bar[0].obv as f32,
                    bar[0].rsi as f32,
                    bar[0].bbands_lower as f32,
                    bar[0].bbands_mean as f32,
                    bar[0].bbands_upper as f32,
                ]);
                // Target: the following bar's close percent change.
                targets.push(bar[1].close_pct as f32);
                (hours, days, numerical, targets)
            },
        );

        Self {
            hours,
            days,
            numerical,
            targets,
        }
    }
}
impl Dataset<BarWindow> for SingleSymbolDataset {
    /// Number of complete windows. Adding 1 before subtracting makes series
    /// shorter than `WINDOW_SIZE` report 0 instead of underflowing usize —
    /// the previous `len() - WINDOW_SIZE + 1` panicked (debug) or wrapped
    /// (release) on short series.
    fn len(&self) -> usize {
        (self.targets.len() + 1).saturating_sub(WINDOW_SIZE)
    }

    /// Returns the `idx`-th window, or `None` when `idx` is out of range.
    #[allow(clippy::single_range_in_vec_init)]
    fn get(&self, idx: usize) -> Option<BarWindow> {
        if idx >= self.len() {
            return None;
        }

        // Fixed-size copies out of the column vectors; slice lengths are
        // exact by construction, so `try_into` cannot fail here.
        let hours: [i32; WINDOW_SIZE] = self.hours[idx..idx + WINDOW_SIZE].try_into().unwrap();
        let days: [i32; WINDOW_SIZE] = self.days[idx..idx + WINDOW_SIZE].try_into().unwrap();
        let numerical: [[f32; NUMERICAL_FIELD_COUNT]; WINDOW_SIZE] =
            self.numerical[idx..idx + WINDOW_SIZE].try_into().unwrap();
        // The target paired with the window's last bar.
        let target: [f32; 1] = [self.targets[idx + WINDOW_SIZE - 1]];

        Some(BarWindow {
            hours: Data::from(hours),
            days: Data::from(days),
            numerical: Data::from(numerical),
            target: Data::from(target),
        })
    }
}
/// All symbols' datasets composed into one; indices span the concatenation
/// of the per-symbol datasets.
pub struct MultipleSymbolDataset {
    composed_dataset: ComposedDataset<SingleSymbolDataset>,
}

impl MultipleSymbolDataset {
    /// Computes technical indicators per symbol group and builds one
    /// `SingleSymbolDataset` per group.
    pub fn new(bars: Vec<Bar>) -> Self {
        let groups = calculate_indicators(bars)
            .into_iter()
            .map(SingleSymbolDataset::new)
            .collect::<Vec<_>>();
        Self {
            composed_dataset: ComposedDataset::new(groups),
        }
    }
}

// Thin delegation to the composed dataset.
impl Dataset<BarWindow> for MultipleSymbolDataset {
    fn len(&self) -> usize {
        self.composed_dataset.len()
    }

    fn get(&self, idx: usize) -> Option<BarWindow> {
        self.composed_dataset.get(idx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rand::{
        distributions::{Distribution, Uniform},
        Rng,
    };
    use time::OffsetDateTime;

    // Builds a one-symbol random dataset with exactly `length` windows: the
    // inclusive upper bound (`..=`) plus the HEAD_SIZE - 1 and
    // WINDOW_SIZE - 1 extras account for indicator warm-up, windowing, and
    // the one bar consumed by the windows(2) pairing.
    fn generate_random_dataset(length: usize) -> MultipleSymbolDataset {
        let mut rng = rand::thread_rng();
        let uniform = Uniform::new(1.0, 100.0);

        let mut bars = Vec::with_capacity(length);
        for _ in 0..=(length + (HEAD_SIZE - 1) + (WINDOW_SIZE - 1)) {
            bars.push(Bar {
                symbol: "AAPL".to_string(),
                time: OffsetDateTime::now_utc(),
                open: uniform.sample(&mut rng),
                high: uniform.sample(&mut rng),
                low: uniform.sample(&mut rng),
                close: uniform.sample(&mut rng),
                volume: uniform.sample(&mut rng),
                trades: rng.gen_range(1..100),
            });
        }

        MultipleSymbolDataset::new(bars)
    }

    // NOTE(review): these tests are named "single_symbol" but exercise
    // MultipleSymbolDataset built from a single symbol's bars.
    #[test]
    fn test_single_symbol_dataset() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        assert_eq!(dataset.len(), length);
    }

    #[test]
    fn test_single_symbol_dataset_window() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        let item = dataset.get(0).unwrap();
        assert_eq!(
            item.numerical.shape.dims,
            [WINDOW_SIZE, NUMERICAL_FIELD_COUNT]
        );
        assert_eq!(item.target.shape.dims, [1]);
    }

    #[test]
    fn test_single_symbol_dataset_last_window() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        let item = dataset.get(dataset.len() - 1).unwrap();
        assert_eq!(
            item.numerical.shape.dims,
            [WINDOW_SIZE, NUMERICAL_FIELD_COUNT]
        );
        assert_eq!(item.target.shape.dims, [1]);
    }

    #[test]
    fn test_single_symbol_dataset_out_of_bounds() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        assert!(dataset.get(dataset.len()).is_none());
    }
}

21
src/lib/qrust/ml/mod.rs Normal file
View File

@@ -0,0 +1,21 @@
pub mod batcher;
pub mod dataset;
pub mod model;
pub use batcher::{BarWindowBatch, BarWindowBatcher};
pub use dataset::{BarWindow, MultipleSymbolDataset};
pub use model::{Model, ModelConfig};
use burn::{
backend::{
wgpu::{AutoGraphicsApi, WgpuDevice},
Autodiff, Wgpu,
},
tensor::backend::Backend,
};
/// Default backend: WGPU with f32 floats and i32 ints.
pub type MyBackend = Wgpu<AutoGraphicsApi, f32, i32>;
/// Autodiff wrapper over the default backend, used for training.
pub type MyAutodiffBackend = Autodiff<MyBackend>;
pub type MyDevice = <Autodiff<Wgpu> as Backend>::Device;
/// Lets wgpu pick the best available adapter (GPU when present).
pub const DEVICE: MyDevice = WgpuDevice::BestAvailable;

160
src/lib/qrust/ml/model.rs Normal file
View File

@@ -0,0 +1,160 @@
use super::BarWindowBatch;
use crate::types::ta::NUMERICAL_FIELD_COUNT;
use burn::{
config::Config,
module::Module,
nn::{
loss::{MseLoss, Reduction},
Dropout, DropoutConfig, Embedding, EmbeddingConfig, Linear, LinearConfig, Lstm, LstmConfig,
},
tensor::{
self,
backend::{AutodiffBackend, Backend},
Tensor,
},
train::{RegressionOutput, TrainOutput, TrainStep, ValidStep},
};
/// Stacked-LSTM regression model: hour/day categorical inputs are embedded,
/// concatenated with the numerical features, passed through four
/// LSTM+dropout stages, and projected to a single output.
#[derive(Module, Debug)]
pub struct Model<B: Backend> {
    hour_embedding: Embedding<B>,
    day_embedding: Embedding<B>,
    lstm_1: Lstm<B>,
    dropout_1: Dropout,
    lstm_2: Lstm<B>,
    dropout_2: Dropout,
    lstm_3: Lstm<B>,
    dropout_3: Dropout,
    lstm_4: Lstm<B>,
    dropout_4: Dropout,
    linear: Linear<B>,
}

/// Model hyper-parameters: embedding widths for the two categorical inputs,
/// the numerical feature count, and the shared dropout probability.
#[derive(Config, Debug)]
pub struct ModelConfig {
    #[config(default = "3")]
    pub hour_features: usize,
    #[config(default = "2")]
    pub day_features: usize,
    #[config(default = "{NUMERICAL_FIELD_COUNT}")]
    pub numerical_features: usize,
    #[config(default = "0.2")]
    pub dropout: f64,
}
impl ModelConfig {
    /// Instantiates the model on `device`. Embedding vocabularies are fixed
    /// at 24 hours and 7 weekdays; the LSTM stack narrows
    /// 512 -> 256 -> 64 -> 32 before the final linear layer projects to one
    /// regression output.
    pub fn init<B: Backend>(&self, device: &B::Device) -> Model<B> {
        // Input width = numerical features plus both embedding widths, since
        // the embeddings are concatenated onto the numerical features.
        let num_features = self.numerical_features + self.hour_features + self.day_features;
        let lstm_1_hidden_size = 512;
        let lstm_2_hidden_size = 256;
        let lstm_3_hidden_size = 64;
        let lstm_4_hidden_size = 32;
        Model {
            hour_embedding: EmbeddingConfig::new(24, self.hour_features).init(device),
            day_embedding: EmbeddingConfig::new(7, self.day_features).init(device),
            lstm_1: LstmConfig::new(num_features, lstm_1_hidden_size, true).init(device),
            dropout_1: DropoutConfig::new(self.dropout).init(),
            lstm_2: LstmConfig::new(lstm_1_hidden_size, lstm_2_hidden_size, true).init(device),
            dropout_2: DropoutConfig::new(self.dropout).init(),
            lstm_3: LstmConfig::new(lstm_2_hidden_size, lstm_3_hidden_size, true).init(device),
            dropout_3: DropoutConfig::new(self.dropout).init(),
            lstm_4: LstmConfig::new(lstm_3_hidden_size, lstm_4_hidden_size, true).init(device),
            dropout_4: DropoutConfig::new(self.dropout).init(),
            linear: LinearConfig::new(lstm_4_hidden_size, 1).init(device),
        }
    }
}
impl<B: Backend> Model<B> {
    /// Runs the network: embeds hour/day indices, concatenates them with the
    /// numerical features along the feature axis, passes the sequence
    /// through the LSTM stack, and regresses from the final time step only.
    pub fn forward(
        &self,
        hour: Tensor<B, 2, tensor::Int>,
        day: Tensor<B, 2, tensor::Int>,
        numerical: Tensor<B, 3>,
    ) -> Tensor<B, 2> {
        let hour = self.hour_embedding.forward(hour);
        let day = self.day_embedding.forward(day);
        // Concatenate along dim 2 (features); dims 0/1 are batch and time.
        let x = Tensor::cat(vec![hour, day, numerical], 2);
        let (_, x) = self.lstm_1.forward(x, None);
        let x = self.dropout_1.forward(x);
        let (_, x) = self.lstm_2.forward(x, None);
        let x = self.dropout_2.forward(x);
        let (_, x) = self.lstm_3.forward(x, None);
        let x = self.dropout_3.forward(x);
        let (_, x) = self.lstm_4.forward(x, None);
        let x = self.dropout_4.forward(x);
        // Keep only the last time step, then drop the singleton time axis.
        let [batch_size, window_size, features] = x.shape().dims;
        let x = x.slice([0..batch_size, window_size - 1..window_size, 0..features]);
        let x = x.squeeze(1);
        self.linear.forward(x)
    }

    /// Forward pass plus mean-squared-error loss against `target`, packaged
    /// for burn's train/valid loops.
    pub fn forward_regression(
        &self,
        hour: Tensor<B, 2, tensor::Int>,
        day: Tensor<B, 2, tensor::Int>,
        numerical: Tensor<B, 3>,
        target: Tensor<B, 2>,
    ) -> RegressionOutput<B> {
        let output = self.forward(hour, day, numerical);
        let loss = MseLoss::new().forward(output.clone(), target.clone(), Reduction::Mean);
        RegressionOutput::new(loss, output, target)
    }
}
impl<B: AutodiffBackend> TrainStep<BarWindowBatch<B>, RegressionOutput<B>> for Model<B> {
    /// One training step: forward + loss, then gradients via `backward()`.
    fn step(&self, batch: BarWindowBatch<B>) -> TrainOutput<RegressionOutput<B>> {
        let item = self.forward_regression(
            batch.hour_tensor,
            batch.day_tensor,
            batch.numerical_tensor,
            batch.target_tensor,
        );
        TrainOutput::new(self, item.loss.backward(), item)
    }
}

impl<B: Backend> ValidStep<BarWindowBatch<B>, RegressionOutput<B>> for Model<B> {
    /// Validation step: forward + loss only, no gradients.
    fn step(&self, batch: BarWindowBatch<B>) -> RegressionOutput<B> {
        self.forward_regression(
            batch.hour_tensor,
            batch.day_tensor,
            batch.numerical_tensor,
            batch.target_tensor,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use burn::{backend::Wgpu, tensor::Distribution};

    // Shape smoke test for the forward pass. `#[ignore]`d — presumably
    // because it needs a working wgpu adapter; run with
    // `cargo test -- --ignored`.
    #[test]
    #[ignore]
    fn test_model() {
        let device = Default::default();
        let distribution = Distribution::Normal(0.0, 1.0);
        let config = ModelConfig::new().with_numerical_features(7);
        let model = config.init::<Wgpu>(&device);
        let hour = Tensor::ones([2, 10], &device);
        let day = Tensor::ones([2, 10], &device);
        let numerical = Tensor::random([2, 10, 7], distribution, &device);
        let output = model.forward(hour, day, numerical);
        // One regression output per batch element.
        assert_eq!(output.shape().dims, [2, 1]);
    }
}

View File

@@ -1,4 +1,6 @@
pub mod alpaca;
pub mod database;
pub mod ml;
pub mod ta;
pub mod types;
pub mod utils;

149
src/lib/qrust/ta/bbands.rs Normal file
View File

@@ -0,0 +1,149 @@
use std::{borrow::Borrow, collections::VecDeque, iter::Scan, num::NonZeroUsize};
/// Rolling state for a Bollinger-Bands scan: a fixed-size window of the most
/// recent samples plus running sums enabling O(1) mean/variance updates.
pub struct BbandsState {
    buf: VecDeque<f64>,
    running_sum: f64,
    running_sq_sum: f64,
    band_width: f64,
}

#[allow(clippy::type_complexity)]
pub trait Bbands<T>: Iterator + Sized {
    fn bbands(
        self,
        period: NonZeroUsize,
        multiplier: f64, // Typically 2.0
    ) -> Scan<Self, BbandsState, fn(&mut BbandsState, T) -> Option<(f64, f64, f64)>>;
}

impl<I, T> Bbands<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    /// Yields `(upper, mean, lower)` for every input value. The window
    /// starts pre-filled with zeros, so the first `period - 1` outputs are
    /// warm-up values computed against those zeros.
    fn bbands(
        self,
        period: NonZeroUsize,
        multiplier: f64,
    ) -> Scan<Self, BbandsState, fn(&mut BbandsState, T) -> Option<(f64, f64, f64)>> {
        let initial = BbandsState {
            buf: (0..period.get()).map(|_| 0.0).collect(),
            running_sum: 0.0,
            running_sq_sum: 0.0,
            band_width: multiplier,
        };
        self.scan(initial, |state: &mut BbandsState, item: T| {
            let x = *item.borrow();
            // Evict the oldest sample, then admit the new one, keeping both
            // running sums in sync.
            let evicted = state.buf.pop_front().unwrap();
            state.running_sum -= evicted;
            state.running_sq_sum -= evicted.powi(2);
            state.buf.push_back(x);
            state.running_sum += x;
            state.running_sq_sum += x.powi(2);
            let n = state.buf.len() as f64;
            let mean = state.running_sum / n;
            // Clamp at zero: running-sum cancellation can dip slightly negative.
            let variance = ((state.running_sq_sum / n) - mean.powi(2)).max(0.0);
            let sigma = variance.sqrt();
            Some((
                mean + state.band_width * sigma,
                mean,
                mean - state.band_width * sigma,
            ))
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Expected values include warm-up outputs computed against the
    // zero-filled initial window (e.g. the first mean is 1/3 for period 3).
    #[test]
    fn test_bbands() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let bbands = data
            .into_iter()
            .bbands(NonZeroUsize::new(3).unwrap(), 2.0)
            .map(|(upper, mean, lower)| {
                (
                    (upper * 100.0).round() / 100.0,
                    (mean * 100.0).round() / 100.0,
                    (lower * 100.0).round() / 100.0,
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(
            bbands,
            vec![
                (1.28, 0.33, -0.61),
                (2.63, 1.0, -0.63),
                (3.63, 2.0, 0.37),
                (4.63, 3.0, 1.37),
                (5.63, 4.0, 2.37)
            ]
        );
    }

    #[test]
    fn test_bbands_empty() {
        let data = Vec::<f64>::new();
        let bbands = data
            .into_iter()
            .bbands(NonZeroUsize::new(3).unwrap(), 2.0)
            .collect::<Vec<_>>();
        assert_eq!(bbands, Vec::<(f64, f64, f64)>::new());
    }

    // Period 1: the window is just the current value, so variance is 0 and
    // all three bands coincide.
    #[test]
    fn test_bbands_1_period() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let bbands = data
            .into_iter()
            .bbands(NonZeroUsize::new(1).unwrap(), 2.0)
            .collect::<Vec<_>>();
        assert_eq!(
            bbands,
            vec![
                (1.0, 1.0, 1.0),
                (2.0, 2.0, 2.0),
                (3.0, 3.0, 3.0),
                (4.0, 4.0, 4.0),
                (5.0, 5.0, 5.0)
            ]
        );
    }

    // Same as test_bbands but via `.iter()` to exercise the Borrow<f64> path.
    #[test]
    fn test_bbands_borrow() {
        let data = [1.0, 2.0, 3.0, 4.0, 5.0];
        let bbands = data
            .iter()
            .bbands(NonZeroUsize::new(3).unwrap(), 2.0)
            .map(|(upper, mean, lower)| {
                (
                    (upper * 100.0).round() / 100.0,
                    (mean * 100.0).round() / 100.0,
                    (lower * 100.0).round() / 100.0,
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(
            bbands,
            vec![
                (1.28, 0.33, -0.61),
                (2.63, 1.0, -0.63),
                (3.63, 2.0, 0.37),
                (4.63, 3.0, 1.37),
                (5.63, 4.0, 2.37)
            ]
        );
    }
}

59
src/lib/qrust/ta/deriv.rs Normal file
View File

@@ -0,0 +1,59 @@
use std::{borrow::Borrow, iter::Scan};
/// Scan state for `deriv`: the previous sample, seeded with 0.0 so the first
/// output equals the first input value itself.
pub struct DerivState {
    pub last: f64,
}

#[allow(clippy::type_complexity)]
pub trait Deriv<T>: Iterator + Sized {
    fn deriv(self) -> Scan<Self, DerivState, fn(&mut DerivState, T) -> Option<f64>>;
}

impl<I, T> Deriv<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    /// Emits the first difference of the stream: `x[i] - x[i-1]`, with an
    /// implicit `x[-1] = 0.0`.
    fn deriv(self) -> Scan<Self, DerivState, fn(&mut DerivState, T) -> Option<f64>> {
        let seed = DerivState { last: 0.0 };
        self.scan(seed, |state: &mut DerivState, item: T| {
            let current = *item.borrow();
            // Swap the new sample in and difference against the one it replaced.
            Some(current - std::mem::replace(&mut state.last, current))
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // First output is the first value itself (implicit leading 0.0).
    #[test]
    fn test_deriv() {
        let data = vec![1.0, 3.0, 6.0, 3.0, 1.0];
        let deriv = data.into_iter().deriv().collect::<Vec<_>>();
        assert_eq!(deriv, vec![1.0, 2.0, 3.0, -3.0, -2.0]);
    }

    #[test]
    fn test_deriv_empty() {
        let data = Vec::<f64>::new();
        let deriv = data.into_iter().deriv().collect::<Vec<_>>();
        assert_eq!(deriv, Vec::<f64>::new());
    }

    // Exercises the Borrow<f64> path via `.iter()` over &f64.
    #[test]
    fn test_deriv_borrow() {
        let data = [1.0, 3.0, 6.0, 3.0, 1.0];
        let deriv = data.iter().deriv().collect::<Vec<_>>();
        assert_eq!(deriv, vec![1.0, 2.0, 3.0, -3.0, -2.0]);
    }
}

95
src/lib/qrust/ta/ema.rs Normal file
View File

@@ -0,0 +1,95 @@
use std::{
borrow::Borrow,
iter::{Peekable, Scan},
num::NonZeroUsize,
};
/// Scan state for `ema`: the smoothing factor and the running average,
/// seeded with the stream's first value so the first output equals it.
pub struct EmaState {
    alpha: f64,
    current: f64,
}

#[allow(clippy::type_complexity)]
pub trait Ema<T>: Iterator + Sized {
    fn ema(
        self,
        period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, EmaState, fn(&mut EmaState, T) -> Option<f64>>;
}

impl<I, T> Ema<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    /// Standard exponential moving average with smoothing constant
    /// `2 / (period + 1)`; seeding with the first sample avoids an initial
    /// bias toward zero.
    fn ema(
        self,
        period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, EmaState, fn(&mut EmaState, T) -> Option<f64>> {
        let alpha = 2.0 / (1.0 + period.get() as f64);
        let mut source = self.peekable();
        // Peek (without consuming) the first value to seed the average.
        let seed = source.peek().map_or(0.0, |value| *value.borrow());
        source.scan(
            EmaState {
                alpha,
                current: seed,
            },
            |state: &mut EmaState, item: T| {
                let x = *item.borrow();
                state.current = (x * state.alpha) + (state.current * (1.0 - state.alpha));
                Some(state.current)
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ema() {
        let input = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let output = input
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![1.0, 1.5, 2.25, 3.125, 4.0625]);
    }

    #[test]
    fn test_ema_empty() {
        let output = Vec::<f64>::new()
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, Vec::<f64>::new());
    }

    #[test]
    fn test_ema_1_period() {
        // Weight is 1.0, so the EMA tracks the input exactly.
        let input = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let output = input
            .into_iter()
            .ema(NonZeroUsize::new(1).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    }

    #[test]
    fn test_ema_borrow() {
        // Borrowed items (`&f64`) must work through the `Borrow<f64>` bound.
        let input = [1.0, 2.0, 3.0, 4.0, 5.0];
        let output = input
            .iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![1.0, 1.5, 2.25, 3.125, 4.0625]);
    }
}

216
src/lib/qrust/ta/macd.rs Normal file
View File

@@ -0,0 +1,216 @@
use std::{
borrow::Borrow,
iter::{Peekable, Scan},
num::NonZeroUsize,
};
/// Running state for [`Macd`]: one smoothing weight and one accumulator per
/// EMA (short, long, signal).
pub struct MacdState {
    short_weight: f64,
    long_weight: f64,
    signal_weight: f64,
    short_ema: f64,
    long_ema: f64,
    signal_ema: f64,
}

#[allow(clippy::type_complexity)]
pub trait Macd<T>: Iterator + Sized {
    /// Moving average convergence/divergence. Yields `(macd, signal)` pairs,
    /// where `macd = short EMA - long EMA` and `signal` is the EMA of `macd`.
    fn macd(
        self,
        short_period: NonZeroUsize,  // Typically 12
        long_period: NonZeroUsize,   // Typically 26
        signal_period: NonZeroUsize, // Typically 9
    ) -> Scan<Peekable<Self>, MacdState, fn(&mut MacdState, T) -> Option<(f64, f64)>>;
}

impl<I, T> Macd<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn macd(
        self,
        short_period: NonZeroUsize,
        long_period: NonZeroUsize,
        signal_period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, MacdState, fn(&mut MacdState, T) -> Option<(f64, f64)>> {
        // Standard smoothing factor: 2 / (period + 1).
        const SMOOTHING: f64 = 2.0;
        let short_weight = SMOOTHING / (1.0 + short_period.get() as f64);
        let long_weight = SMOOTHING / (1.0 + long_period.get() as f64);
        let signal_weight = SMOOTHING / (1.0 + signal_period.get() as f64);
        let mut source = self.peekable();
        // Both price EMAs are seeded with the first sample; the signal EMA
        // starts at 0.0, which matches the first MACD value (equal seeds give
        // macd == 0 on the first step).
        let seed = source.peek().map(|v| *v.borrow()).unwrap_or_default();
        source.scan(
            MacdState {
                short_weight,
                long_weight,
                signal_weight,
                short_ema: seed,
                long_ema: seed,
                signal_ema: 0.0,
            },
            |state: &mut MacdState, sample: T| {
                let price = *sample.borrow();
                state.short_ema =
                    (price * state.short_weight) + (state.short_ema * (1.0 - state.short_weight));
                state.long_ema =
                    (price * state.long_weight) + (state.long_ema * (1.0 - state.long_weight));
                let macd = state.short_ema - state.long_ema;
                state.signal_ema =
                    (macd * state.signal_weight) + (state.signal_ema * (1.0 - state.signal_weight));
                Some((macd, state.signal_ema))
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::super::ema::Ema;
    use super::*;

    /// Reference computation: MACD as the difference of two price EMAs, and
    /// the signal as the EMA of that difference.
    fn expected_macd(
        input: &[f64],
        short: NonZeroUsize,
        long: NonZeroUsize,
        signal: NonZeroUsize,
    ) -> Vec<(f64, f64)> {
        let fast = input.iter().ema(short).collect::<Vec<_>>();
        let slow = input.iter().ema(long).collect::<Vec<_>>();
        let diff = fast
            .into_iter()
            .zip(slow)
            .map(|(f, s)| f - s)
            .collect::<Vec<_>>();
        let sig = diff.iter().ema(signal).collect::<Vec<_>>();
        diff.into_iter().zip(sig).collect()
    }

    #[test]
    fn test_macd() {
        let input = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let want = expected_macd(
            &input,
            NonZeroUsize::new(3).unwrap(),
            NonZeroUsize::new(5).unwrap(),
            NonZeroUsize::new(3).unwrap(),
        );
        let got = input
            .into_iter()
            .macd(
                NonZeroUsize::new(3).unwrap(),
                NonZeroUsize::new(5).unwrap(),
                NonZeroUsize::new(3).unwrap(),
            )
            .collect::<Vec<_>>();
        assert_eq!(got, want);
    }

    #[test]
    fn test_macd_empty() {
        let got = Vec::<f64>::new()
            .into_iter()
            .macd(
                NonZeroUsize::new(3).unwrap(),
                NonZeroUsize::new(5).unwrap(),
                NonZeroUsize::new(3).unwrap(),
            )
            .collect::<Vec<_>>();
        assert_eq!(got, vec![]);
    }

    #[test]
    fn test_macd_1_period() {
        let input = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let one = NonZeroUsize::new(1).unwrap();
        let want = expected_macd(&input, one, one, one);
        let got = input.into_iter().macd(one, one, one).collect::<Vec<_>>();
        assert_eq!(got, want);
    }

    #[test]
    fn test_macd_borrow() {
        // Borrowed items (`&f64`) must work through the `Borrow<f64>` bound.
        let input = [1.0, 2.0, 3.0, 4.0, 5.0];
        let want = expected_macd(
            &input,
            NonZeroUsize::new(3).unwrap(),
            NonZeroUsize::new(5).unwrap(),
            NonZeroUsize::new(3).unwrap(),
        );
        let got = input
            .iter()
            .macd(
                NonZeroUsize::new(3).unwrap(),
                NonZeroUsize::new(5).unwrap(),
                NonZeroUsize::new(3).unwrap(),
            )
            .collect::<Vec<_>>();
        assert_eq!(got, want);
    }
}

17
src/lib/qrust/ta/mod.rs Normal file
View File

@@ -0,0 +1,17 @@
// Technical-analysis indicator adapters. Each submodule extends `Iterator`
// with one indicator via an extension trait; the traits are re-exported here
// so callers can `use qrust::ta::*` (or a single trait) directly.
pub mod bbands;
pub mod deriv;
pub mod ema;
pub mod macd;
pub mod obv;
pub mod pct;
pub mod rsi;
pub mod sma;
pub use bbands::Bbands;
pub use deriv::Deriv;
pub use ema::Ema;
pub use macd::Macd;
pub use obv::Obv;
pub use pct::Pct;
pub use rsi::Rsi;
pub use sma::Sma;

73
src/lib/qrust/ta/obv.rs Normal file
View File

@@ -0,0 +1,73 @@
use std::{
borrow::Borrow,
iter::{Peekable, Scan},
};
/// Running state for [`Obv`]: the previous close and the running total.
pub struct ObvState {
    last: f64,
    obv: f64,
}

#[allow(clippy::type_complexity)]
pub trait Obv<T>: Iterator + Sized {
    /// On-balance volume over `(close, volume)` pairs: volume is added on an
    /// up-close, subtracted on a down-close, and ignored when flat.
    fn obv(self) -> Scan<Peekable<Self>, ObvState, fn(&mut ObvState, T) -> Option<f64>>;
}

impl<I, T> Obv<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<(f64, f64)>,
{
    fn obv(self) -> Scan<Peekable<Self>, ObvState, fn(&mut ObvState, T) -> Option<f64>> {
        let mut source = self.peekable();
        // Seed `last` with the first close (peeked, not consumed) so the
        // first output is always 0.0.
        let seed = source.peek().map(|v| *v.borrow()).unwrap_or_default();
        source.scan(
            ObvState {
                last: seed.0,
                obv: 0.0,
            },
            |state: &mut ObvState, sample: T| {
                let (close, volume) = *sample.borrow();
                if close > state.last {
                    state.obv += volume;
                } else if close < state.last {
                    state.obv -= volume;
                }
                state.last = close;
                Some(state.obv)
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_obv() {
        let input = vec![(1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (2.0, 4.0), (1.0, 5.0)];
        let output = input.into_iter().obv().collect::<Vec<_>>();
        assert_eq!(output, vec![0.0, 2.0, 5.0, 1.0, -4.0]);
    }

    #[test]
    fn test_obv_empty() {
        let output = Vec::<(f64, f64)>::new().into_iter().obv().collect::<Vec<_>>();
        assert_eq!(output, Vec::<f64>::new());
    }

    #[test]
    fn test_obv_borrow() {
        // Borrowed pairs must work through the `Borrow<(f64, f64)>` bound.
        let input = [(1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (2.0, 4.0), (1.0, 5.0)];
        let output = input.iter().obv().collect::<Vec<_>>();
        assert_eq!(output, vec![0.0, 2.0, 5.0, 1.0, -4.0]);
    }
}

64
src/lib/qrust/ta/pct.rs Normal file
View File

@@ -0,0 +1,64 @@
use std::{borrow::Borrow, iter::Scan};
/// Running state for [`Pct`]: the previous sample seen.
pub struct PctState {
    pub last: f64,
}

#[allow(clippy::type_complexity)]
pub trait Pct<T>: Iterator + Sized {
    /// Fractional change between consecutive samples. The state is seeded
    /// with 0.0, so the first output divides by zero (+/- infinity for a
    /// nonzero first sample) — the suite pins that behavior.
    fn pct(self) -> Scan<Self, PctState, fn(&mut PctState, T) -> Option<f64>>;
}

impl<I, T> Pct<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn pct(self) -> Scan<Self, PctState, fn(&mut PctState, T) -> Option<f64>> {
        self.scan(PctState { last: 0.0 }, |state: &mut PctState, sample: T| {
            let current = *sample.borrow();
            let change = current / state.last - 1.0;
            state.last = current;
            Some(change)
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pct() {
        let input = vec![1.0, 2.0, 4.0, 2.0, 1.0];
        let output = input.into_iter().pct().collect::<Vec<_>>();
        assert_eq!(output, vec![f64::INFINITY, 1.0, 1.0, -0.5, -0.5]);
    }

    #[test]
    fn test_pct_empty() {
        let output = Vec::<f64>::new().into_iter().pct().collect::<Vec<_>>();
        assert_eq!(output, Vec::<f64>::new());
    }

    #[test]
    fn test_pct_0() {
        // Division by a zero previous value yields infinity.
        let input = vec![1.0, 0.0, 4.0, 2.0, 1.0];
        let output = input.into_iter().pct().collect::<Vec<_>>();
        assert_eq!(output, vec![f64::INFINITY, -1.0, f64::INFINITY, -0.5, -0.5]);
    }

    #[test]
    fn test_pct_borrow() {
        // Borrowed items (`&f64`) must work through the `Borrow<f64>` bound.
        let input = [1.0, 2.0, 4.0, 2.0, 1.0];
        let output = input.iter().pct().collect::<Vec<_>>();
        assert_eq!(output, vec![f64::INFINITY, 1.0, 1.0, -0.5, -0.5]);
    }
}

135
src/lib/qrust/ta/rsi.rs Normal file
View File

@@ -0,0 +1,135 @@
use std::{
borrow::Borrow,
collections::VecDeque,
iter::{Peekable, Scan},
num::NonZeroUsize,
};
/// Running state for [`Rsi`]: the previous sample plus rolling windows of
/// gains and losses with their running sums.
pub struct RsiState {
    last: f64,
    window_gains: VecDeque<f64>,
    window_losses: VecDeque<f64>,
    sum_gains: f64,
    sum_losses: f64,
}

#[allow(clippy::type_complexity)]
pub trait Rsi<T>: Iterator + Sized {
    /// Relative strength index over a rolling window. The windows are
    /// zero-seeded, so early outputs average over the full `period`.
    fn rsi(
        self,
        period: NonZeroUsize, // Typically 14
    ) -> Scan<Peekable<Self>, RsiState, fn(&mut RsiState, T) -> Option<f64>>;
}

impl<I, T> Rsi<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn rsi(
        self,
        period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, RsiState, fn(&mut RsiState, T) -> Option<f64>> {
        let mut source = self.peekable();
        // Seed `last` with the first sample (peeked, not consumed) so the
        // first delta is zero.
        let seed = source.peek().map(|v| *v.borrow()).unwrap_or_default();
        source.scan(
            RsiState {
                last: seed,
                window_gains: VecDeque::from(vec![0.0; period.get()]),
                window_losses: VecDeque::from(vec![0.0; period.get()]),
                sum_gains: 0.0,
                sum_losses: 0.0,
            },
            |state, sample| {
                let price = *sample.borrow();

                // Slide both windows: retire the oldest gain/loss...
                state.sum_gains -= state.window_gains.pop_front().unwrap();
                state.sum_losses -= state.window_losses.pop_front().unwrap();

                // ...and admit the newest (exactly one of gain/loss is nonzero).
                let gain = (price - state.last).max(0.0);
                let loss = (state.last - price).max(0.0);
                state.last = price;
                state.window_gains.push_back(gain);
                state.window_losses.push_back(loss);
                state.sum_gains += gain;
                state.sum_losses += loss;

                // RSI = 100 - 100 / (1 + RS); a loss-free window pins RSI at 100.
                let avg_loss = state.sum_losses / state.window_losses.len() as f64;
                if avg_loss == 0.0 {
                    return Some(100.0);
                }
                let avg_gain = state.sum_gains / state.window_gains.len() as f64;
                let rs = avg_gain / avg_loss;
                Some(100.0 - (100.0 / (1.0 + rs)))
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rsi() {
        let input = vec![1.0, 4.0, 7.0, 4.0, 1.0];
        let output = input
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .map(|v| (v * 100.0).round() / 100.0)
            .collect::<Vec<_>>();
        assert_eq!(output, vec![100.0, 100.0, 100.0, 66.67, 33.33]);
    }

    #[test]
    fn test_rsi_empty() {
        let output = Vec::<f64>::new()
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, Vec::<f64>::new());
    }

    #[test]
    fn test_rsi_no_loss() {
        // A strictly rising series keeps RSI pinned at 100.
        let output = vec![1.0, 2.0, 3.0, 4.0, 5.0]
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![100.0, 100.0, 100.0, 100.0, 100.0]);
    }

    #[test]
    fn test_rsi_no_gain() {
        // The first sample has zero delta (RSI 100); afterwards only losses.
        let output = vec![5.0, 4.0, 3.0, 2.0, 1.0]
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![100.0, 0.0, 0.0, 0.0, 0.0]);
    }

    #[test]
    fn test_rsi_borrow() {
        // Borrowed items (`&f64`) must work through the `Borrow<f64>` bound.
        let input = [1.0, 4.0, 7.0, 4.0, 1.0];
        let output = input
            .iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .map(|v| (v * 100.0).round() / 100.0)
            .collect::<Vec<_>>();
        assert_eq!(output, vec![100.0, 100.0, 100.0, 66.67, 33.33]);
    }
}

88
src/lib/qrust/ta/sma.rs Normal file
View File

@@ -0,0 +1,88 @@
use std::{borrow::Borrow, collections::VecDeque, iter::Scan, num::NonZeroUsize};
/// Running state for [`Sma`]: the rolling window and its running sum.
pub struct SmaState {
    window: VecDeque<f64>,
    sum: f64,
}

#[allow(clippy::type_complexity)]
pub trait Sma<T>: Iterator + Sized {
    /// Simple moving average over a zero-seeded rolling window: the divisor is
    /// always `period`, so the first `period - 1` outputs average the samples
    /// seen so far against zeros (behavior pinned by the tests).
    fn sma(self, period: NonZeroUsize)
        -> Scan<Self, SmaState, fn(&mut SmaState, T) -> Option<f64>>;
}

impl<I, T> Sma<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn sma(
        self,
        period: NonZeroUsize,
    ) -> Scan<Self, SmaState, fn(&mut SmaState, T) -> Option<f64>> {
        self.scan(
            SmaState {
                window: VecDeque::from(vec![0.0; period.get()]),
                sum: 0.0,
            },
            |state: &mut SmaState, sample: T| {
                let price = *sample.borrow();
                // Slide the window: retire the oldest sample, admit the newest.
                state.sum -= state.window.pop_front().unwrap();
                state.window.push_back(price);
                state.sum += price;
                Some(state.sum / state.window.len() as f64)
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sma() {
        // Early outputs divide by the full period (zero-seeded window).
        let input = vec![3.0, 6.0, 9.0, 12.0, 15.0];
        let output = input
            .into_iter()
            .sma(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![1.0, 3.0, 6.0, 9.0, 12.0]);
    }

    #[test]
    fn test_sma_empty() {
        let output = Vec::<f64>::new()
            .into_iter()
            .sma(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, Vec::<f64>::new());
    }

    #[test]
    fn test_sma_1_period() {
        // A window of one tracks the input exactly.
        let input = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let output = input
            .into_iter()
            .sma(NonZeroUsize::new(1).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    }

    #[test]
    fn test_sma_borrow() {
        // Borrowed items (`&f64`) must work through the `Borrow<f64>` bound.
        let input = [3.0, 6.0, 9.0, 12.0, 15.0];
        let output = input
            .iter()
            .sma(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(output, vec![1.0, 3.0, 6.0, 9.0, 12.0]);
    }
}

View File

@@ -26,15 +26,14 @@ pub struct Bar {
impl From<(Bar, String)> for types::Bar {
fn from((bar, symbol): (Bar, String)) -> Self {
Self {
time: bar.time,
symbol,
time: bar.time,
open: bar.open,
high: bar.high,
low: bar.low,
close: bar.close,
volume: bar.volume,
trades: bar.trades,
vwap: bar.vwap,
}
}
}

View File

@@ -13,3 +13,14 @@ pub fn strip(content: &str) -> String {
let content = content.trim();
content.to_string()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_strip() {
        // Tags are removed and inter-tag whitespace is collapsed/trimmed.
        let html = "<p> <b> Hello, </b> <i> World! </i> </p>";
        assert_eq!(strip(html), "Hello, World!");
    }
}

View File

@@ -223,3 +223,53 @@ impl Order {
orders
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_normalize() {
        // A minimal, valid simple order used as a building block for legs.
        let template = Order {
            id: Uuid::new_v4(),
            client_order_id: Uuid::new_v4(),
            created_at: OffsetDateTime::now_utc(),
            updated_at: None,
            submitted_at: OffsetDateTime::now_utc(),
            filled_at: None,
            expired_at: None,
            cancel_requested_at: None,
            canceled_at: None,
            failed_at: None,
            replaced_at: None,
            replaced_by: None,
            replaces: None,
            asset_id: Uuid::new_v4(),
            symbol: "AAPL".to_string(),
            asset_class: super::super::asset::Class::UsEquity,
            notional: None,
            qty: None,
            filled_qty: 0.0,
            filled_avg_price: None,
            order_class: Class::Simple,
            order_type: Type::Market,
            side: Side::Buy,
            time_in_force: TimeInForce::Day,
            limit_price: None,
            stop_price: None,
            status: Status::New,
            extended_hours: false,
            legs: None,
            trail_percent: None,
            trail_price: None,
            hwm: None,
        };

        // Root order with two legs, the first of which has one nested leg:
        // normalize should flatten the tree into 4 orders total.
        let mut root = template.clone();
        root.legs = Some(vec![template.clone(), template.clone()]);
        root.legs.as_mut().unwrap()[0].legs = Some(vec![template.clone()]);
        let flattened = root.normalize();
        assert_eq!(flattened.len(), 4);
    }
}

View File

@@ -28,15 +28,14 @@ pub struct Message {
impl From<Message> for Bar {
fn from(bar: Message) -> Self {
Self {
time: bar.time,
symbol: bar.symbol,
time: bar.time,
open: bar.open,
high: bar.high,
low: bar.low,
close: bar.close,
volume: bar.volume,
trades: bar.trades,
vwap: bar.vwap,
}
}
}

View File

@@ -6,13 +6,13 @@ use serde::Deserialize;
pub enum Message {
#[serde(rename_all = "camelCase")]
Market {
trades: Vec<String>,
quotes: Vec<String>,
bars: Vec<String>,
updated_bars: Vec<String>,
daily_bars: Vec<String>,
statuses: Vec<String>,
trades: Option<Vec<String>>,
quotes: Option<Vec<String>>,
daily_bars: Option<Vec<String>>,
orderbooks: Option<Vec<String>>,
statuses: Option<Vec<String>>,
lulds: Option<Vec<String>>,
cancel_errors: Option<Vec<String>>,
},

View File

@@ -4,14 +4,13 @@ use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Row)]
pub struct Bar {
pub symbol: String,
#[serde(with = "clickhouse::serde::time::datetime")]
pub time: OffsetDateTime,
pub symbol: String,
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub volume: f64,
pub trades: i64,
pub vwap: f64,
}

Some files were not shown because too many files have changed in this diff Show More