Add initial ML implementation

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2024-03-29 13:40:49 +00:00
parent f715881b07
commit d7e9350257
26 changed files with 4441 additions and 104 deletions

View File

@@ -22,7 +22,7 @@ build:
cache:
<<: *global_cache
script:
- cargo +nightly build
- cargo +nightly build --workspace
test:
image: registry.karaolidis.com/karaolidis/qrust/rust
@@ -30,7 +30,7 @@ test:
cache:
<<: *global_cache
script:
- cargo +nightly test
- cargo +nightly test --workspace
lint:
image: registry.karaolidis.com/karaolidis/qrust/rust
@@ -39,7 +39,7 @@ lint:
<<: *global_cache
script:
- cargo +nightly fmt --all -- --check
- cargo +nightly clippy --all-targets --all-features
- cargo +nightly clippy --workspace --all-targets --all-features
depcheck:
image: registry.karaolidis.com/karaolidis/qrust/rust
@@ -48,7 +48,7 @@ depcheck:
<<: *global_cache
script:
- cargo +nightly outdated
- cargo +nightly udeps --all-targets
- cargo +nightly udeps --workspace --all-targets
build-release:
image: registry.karaolidis.com/karaolidis/qrust/rust

2601
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -11,6 +11,10 @@ path = "src/lib/qrust/mod.rs"
name = "qrust"
path = "src/bin/qrust/mod.rs"
[[bin]]
name = "trainer"
path = "src/bin/trainer/mod.rs"
[profile.release]
panic = 'abort'
strip = true
@@ -70,7 +74,14 @@ lazy_static = "1.4.0"
nonempty = { version = "0.10.0", features = [
"serialize",
] }
rand = "0.8.5"
rayon = "1.9.0"
burn = { version = "0.12.1", features = [
"wgpu",
"cuda",
"tui",
"train",
] }
[dev-dependencies]
claims = "0.7.1"
serde_test = "1.0.176"

View File

@@ -9,9 +9,9 @@ use qrust::{
utils::{backoff, duration_until},
};
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::{join, sync::mpsc, time::sleep};
#[derive(PartialEq, Eq)]
pub enum Status {
Open,
Closed,
@@ -19,21 +19,16 @@ pub enum Status {
pub struct Message {
pub status: Status,
pub next_switch: OffsetDateTime,
}
impl From<types::alpaca::api::incoming::clock::Clock> for Message {
fn from(clock: types::alpaca::api::incoming::clock::Clock) -> Self {
if clock.is_open {
Self {
status: Status::Open,
next_switch: clock.next_close,
}
} else {
Self {
status: Status::Closed,
next_switch: clock.next_open,
}
Self {
status: if clock.is_open {
Status::Open
} else {
Status::Closed
},
}
}
}

View File

@@ -86,12 +86,13 @@ pub async fn run(
message,
));
}
Some(_) = clock_receiver.recv() => {
Some(message) = clock_receiver.recv() => {
spawn(handle_clock_message(
config.clone(),
bars_us_equity_backfill_sender.clone(),
bars_crypto_backfill_sender.clone(),
news_backfill_sender.clone(),
message,
));
}
else => panic!("Communication channel unexpectedly closed.")
@@ -125,7 +126,7 @@ fn init_thread(
let websocket_handler = match thread_type {
ThreadType::Bars(_) => websocket::bars::create_handler(config, thread_type),
ThreadType::News => websocket::news::create_handler(config),
ThreadType::News => websocket::news::create_handler(&config),
};
let (websocket_sender, websocket_receiver) = mpsc::channel(100);
@@ -322,13 +323,16 @@ async fn handle_clock_message(
bars_us_equity_backfill_sender: mpsc::Sender<backfill::Message>,
bars_crypto_backfill_sender: mpsc::Sender<backfill::Message>,
news_backfill_sender: mpsc::Sender<backfill::Message>,
message: clock::Message,
) {
database::cleanup_all(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap();
if message.status == clock::Status::Closed {
database::cleanup_all(
&config.clickhouse_client,
&config.clickhouse_concurrency_limiter,
)
.await
.unwrap();
}
let assets = database::assets::select(
&config.clickhouse_client,

View File

@@ -12,7 +12,6 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, RwLock};
pub struct Handler {
pub config: Arc<Config>,
pub inserter: Arc<Mutex<Inserter<News>>>,
}
@@ -102,7 +101,7 @@ impl super::Handler for Handler {
}
}
pub fn create_handler(config: Arc<Config>) -> Box<dyn super::Handler> {
pub fn create_handler(config: &Arc<Config>) -> Box<dyn super::Handler> {
let inserter = Arc::new(Mutex::new(
config
.clickhouse_client
@@ -112,5 +111,5 @@ pub fn create_handler(config: Arc<Config>) -> Box<dyn super::Handler> {
.with_max_entries((*CLICKHOUSE_BATCH_NEWS_SIZE).try_into().unwrap()),
));
Box::new(Handler { config, inserter })
Box::new(Handler { inserter })
}

133
src/bin/trainer/mod.rs Normal file
View File

@@ -0,0 +1,133 @@
#![warn(clippy::all, clippy::pedantic, clippy::nursery)]
use burn::{
config::Config,
data::{
dataloader::{DataLoaderBuilder, Dataset},
dataset::transform::{PartialDataset, ShuffledDataset},
},
module::Module,
optim::AdamConfig,
record::CompactRecorder,
tensor::backend::AutodiffBackend,
train::LearnerBuilder,
};
use dotenv::dotenv;
use log::info;
use qrust::{
database,
ml::{
BarWindow, BarWindowBatcher, ModelConfig, MultipleSymbolDataset, MyAutodiffBackend, DEVICE,
},
types::Bar,
};
use std::{env, fs, path::Path, sync::Arc};
use tokio::sync::Semaphore;
/// Hyper-parameters for one training run of the bar-prediction model.
#[derive(Config)]
pub struct TrainingConfig {
    pub model: ModelConfig,
    pub optimizer: AdamConfig,
    #[config(default = 100)]
    pub epochs: usize,
    #[config(default = 256)]
    pub batch_size: usize,
    // Dataloader worker threads.
    #[config(default = 16)]
    pub num_workers: usize,
    // Seeds both the backend RNG and the dataset shuffle, so runs with the
    // same config are reproducible.
    #[config(default = 0)]
    pub seed: u64,
    // Fraction of the shuffled dataset held out for validation.
    #[config(default = 0.2)]
    pub valid_pct: f64,
    #[config(default = 1.0e-4)]
    pub learning_rate: f64,
}
/// Entry point: loads hourly aggregated bars from ClickHouse, then trains
/// the model, writing artifacts next to this source file.
#[tokio::main]
async fn main() {
    dotenv().ok();
    // NOTE(review): `file!()` is the compile-time source path, so the
    // artifacts directory resolves relative to the repository checkout —
    // confirm this is intended for non-dev (deployed) runs.
    let dir = Path::new(file!()).parent().unwrap();
    let model_config = ModelConfig::new();
    let optimizer = AdamConfig::new();
    let training_config = TrainingConfig::new(model_config, optimizer);
    // Connection settings come from the environment; a missing variable
    // aborts immediately with a descriptive message.
    let clickhouse_client = clickhouse::Client::default()
        .with_url(env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set."))
        .with_user(env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set."))
        .with_password(env::var("CLICKHOUSE_PASSWORD").expect("CLICKHOUSE_PASSWORD must be set."))
        .with_database(env::var("CLICKHOUSE_DB").expect("CLICKHOUSE_DB must be set."));
    // Effectively unlimited — the trainer is the only ClickHouse client here.
    let clickhouse_concurrency_limiter = Arc::new(Semaphore::new(Semaphore::MAX_PERMITS));
    let bars = database::ta::select(&clickhouse_client, &clickhouse_concurrency_limiter)
        .await
        .unwrap();
    info!("Loaded {} bars.", bars.len());
    train::<MyAutodiffBackend>(
        bars,
        &training_config,
        dir.join("artifacts").to_str().unwrap(),
        &DEVICE,
    );
}
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_precision_loss)]
/// Shuffles the dataset, splits it into train/validation partitions, and
/// runs the burn learner with checkpointing; the final model is saved under
/// `dir` with a `CompactRecorder`.
fn train<B: AutodiffBackend<FloatElem = f32, IntElem = i32>>(
    bars: Vec<Bar>,
    config: &TrainingConfig,
    dir: &str,
    device: &B::Device,
) {
    B::seed(config.seed);
    fs::create_dir_all(dir).unwrap();
    let dataset = MultipleSymbolDataset::new(bars);
    // Shuffle with a fixed seed so the train/valid split is stable across
    // runs with the same configuration.
    let dataset = ShuffledDataset::with_seed(dataset, config.seed);
    let dataset = Arc::new(dataset);
    // First `1 - valid_pct` of the shuffled data trains; the tail validates.
    let split = (dataset.len() as f64 * (1.0 - config.valid_pct)) as usize;
    let train: PartialDataset<Arc<ShuffledDataset<MultipleSymbolDataset, BarWindow>>, BarWindow> =
        PartialDataset::new(dataset.clone(), 0, split);
    let batcher_train = BarWindowBatcher::<B> {
        device: device.clone(),
    };
    let dataloader_train = DataLoaderBuilder::new(batcher_train)
        .batch_size(config.batch_size)
        .num_workers(config.num_workers)
        .build(train);
    let valid: PartialDataset<Arc<ShuffledDataset<MultipleSymbolDataset, BarWindow>>, BarWindow> =
        PartialDataset::new(dataset.clone(), split, dataset.len());
    // Validation batches run on the inner (non-autodiff) backend.
    let batcher_valid = BarWindowBatcher::<B::InnerBackend> {
        device: device.clone(),
    };
    let dataloader_valid = DataLoaderBuilder::new(batcher_valid)
        .batch_size(config.batch_size)
        .num_workers(config.num_workers)
        .build(valid);
    let learner = LearnerBuilder::new(dir)
        .with_file_checkpointer(CompactRecorder::new())
        .devices(vec![device.clone()])
        .num_epochs(config.epochs)
        .build(
            config.model.init::<B>(device),
            config.optimizer.init(),
            config.learning_rate,
        );
    let trained = learner.fit(dataloader_train, dataloader_valid);
    trained.save_file(dir, &CompactRecorder::new()).unwrap();
}

View File

@@ -5,6 +5,7 @@ pub mod bars;
pub mod calendar;
pub mod news;
pub mod orders;
pub mod ta;
use clickhouse::{error::Error, Client};
use tokio::try_join;

View File

@@ -0,0 +1,30 @@
use crate::types::Bar;
use clickhouse::{error::Error, Client};
use std::sync::Arc;
use tokio::sync::Semaphore;
/// Loads hourly-aggregated bars for every symbol, ordered by `(symbol, time)`.
///
/// # Errors
/// Returns any ClickHouse query or deserialization error.
pub async fn select(
    client: &Client,
    concurrency_limiter: &Arc<Semaphore>,
) -> Result<Vec<Bar>, Error> {
    // Bind the permit to a named variable so it stays alive for the whole
    // query: `let _ = ...` drops the `SemaphorePermit` immediately, which
    // made the concurrency limiter a no-op.
    let _permit = concurrency_limiter.acquire().await.unwrap();
    client
        .query(
            "
            SELECT symbol,
                toStartOfHour(bars.time) AS time,
                any(bars.open) AS open,
                max(bars.high) AS high,
                min(bars.low) AS low,
                anyLast(bars.close) AS close,
                sum(bars.volume) AS volume,
                sum(bars.trades) AS trades
            FROM bars FINAL
            GROUP BY ALL
            ORDER BY symbol,
                time
            ",
        )
        .fetch_all::<Bar>()
        .await
}

View File

@@ -0,0 +1,74 @@
use super::BarWindow;
use burn::{
data::dataloader::batcher::Batcher,
tensor::{self, backend::Backend, Tensor},
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
/// Collates `BarWindow` items into batched tensors on a fixed device.
pub struct BarWindowBatcher<B: Backend> {
    pub device: B::Device,
}
/// One collated batch produced by [`BarWindowBatcher`].
#[derive(Clone, Debug)]
pub struct BarWindowBatch<B: Backend> {
    // Hour-of-day indices, stacked to [batch, window].
    pub hour_tensor: Tensor<B, 2, tensor::Int>,
    // Day-of-week indices, stacked to [batch, window].
    pub day_tensor: Tensor<B, 2, tensor::Int>,
    // Numerical features, stacked to [batch, window, features].
    pub numerical_tensor: Tensor<B, 3>,
    // Regression targets, stacked to [batch, 1].
    pub target_tensor: Tensor<B, 2>,
}
impl<B: Backend<FloatElem = f32, IntElem = i32>> Batcher<BarWindow, BarWindowBatch<B>>
    for BarWindowBatcher<B>
{
    /// Converts each item into per-sample tensors in parallel, then stacks
    /// them along a new leading batch dimension.
    fn batch(&self, items: Vec<BarWindow>) -> BarWindowBatch<B> {
        let batch_size = items.len();
        // Parallel fold/reduce: each rayon split accumulates its own four
        // vectors, and the splits are then concatenated in order.
        // NOTE(review): every split reserves `batch_size` slots, so total
        // reserved capacity can exceed the batch — harmless over-allocation,
        // but confirm it is intentional.
        let (hour_tensors, day_tensors, numerical_tensors, target_tensors) = items
            .into_par_iter()
            .fold(
                || {
                    (
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                    )
                },
                |(mut hour_tensors, mut day_tensors, mut numerical_tensors, mut target_tensors),
                 item| {
                    hour_tensors.push(Tensor::from_data(item.hours, &self.device));
                    day_tensors.push(Tensor::from_data(item.days, &self.device));
                    numerical_tensors.push(Tensor::from_data(item.numerical, &self.device));
                    target_tensors.push(Tensor::from_data(item.target, &self.device));
                    (hour_tensors, day_tensors, numerical_tensors, target_tensors)
                },
            )
            .reduce(
                || {
                    (
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                        Vec::with_capacity(batch_size),
                    )
                },
                |(mut hour_tensors, mut day_tensors, mut numerical_tensors, mut target_tensors),
                 item| {
                    hour_tensors.extend(item.0);
                    day_tensors.extend(item.1);
                    numerical_tensors.extend(item.2);
                    target_tensors.extend(item.3);
                    (hour_tensors, day_tensors, numerical_tensors, target_tensors)
                },
            );
        BarWindowBatch {
            hour_tensor: Tensor::stack(hour_tensors, 0).to_device(&self.device),
            day_tensor: Tensor::stack(day_tensors, 0).to_device(&self.device),
            numerical_tensor: Tensor::stack(numerical_tensors, 0).to_device(&self.device),
            target_tensor: Tensor::stack(target_tensors, 0).to_device(&self.device),
        }
    }
}

245
src/lib/qrust/ml/dataset.rs Normal file
View File

@@ -0,0 +1,245 @@
use crate::types::{
ta::{calculate_indicators, HEAD_SIZE, NUMERICAL_FIELD_COUNT},
Bar,
};
use burn::{
data::dataset::{transform::ComposedDataset, Dataset},
tensor::Data,
};
use itertools::Itertools;
// Number of consecutive hourly bars that form one model input window.
pub const WINDOW_SIZE: usize = 48;
/// A single training sample spanning `WINDOW_SIZE` bars.
#[derive(Clone, Debug)]
pub struct BarWindow {
    // [WINDOW_SIZE] hour-of-day per bar, for the hour embedding.
    pub hours: Data<i32, 1>,
    // [WINDOW_SIZE] day-of-week per bar, for the day embedding.
    pub days: Data<i32, 1>,
    // [WINDOW_SIZE, NUMERICAL_FIELD_COUNT] indicator features.
    pub numerical: Data<f32, 2>,
    // [1] close of the bar following the window — the regression target.
    pub target: Data<f32, 1>,
}
/// Column-oriented store of one symbol's feature rows and targets.
#[derive(Clone, Debug)]
struct SingleSymbolDataset {
    hours: Vec<i32>,
    days: Vec<i32>,
    numerical: Vec<[f32; NUMERICAL_FIELD_COUNT]>,
    targets: Vec<f32>,
}
impl SingleSymbolDataset {
    /// Builds per-symbol feature/target columns from raw hourly bars.
    ///
    /// Indicators are computed first, and the leading `HEAD_SIZE - 1` rows
    /// are skipped so every retained row has warmed-up indicators. Each
    /// retained pair of consecutive bars contributes the first bar's
    /// features and the second bar's close as the target.
    #[allow(clippy::cast_possible_truncation)]
    pub fn new(bars: Vec<Bar>) -> Self {
        let bars = calculate_indicators(&bars);
        // `saturating_sub` guards the capacity hint: with an empty input,
        // `bars.len() - 1` underflows `usize` and panics (debug) or requests
        // an absurd allocation (release).
        let capacity = bars.len().saturating_sub(1);
        let (hours, days, numerical, targets) = bars.windows(2).skip(HEAD_SIZE - 1).fold(
            (
                Vec::with_capacity(capacity),
                Vec::with_capacity(capacity),
                Vec::with_capacity(capacity),
                Vec::with_capacity(capacity),
            ),
            |(mut hours, mut days, mut numerical, mut targets), bar| {
                hours.push(i32::from(bar[0].hour));
                days.push(i32::from(bar[0].day));
                // `.min(f32::MAX)` clamps values that overflow to +inf when
                // narrowed from f64. NOTE(review): assumes the `_pct` fields
                // can exceed f32 range — confirm against
                // `calculate_indicators`.
                numerical.push([
                    bar[0].open as f32,
                    (bar[0].open_pct as f32).min(f32::MAX),
                    bar[0].high as f32,
                    (bar[0].high_pct as f32).min(f32::MAX),
                    bar[0].low as f32,
                    (bar[0].low_pct as f32).min(f32::MAX),
                    bar[0].close as f32,
                    (bar[0].close_pct as f32).min(f32::MAX),
                    bar[0].volume as f32,
                    (bar[0].volume_pct as f32).min(f32::MAX),
                    bar[0].trades as f32,
                    (bar[0].trades_pct as f32).min(f32::MAX),
                    bar[0].close_deriv as f32,
                    (bar[0].close_deriv_pct as f32).min(f32::MAX),
                    bar[0].sma_3 as f32,
                    (bar[0].sma_3_pct as f32).min(f32::MAX),
                    bar[0].sma_6 as f32,
                    (bar[0].sma_6_pct as f32).min(f32::MAX),
                    bar[0].sma_12 as f32,
                    (bar[0].sma_12_pct as f32).min(f32::MAX),
                    bar[0].sma_24 as f32,
                    (bar[0].sma_24_pct as f32).min(f32::MAX),
                    bar[0].sma_48 as f32,
                    (bar[0].sma_48_pct as f32).min(f32::MAX),
                    bar[0].sma_72 as f32,
                    (bar[0].sma_72_pct as f32).min(f32::MAX),
                    bar[0].ema_3 as f32,
                    (bar[0].ema_3_pct as f32).min(f32::MAX),
                    bar[0].ema_6 as f32,
                    (bar[0].ema_6_pct as f32).min(f32::MAX),
                    bar[0].ema_12 as f32,
                    (bar[0].ema_12_pct as f32).min(f32::MAX),
                    bar[0].ema_24 as f32,
                    (bar[0].ema_24_pct as f32).min(f32::MAX),
                    bar[0].ema_48 as f32,
                    (bar[0].ema_48_pct as f32).min(f32::MAX),
                    bar[0].ema_72 as f32,
                    (bar[0].ema_72_pct as f32).min(f32::MAX),
                    bar[0].macd as f32,
                    (bar[0].macd_pct as f32).min(f32::MAX),
                    bar[0].macd_signal as f32,
                    (bar[0].macd_signal_pct as f32).min(f32::MAX),
                    bar[0].obv as f32,
                    (bar[0].obv_pct as f32).min(f32::MAX),
                    bar[0].rsi as f32,
                    (bar[0].rsi_pct as f32).min(f32::MAX),
                    bar[0].bbands_lower as f32,
                    (bar[0].bbands_lower_pct as f32).min(f32::MAX),
                    bar[0].bbands_mean as f32,
                    (bar[0].bbands_mean_pct as f32).min(f32::MAX),
                    bar[0].bbands_upper as f32,
                    (bar[0].bbands_upper_pct as f32).min(f32::MAX),
                ]);
                targets.push(bar[1].close as f32);
                (hours, days, numerical, targets)
            },
        );
        Self {
            hours,
            days,
            numerical,
            targets,
        }
    }
}
impl Dataset<BarWindow> for SingleSymbolDataset {
    /// Number of complete windows available for this symbol.
    fn len(&self) -> usize {
        // Saturate so a symbol with fewer than `WINDOW_SIZE` rows reports an
        // empty dataset instead of underflowing `usize`
        // (`targets.len() - WINDOW_SIZE` panics in debug builds).
        (self.targets.len() + 1).saturating_sub(WINDOW_SIZE)
    }
    /// Returns the `idx`-th window, or `None` when out of bounds.
    #[allow(clippy::single_range_in_vec_init)]
    fn get(&self, idx: usize) -> Option<BarWindow> {
        if idx >= self.len() {
            return None;
        }
        let hours: [i32; WINDOW_SIZE] = self.hours[idx..idx + WINDOW_SIZE].try_into().unwrap();
        let days: [i32; WINDOW_SIZE] = self.days[idx..idx + WINDOW_SIZE].try_into().unwrap();
        let numerical: [[f32; NUMERICAL_FIELD_COUNT]; WINDOW_SIZE] =
            self.numerical[idx..idx + WINDOW_SIZE].try_into().unwrap();
        // The target aligned with the window's final bar: the next close.
        let target: [f32; 1] = [self.targets[idx + WINDOW_SIZE - 1]];
        Some(BarWindow {
            hours: Data::from(hours),
            days: Data::from(days),
            numerical: Data::from(numerical),
            target: Data::from(target),
        })
    }
}
/// Concatenation of per-symbol datasets, so no window ever spans two
/// different symbols.
pub struct MultipleSymbolDataset {
    composed_dataset: ComposedDataset<SingleSymbolDataset>,
}
impl MultipleSymbolDataset {
    /// Splits the bars by symbol and builds one `SingleSymbolDataset` each.
    ///
    /// NOTE(review): itertools' `group_by` only groups *consecutive* equal
    /// keys, so the input must already be sorted by symbol — the ta query
    /// orders by symbol; confirm for any other caller.
    pub fn new(bars: Vec<Bar>) -> Self {
        let groups = bars
            .into_iter()
            .group_by(|bar| bar.symbol.clone())
            .into_iter()
            .map(|(_, group)| group.collect::<Vec<_>>())
            .map(SingleSymbolDataset::new)
            .collect();
        Self {
            composed_dataset: ComposedDataset::new(groups),
        }
    }
}
impl Dataset<BarWindow> for MultipleSymbolDataset {
    // Delegate to the composed per-symbol datasets.
    fn len(&self) -> usize {
        self.composed_dataset.len()
    }
    fn get(&self, idx: usize) -> Option<BarWindow> {
        self.composed_dataset.get(idx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rand::{
        distributions::{Distribution, Uniform},
        Rng,
    };
    use time::OffsetDateTime;
    // Builds a dataset whose `len()` equals `length`: the `0..=` range
    // yields `length + (HEAD_SIZE - 1) + (WINDOW_SIZE - 1) + 1` bars, which
    // after pairing, head-skipping, and windowing leaves exactly `length`
    // complete windows.
    fn generate_random_dataset(length: usize) -> SingleSymbolDataset {
        let mut rng = rand::thread_rng();
        let uniform = Uniform::new(1.0, 100.0);
        let mut bars = Vec::with_capacity(length);
        for _ in 0..=(length + (HEAD_SIZE - 1) + (WINDOW_SIZE - 1)) {
            bars.push(Bar {
                symbol: "AAPL".to_string(),
                time: OffsetDateTime::now_utc(),
                open: uniform.sample(&mut rng),
                high: uniform.sample(&mut rng),
                low: uniform.sample(&mut rng),
                close: uniform.sample(&mut rng),
                volume: uniform.sample(&mut rng),
                trades: rng.gen_range(1..100),
            });
        }
        SingleSymbolDataset::new(bars)
    }
    #[test]
    fn test_single_symbol_dataset() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        assert_eq!(dataset.len(), length);
    }
    #[test]
    fn test_single_symbol_dataset_window() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        let item = dataset.get(0).unwrap();
        assert_eq!(item.hours.shape.dims, [WINDOW_SIZE]);
        assert_eq!(item.days.shape.dims, [WINDOW_SIZE]);
        assert_eq!(
            item.numerical.shape.dims,
            [WINDOW_SIZE, NUMERICAL_FIELD_COUNT]
        );
        assert_eq!(item.target.shape.dims, [1]);
    }
    #[test]
    fn test_single_symbol_dataset_last_window() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        let item = dataset.get(dataset.len() - 1).unwrap();
        assert_eq!(item.hours.shape.dims, [WINDOW_SIZE]);
        assert_eq!(item.days.shape.dims, [WINDOW_SIZE]);
        assert_eq!(
            item.numerical.shape.dims,
            [WINDOW_SIZE, NUMERICAL_FIELD_COUNT]
        );
        assert_eq!(item.target.shape.dims, [1]);
    }
    #[test]
    fn test_single_symbol_dataset_out_of_bounds() {
        let length = 100;
        let dataset = generate_random_dataset(length);
        assert!(dataset.get(dataset.len()).is_none());
    }
}

21
src/lib/qrust/ml/mod.rs Normal file
View File

@@ -0,0 +1,21 @@
// Machine-learning module: dataset windowing, tensor batching, and the
// LSTM regression model, plus the concrete backend aliases used by the
// trainer binary.
pub mod batcher;
pub mod dataset;
pub mod model;
pub use batcher::{BarWindowBatch, BarWindowBatcher};
pub use dataset::{BarWindow, MultipleSymbolDataset};
pub use model::{Model, ModelConfig};
use burn::{
    backend::{
        wgpu::{AutoGraphicsApi, WgpuDevice},
        Autodiff, Wgpu,
    },
    tensor::backend::Backend,
};
// Wgpu backend with f32 floats / i32 ints, matching the
// `FloatElem = f32, IntElem = i32` bounds used by the batcher and trainer.
pub type MyBackend = Wgpu<AutoGraphicsApi, f32, i32>;
// Autodiff wrapper of the same backend, used for training.
pub type MyAutodiffBackend = Autodiff<MyBackend>;
pub type MyDevice = <Autodiff<Wgpu> as Backend>::Device;
// Let wgpu pick the best available adapter at runtime.
pub const DEVICE: MyDevice = WgpuDevice::BestAvailable;

160
src/lib/qrust/ml/model.rs Normal file
View File

@@ -0,0 +1,160 @@
use super::BarWindowBatch;
use crate::types::ta::NUMERICAL_FIELD_COUNT;
use burn::{
config::Config,
module::Module,
nn::{
loss::{MSELoss, Reduction},
Dropout, DropoutConfig, Embedding, EmbeddingConfig, Linear, LinearConfig, Lstm, LstmConfig,
},
tensor::{
self,
backend::{AutodiffBackend, Backend},
Tensor,
},
train::{RegressionOutput, TrainOutput, TrainStep, ValidStep},
};
/// Four stacked LSTMs with dropout between layers, fed by hour/day
/// embeddings concatenated with numerical indicator features, ending in a
/// single-output linear regression head.
#[derive(Module, Debug)]
pub struct Model<B: Backend> {
    hour_embedding: Embedding<B>,
    day_embedding: Embedding<B>,
    lstm_1: Lstm<B>,
    dropout_1: Dropout,
    lstm_2: Lstm<B>,
    dropout_2: Dropout,
    lstm_3: Lstm<B>,
    dropout_3: Dropout,
    lstm_4: Lstm<B>,
    dropout_4: Dropout,
    linear: Linear<B>,
}
/// Hyper-parameters for [`Model`].
#[derive(Config, Debug)]
pub struct ModelConfig {
    // Embedding width for the hour-of-day input.
    #[config(default = "3")]
    pub hour_features: usize,
    // Embedding width for the day-of-week input.
    #[config(default = "2")]
    pub day_features: usize,
    // NOTE(review): burn's `Config` derive treats the default as an
    // expression string; confirm `"{NUMERICAL_FIELD_COUNT}"` evaluates to
    // the constant rather than being taken literally.
    #[config(default = "{NUMERICAL_FIELD_COUNT}")]
    pub numerical_features: usize,
    #[config(default = "0.2")]
    pub dropout: f64,
}
impl ModelConfig {
    /// Instantiates the model on `device`.
    ///
    /// LSTM widths taper 512 -> 256 -> 64 -> 32 before the linear head;
    /// embeddings cover 24 hours and 7 weekdays.
    pub fn init<B: Backend>(&self, device: &B::Device) -> Model<B> {
        // First-layer input width: numerical features plus both embedding
        // widths, since `forward` concatenates them on the feature axis.
        let num_features = self.numerical_features + self.hour_features + self.day_features;
        let lstm_1_hidden_size = 512;
        let lstm_2_hidden_size = 256;
        let lstm_3_hidden_size = 64;
        let lstm_4_hidden_size = 32;
        Model {
            hour_embedding: EmbeddingConfig::new(24, self.hour_features).init(device),
            day_embedding: EmbeddingConfig::new(7, self.day_features).init(device),
            lstm_1: LstmConfig::new(num_features, lstm_1_hidden_size, true).init(device),
            dropout_1: DropoutConfig::new(self.dropout).init(),
            lstm_2: LstmConfig::new(lstm_1_hidden_size, lstm_2_hidden_size, true).init(device),
            dropout_2: DropoutConfig::new(self.dropout).init(),
            lstm_3: LstmConfig::new(lstm_2_hidden_size, lstm_3_hidden_size, true).init(device),
            dropout_3: DropoutConfig::new(self.dropout).init(),
            lstm_4: LstmConfig::new(lstm_3_hidden_size, lstm_4_hidden_size, true).init(device),
            dropout_4: DropoutConfig::new(self.dropout).init(),
            linear: LinearConfig::new(lstm_4_hidden_size, 1).init(device),
        }
    }
}
impl<B: Backend> Model<B> {
    /// Forward pass.
    ///
    /// `hour`/`day` are `[batch, seq]` integer tensors that are embedded to
    /// `[batch, seq, features]`; `numerical` is `[batch, seq, n]`. The three
    /// are concatenated on the feature axis and pushed through the LSTM
    /// stack; only the last time step feeds the linear head, producing
    /// `[batch, 1]`.
    pub fn forward(
        &self,
        hour: Tensor<B, 2, tensor::Int>,
        day: Tensor<B, 2, tensor::Int>,
        numerical: Tensor<B, 3>,
    ) -> Tensor<B, 2> {
        let hour = self.hour_embedding.forward(hour);
        let day = self.day_embedding.forward(day);
        // Concatenate along dim 2 (the feature axis).
        let x = Tensor::cat(vec![hour, day, numerical], 2);
        let (x, _) = self.lstm_1.forward(x, None);
        let x = self.dropout_1.forward(x);
        let (x, _) = self.lstm_2.forward(x, None);
        let x = self.dropout_2.forward(x);
        let (x, _) = self.lstm_3.forward(x, None);
        let x = self.dropout_3.forward(x);
        let (x, _) = self.lstm_4.forward(x, None);
        let x = self.dropout_4.forward(x);
        // Keep only the final time step of the sequence output.
        let [batch_size, window_size, features] = x.shape().dims;
        let x = x.slice([0..batch_size, window_size - 1..window_size, 0..features]);
        let x = x.squeeze(1);
        self.linear.forward(x)
    }
    /// Runs `forward` and packages a mean-reduced MSE loss for burn's
    /// training loop.
    pub fn forward_regression(
        &self,
        hour: Tensor<B, 2, tensor::Int>,
        day: Tensor<B, 2, tensor::Int>,
        numerical: Tensor<B, 3>,
        target: Tensor<B, 2>,
    ) -> RegressionOutput<B> {
        let output = self.forward(hour, day, numerical);
        let loss = MSELoss::new().forward(output.clone(), target.clone(), Reduction::Mean);
        RegressionOutput::new(loss, output, target)
    }
}
impl<B: AutodiffBackend> TrainStep<BarWindowBatch<B>, RegressionOutput<B>> for Model<B> {
    /// One training step: forward pass + MSE, then backprop on the loss.
    fn step(&self, batch: BarWindowBatch<B>) -> TrainOutput<RegressionOutput<B>> {
        let item = self.forward_regression(
            batch.hour_tensor,
            batch.day_tensor,
            batch.numerical_tensor,
            batch.target_tensor,
        );
        TrainOutput::new(self, item.loss.backward(), item)
    }
}
impl<B: Backend> ValidStep<BarWindowBatch<B>, RegressionOutput<B>> for Model<B> {
    /// Validation step: same forward pass, no gradient computation.
    fn step(&self, batch: BarWindowBatch<B>) -> RegressionOutput<B> {
        self.forward_regression(
            batch.hour_tensor,
            batch.day_tensor,
            batch.numerical_tensor,
            batch.target_tensor,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use burn::{backend::Wgpu, tensor::Distribution};
    #[test]
    // Ignored by default: requires a usable wgpu device.
    #[ignore]
    fn test_model() {
        let device = Default::default();
        let distribution = Distribution::Normal(0.0, 1.0);
        let config = ModelConfig::new().with_numerical_features(7);
        let model = config.init::<Wgpu>(&device);
        let hour = Tensor::ones([2, 10], &device);
        let day = Tensor::ones([2, 10], &device);
        let numerical = Tensor::random([2, 10, 7], distribution, &device);
        let output = model.forward(hour, day, numerical);
        assert_eq!(output.shape().dims, [2, 1]);
    }
}

View File

@@ -1,4 +1,6 @@
pub mod alpaca;
pub mod database;
pub mod ml;
pub mod ta;
pub mod types;
pub mod utils;

149
src/lib/qrust/ta/bbands.rs Normal file
View File

@@ -0,0 +1,149 @@
use std::{borrow::Borrow, collections::VecDeque, iter::Scan, num::NonZeroUsize};
/// Rolling state for a Bollinger-Bands computation: the sample window plus
/// running sums that let mean and variance be updated in O(1) per step.
pub struct BbandsState {
    window: VecDeque<f64>,
    sum: f64,
    squared_sum: f64,
    multiplier: f64,
}

#[allow(clippy::type_complexity)]
pub trait Bbands<T>: Iterator + Sized {
    /// Adapts the iterator into a stream of `(upper, mean, lower)` bands.
    /// The window starts zero-filled, so the first `period - 1` outputs are
    /// warm-up values computed against padding zeros.
    fn bbands(
        self,
        period: NonZeroUsize,
        multiplier: f64, // Typically 2.0
    ) -> Scan<Self, BbandsState, fn(&mut BbandsState, T) -> Option<(f64, f64, f64)>>;
}

impl<I, T> Bbands<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn bbands(
        self,
        period: NonZeroUsize,
        multiplier: f64,
    ) -> Scan<Self, BbandsState, fn(&mut BbandsState, T) -> Option<(f64, f64, f64)>> {
        let initial = BbandsState {
            window: VecDeque::from(vec![0.0; period.get()]),
            sum: 0.0,
            squared_sum: 0.0,
            multiplier,
        };
        self.scan(initial, |state: &mut BbandsState, sample: T| {
            let sample = *sample.borrow();
            // Slide the window: retire the oldest sample, admit the newest,
            // keeping both running sums in step.
            let oldest = state.window.pop_front().unwrap();
            state.sum -= oldest;
            state.squared_sum -= oldest.powi(2);
            state.window.push_back(sample);
            state.sum += sample;
            state.squared_sum += sample.powi(2);
            let len = state.window.len() as f64;
            let mean = state.sum / len;
            // Clamp at zero: floating-point cancellation can nudge the
            // variance slightly negative.
            let variance = ((state.squared_sum / len) - mean.powi(2)).max(0.0);
            let spread = state.multiplier * variance.sqrt();
            Some((mean + spread, mean, mean - spread))
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bbands() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let bbands = data
            .into_iter()
            .bbands(NonZeroUsize::new(3).unwrap(), 2.0)
            .map(|(upper, mean, lower)| {
                (
                    (upper * 100.0).round() / 100.0,
                    (mean * 100.0).round() / 100.0,
                    (lower * 100.0).round() / 100.0,
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(
            bbands,
            vec![
                (1.28, 0.33, -0.61),
                (2.63, 1.0, -0.63),
                (3.63, 2.0, 0.37),
                (4.63, 3.0, 1.37),
                (5.63, 4.0, 2.37)
            ]
        );
    }

    #[test]
    fn test_bbands_empty() {
        let data = Vec::<f64>::new();
        let bbands = data
            .into_iter()
            .bbands(NonZeroUsize::new(3).unwrap(), 2.0)
            .collect::<Vec<_>>();
        assert_eq!(bbands, Vec::<(f64, f64, f64)>::new());
    }

    #[test]
    fn test_bbands_1_period() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let bbands = data
            .into_iter()
            .bbands(NonZeroUsize::new(1).unwrap(), 2.0)
            .collect::<Vec<_>>();
        assert_eq!(
            bbands,
            vec![
                (1.0, 1.0, 1.0),
                (2.0, 2.0, 2.0),
                (3.0, 3.0, 3.0),
                (4.0, 4.0, 4.0),
                (5.0, 5.0, 5.0)
            ]
        );
    }

    #[test]
    fn test_bbands_borrow() {
        let data = [1.0, 2.0, 3.0, 4.0, 5.0];
        let bbands = data
            .iter()
            .bbands(NonZeroUsize::new(3).unwrap(), 2.0)
            .map(|(upper, mean, lower)| {
                (
                    (upper * 100.0).round() / 100.0,
                    (mean * 100.0).round() / 100.0,
                    (lower * 100.0).round() / 100.0,
                )
            })
            .collect::<Vec<_>>();
        assert_eq!(
            bbands,
            vec![
                (1.28, 0.33, -0.61),
                (2.63, 1.0, -0.63),
                (3.63, 2.0, 0.37),
                (4.63, 3.0, 1.37),
                (5.63, 4.0, 2.37)
            ]
        );
    }
}

59
src/lib/qrust/ta/deriv.rs Normal file
View File

@@ -0,0 +1,59 @@
use std::{borrow::Borrow, iter::Scan};
/// State carried between steps of the discrete-difference adapter.
pub struct DerivState {
    pub last: f64,
}

#[allow(clippy::type_complexity)]
pub trait Deriv<T>: Iterator + Sized {
    /// Adapts the iterator into its first difference. The state starts at
    /// zero, so the first output equals the first sample itself.
    fn deriv(self) -> Scan<Self, DerivState, fn(&mut DerivState, T) -> Option<f64>>;
}

impl<I, T> Deriv<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn deriv(self) -> Scan<Self, DerivState, fn(&mut DerivState, T) -> Option<f64>> {
        self.scan(
            DerivState { last: 0.0 },
            |state: &mut DerivState, item: T| {
                let current = *item.borrow();
                let difference = current - state.last;
                state.last = current;
                Some(difference)
            },
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_deriv() {
        let data = vec![1.0, 3.0, 6.0, 3.0, 1.0];
        let deriv = data.into_iter().deriv().collect::<Vec<_>>();
        assert_eq!(deriv, vec![1.0, 2.0, 3.0, -3.0, -2.0]);
    }

    #[test]
    fn test_deriv_empty() {
        let data = Vec::<f64>::new();
        let deriv = data.into_iter().deriv().collect::<Vec<_>>();
        assert_eq!(deriv, Vec::<f64>::new());
    }

    #[test]
    fn test_deriv_borrow() {
        let data = [1.0, 3.0, 6.0, 3.0, 1.0];
        let deriv = data.iter().deriv().collect::<Vec<_>>();
        assert_eq!(deriv, vec![1.0, 2.0, 3.0, -3.0, -2.0]);
    }
}

95
src/lib/qrust/ta/ema.rs Normal file
View File

@@ -0,0 +1,95 @@
use std::{
borrow::Borrow,
iter::{Peekable, Scan},
num::NonZeroUsize,
};
/// Running state for the exponential-moving-average adapter.
pub struct EmaState {
    weight: f64,
    ema: f64,
}

#[allow(clippy::type_complexity)]
pub trait Ema<T>: Iterator + Sized {
    /// Adapts the iterator into its exponential moving average with the
    /// standard smoothing factor `2 / (period + 1)`.
    fn ema(
        self,
        period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, EmaState, fn(&mut EmaState, T) -> Option<f64>>;
}

impl<I, T> Ema<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn ema(
        self,
        period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, EmaState, fn(&mut EmaState, T) -> Option<f64>> {
        let weight = 2.0 / (1.0 + period.get() as f64);
        let mut iter = self.peekable();
        // Seed with the first sample so the series starts at the input
        // rather than decaying up from zero; the 0.0 fallback only applies
        // to an empty input, which never produces a value anyway.
        let seed = iter.peek().map_or(0.0, |value| *value.borrow());
        iter.scan(
            EmaState { weight, ema: seed },
            |state: &mut EmaState, item: T| {
                let sample = *item.borrow();
                state.ema = (sample * state.weight) + (state.ema * (1.0 - state.weight));
                Some(state.ema)
            },
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ema() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let ema = data
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(ema, vec![1.0, 1.5, 2.25, 3.125, 4.0625]);
    }

    #[test]
    fn test_ema_empty() {
        let data = Vec::<f64>::new();
        let ema = data
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(ema, Vec::<f64>::new());
    }

    #[test]
    fn test_ema_1_period() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let ema = data
            .into_iter()
            .ema(NonZeroUsize::new(1).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(ema, vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    }

    #[test]
    fn test_ema_borrow() {
        let data = [1.0, 2.0, 3.0, 4.0, 5.0];
        let ema = data
            .iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        assert_eq!(ema, vec![1.0, 1.5, 2.25, 3.125, 4.0625]);
    }
}

216
src/lib/qrust/ta/macd.rs Normal file
View File

@@ -0,0 +1,216 @@
use std::{
borrow::Borrow,
iter::{Peekable, Scan},
num::NonZeroUsize,
};
/// Running state for the MACD adapter: short/long/signal EMA smoothing
/// weights plus the three EMA accumulators.
pub struct MacdState {
    short_weight: f64,
    long_weight: f64,
    signal_weight: f64,
    short_ema: f64,
    long_ema: f64,
    signal_ema: f64,
}
#[allow(clippy::type_complexity)]
pub trait Macd<T>: Iterator + Sized {
    /// Adapts the iterator into `(macd, signal)` pairs. The short and long
    /// EMAs are seeded with the first sample; the signal EMA is seeded at
    /// zero, matching the behaviour pinned by the tests below.
    fn macd(
        self,
        short_period: NonZeroUsize,  // Typically 12
        long_period: NonZeroUsize,   // Typically 26
        signal_period: NonZeroUsize, // Typically 9
    ) -> Scan<Peekable<Self>, MacdState, fn(&mut MacdState, T) -> Option<(f64, f64)>>;
}
impl<I, T> Macd<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn macd(
        self,
        short_period: NonZeroUsize,
        long_period: NonZeroUsize,
        signal_period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, MacdState, fn(&mut MacdState, T) -> Option<(f64, f64)>> {
        // Standard EMA smoothing factor 2 / (period + 1) for each series.
        let smoothing = 2.0;
        let short_weight = smoothing / (1.0 + short_period.get() as f64);
        let long_weight = smoothing / (1.0 + long_period.get() as f64);
        let signal_weight = smoothing / (1.0 + signal_period.get() as f64);
        let mut iter = self.peekable();
        // Peek the first sample to seed the price EMAs (0.0 for an empty
        // input, where nothing is ever produced).
        let first = iter.peek().map(|value| *value.borrow()).unwrap_or_default();
        iter.scan(
            MacdState {
                short_weight,
                long_weight,
                signal_weight,
                short_ema: first,
                long_ema: first,
                signal_ema: 0.0,
            },
            |state: &mut MacdState, value: T| {
                let value = *value.borrow();
                state.short_ema =
                    (value * state.short_weight) + (state.short_ema * (1.0 - state.short_weight));
                state.long_ema =
                    (value * state.long_weight) + (state.long_ema * (1.0 - state.long_weight));
                // MACD line is the short/long EMA spread; the signal line is
                // an EMA of the MACD line itself.
                let macd = state.short_ema - state.long_ema;
                state.signal_ema =
                    (macd * state.signal_weight) + (state.signal_ema * (1.0 - state.signal_weight));
                Some((macd, state.signal_ema))
            },
        )
    }
}
#[cfg(test)]
mod tests {
    use super::super::ema::Ema;
    use super::*;
    // Each test recomputes the expected series from the `Ema` adapter so the
    // fused implementation must agree with composing EMAs by hand.
    #[test]
    fn test_macd() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let short_ema = data
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        let long_ema = data
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(5).unwrap())
            .collect::<Vec<_>>();
        let macd = short_ema
            .into_iter()
            .zip(long_ema)
            .map(|(short, long)| short - long)
            .collect::<Vec<_>>();
        let signal = macd
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        let expected = macd.into_iter().zip(signal).collect::<Vec<_>>();
        assert_eq!(
            data.into_iter()
                .macd(
                    NonZeroUsize::new(3).unwrap(),
                    NonZeroUsize::new(5).unwrap(),
                    NonZeroUsize::new(3).unwrap()
                )
                .collect::<Vec<_>>(),
            expected
        );
    }
    #[test]
    fn test_macd_empty() {
        let data = Vec::<f64>::new();
        assert_eq!(
            data.into_iter()
                .macd(
                    NonZeroUsize::new(3).unwrap(),
                    NonZeroUsize::new(5).unwrap(),
                    NonZeroUsize::new(3).unwrap()
                )
                .collect::<Vec<_>>(),
            vec![]
        );
    }
    #[test]
    fn test_macd_1_period() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let short_ema = data
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(1).unwrap())
            .collect::<Vec<_>>();
        let long_ema = data
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(1).unwrap())
            .collect::<Vec<_>>();
        let macd = short_ema
            .into_iter()
            .zip(long_ema)
            .map(|(short, long)| short - long)
            .collect::<Vec<_>>();
        let signal = macd
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(1).unwrap())
            .collect::<Vec<_>>();
        let expected = macd.into_iter().zip(signal).collect::<Vec<_>>();
        assert_eq!(
            data.into_iter()
                .macd(
                    NonZeroUsize::new(1).unwrap(),
                    NonZeroUsize::new(1).unwrap(),
                    NonZeroUsize::new(1).unwrap()
                )
                .collect::<Vec<_>>(),
            expected
        );
    }
    #[test]
    fn test_macd_borrow() {
        let data = [1.0, 2.0, 3.0, 4.0, 5.0];
        let short_ema = data
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        let long_ema = data
            .into_iter()
            .ema(NonZeroUsize::new(5).unwrap())
            .collect::<Vec<_>>();
        let macd = short_ema
            .into_iter()
            .zip(long_ema)
            .map(|(short, long)| short - long)
            .collect::<Vec<_>>();
        let signal = macd
            .clone()
            .into_iter()
            .ema(NonZeroUsize::new(3).unwrap())
            .collect::<Vec<_>>();
        let expected = macd.into_iter().zip(signal).collect::<Vec<_>>();
        assert_eq!(
            data.iter()
                .macd(
                    NonZeroUsize::new(3).unwrap(),
                    NonZeroUsize::new(5).unwrap(),
                    NonZeroUsize::new(3).unwrap()
                )
                .collect::<Vec<_>>(),
            expected
        );
    }
}

17
src/lib/qrust/ta/mod.rs Normal file
View File

@@ -0,0 +1,17 @@
// Technical-analysis indicators, each implemented in its own submodule
// as an extension trait over iterators of `f64` (OBV takes
// `(close, volume)` pairs).
pub mod bbands;
pub mod deriv;
pub mod ema;
pub mod macd;
pub mod obv;
pub mod pct;
pub mod rsi;
pub mod sma;

// Re-export the extension traits so callers can import them straight
// from `ta` without naming each submodule.
pub use bbands::Bbands;
pub use deriv::Deriv;
pub use ema::Ema;
pub use macd::Macd;
pub use obv::Obv;
pub use pct::Pct;
pub use rsi::Rsi;
pub use sma::Sma;

73
src/lib/qrust/ta/obv.rs Normal file
View File

@@ -0,0 +1,73 @@
use std::{
borrow::Borrow,
iter::{Peekable, Scan},
};
/// Running On-Balance Volume accumulator: `last` holds the previous
/// close, `obv` the signed volume total so far.
pub struct ObvState {
    last: f64,
    obv: f64,
}

/// One OBV step: add the bar's volume when the close rose, subtract it
/// when it fell, carry the total through unchanged otherwise (including
/// the NaN case, where no comparison holds).
fn obv_step<T: Borrow<(f64, f64)>>(state: &mut ObvState, value: T) -> Option<f64> {
    let (close, volume) = *value.borrow();
    match close.partial_cmp(&state.last) {
        Some(std::cmp::Ordering::Greater) => state.obv += volume,
        Some(std::cmp::Ordering::Less) => state.obv -= volume,
        _ => {}
    }
    state.last = close;
    Some(state.obv)
}

/// On-Balance Volume over an iterator of `(close, volume)` pairs.
#[allow(clippy::type_complexity)]
pub trait Obv<T>: Iterator + Sized {
    fn obv(self) -> Scan<Peekable<Self>, ObvState, fn(&mut ObvState, T) -> Option<f64>>;
}

impl<I, T> Obv<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<(f64, f64)>,
{
    fn obv(self) -> Scan<Peekable<Self>, ObvState, fn(&mut ObvState, T) -> Option<f64>> {
        let mut iter = self.peekable();
        // Seed `last` with the first close so the first bar contributes 0.
        let (first_close, _) = iter.peek().map(|v| *v.borrow()).unwrap_or_default();
        iter.scan(
            ObvState {
                last: first_close,
                obv: 0.0,
            },
            obv_step::<T>,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference series: rises add the bar's volume, falls subtract it.
    const DATA: [(f64, f64); 5] = [(1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (2.0, 4.0), (1.0, 5.0)];

    #[test]
    fn test_obv() {
        let obv: Vec<_> = DATA.to_vec().into_iter().obv().collect();
        assert_eq!(obv, vec![0.0, 2.0, 5.0, 1.0, -4.0]);
    }

    #[test]
    fn test_obv_empty() {
        let obv: Vec<_> = Vec::<(f64, f64)>::new().into_iter().obv().collect();
        assert_eq!(obv, Vec::<f64>::new());
    }

    #[test]
    fn test_obv_borrow() {
        let obv: Vec<_> = DATA.iter().obv().collect();
        assert_eq!(obv, vec![0.0, 2.0, 5.0, 1.0, -4.0]);
    }
}

64
src/lib/qrust/ta/pct.rs Normal file
View File

@@ -0,0 +1,64 @@
use std::{borrow::Borrow, iter::Scan};
/// Carries the previous observation between `pct` steps.
pub struct PctState {
    pub last: f64,
}

/// One step: percentage change of `value` against the previous value.
/// The state starts at `0.0`, so the very first element divides by zero
/// on purpose (yields `f64::INFINITY` for positive input — see tests)
/// instead of being skipped, keeping the output index-aligned.
fn pct_step<T: Borrow<f64>>(state: &mut PctState, value: T) -> Option<f64> {
    let current = *value.borrow();
    let change = current / state.last - 1.0;
    state.last = current;
    Some(change)
}

/// Percentage change between consecutive items of an `f64` iterator.
#[allow(clippy::type_complexity)]
pub trait Pct<T>: Iterator + Sized {
    fn pct(self) -> Scan<Self, PctState, fn(&mut PctState, T) -> Option<f64>>;
}

impl<I, T> Pct<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn pct(self) -> Scan<Self, PctState, fn(&mut PctState, T) -> Option<f64>> {
        self.scan(PctState { last: 0.0 }, pct_step::<T>)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// First element divides by the initial 0.0 and yields infinity.
    #[test]
    fn test_pct() {
        let data = vec![1.0, 2.0, 4.0, 2.0, 1.0];
        assert_eq!(
            data.into_iter().pct().collect::<Vec<_>>(),
            vec![f64::INFINITY, 1.0, 1.0, -0.5, -0.5]
        );
    }

    #[test]
    fn test_pct_empty() {
        assert_eq!(
            Vec::<f64>::new().into_iter().pct().collect::<Vec<_>>(),
            Vec::<f64>::new()
        );
    }

    /// An interior zero also produces an infinite change on the next step.
    #[test]
    fn test_pct_0() {
        let data = vec![1.0, 0.0, 4.0, 2.0, 1.0];
        assert_eq!(
            data.into_iter().pct().collect::<Vec<_>>(),
            vec![f64::INFINITY, -1.0, f64::INFINITY, -0.5, -0.5]
        );
    }

    #[test]
    fn test_pct_borrow() {
        let data = [1.0, 2.0, 4.0, 2.0, 1.0];
        assert_eq!(
            data.iter().pct().collect::<Vec<_>>(),
            vec![f64::INFINITY, 1.0, 1.0, -0.5, -0.5]
        );
    }
}

135
src/lib/qrust/ta/rsi.rs Normal file
View File

@@ -0,0 +1,135 @@
use std::{
borrow::Borrow,
collections::VecDeque,
iter::{Peekable, Scan},
num::NonZeroUsize,
};
/// Rolling RSI state: fixed-size gain/loss windows plus their running
/// sums, and the previous observation in `last`.
pub struct RsiState {
    last: f64,
    window_gains: VecDeque<f64>,
    window_losses: VecDeque<f64>,
    sum_gains: f64,
    sum_losses: f64,
}

/// One RSI step over the zero-padded rolling window. The arithmetic
/// order matches the original exactly to keep floating-point results
/// bit-identical.
fn rsi_step<T: Borrow<f64>>(state: &mut RsiState, value: T) -> Option<f64> {
    let current = *value.borrow();
    // Retire the oldest gain/loss from the rolling sums.
    state.sum_gains -= state.window_gains.pop_front().unwrap();
    state.sum_losses -= state.window_losses.pop_front().unwrap();
    // Split the move into non-negative gain and loss components.
    let gain = (current - state.last).max(0.0);
    let loss = (state.last - current).max(0.0);
    state.last = current;
    state.window_gains.push_back(gain);
    state.window_losses.push_back(loss);
    state.sum_gains += gain;
    state.sum_losses += loss;
    let avg_loss = state.sum_losses / state.window_losses.len() as f64;
    if avg_loss == 0.0 {
        // No losses in the window: RS is unbounded, RSI saturates at 100.
        return Some(100.0);
    }
    let avg_gain = state.sum_gains / state.window_gains.len() as f64;
    let rs = avg_gain / avg_loss;
    Some(100.0 - (100.0 / (1.0 + rs)))
}

/// Relative Strength Index over a simple (not Wilder-smoothed) rolling
/// average of gains and losses; the window starts zero-padded, so early
/// outputs reflect the warm-up.
#[allow(clippy::type_complexity)]
pub trait Rsi<T>: Iterator + Sized {
    fn rsi(
        self,
        period: NonZeroUsize, // Typically 14
    ) -> Scan<Peekable<Self>, RsiState, fn(&mut RsiState, T) -> Option<f64>>;
}

impl<I, T> Rsi<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn rsi(
        self,
        period: NonZeroUsize,
    ) -> Scan<Peekable<Self>, RsiState, fn(&mut RsiState, T) -> Option<f64>> {
        let mut iter = self.peekable();
        // Seed `last` with the first value so the first step is a zero move.
        let seed = iter.peek().map(|v| *v.borrow()).unwrap_or_default();
        iter.scan(
            RsiState {
                last: seed,
                window_gains: VecDeque::from(vec![0.0; period.get()]),
                window_losses: VecDeque::from(vec![0.0; period.get()]),
                sum_gains: 0.0,
                sum_losses: 0.0,
            },
            rsi_step::<T>,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Round to two decimals for comparison with hand-computed values.
    fn round2(v: f64) -> f64 {
        (v * 100.0).round() / 100.0
    }

    #[test]
    fn test_rsi() {
        let rsi: Vec<_> = vec![1.0, 4.0, 7.0, 4.0, 1.0]
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .map(round2)
            .collect();
        assert_eq!(rsi, vec![100.0, 100.0, 100.0, 66.67, 33.33]);
    }

    #[test]
    fn test_rsi_empty() {
        let rsi: Vec<_> = Vec::<f64>::new()
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .collect();
        assert_eq!(rsi, Vec::<f64>::new());
    }

    /// Monotonic rise: no losses, RSI pinned at 100.
    #[test]
    fn test_rsi_no_loss() {
        let rsi: Vec<_> = vec![1.0, 2.0, 3.0, 4.0, 5.0]
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .collect();
        assert_eq!(rsi, vec![100.0; 5]);
    }

    /// Monotonic fall: first step is flat (seeded), then RSI drops to 0.
    #[test]
    fn test_rsi_no_gain() {
        let rsi: Vec<_> = vec![5.0, 4.0, 3.0, 2.0, 1.0]
            .into_iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .collect();
        assert_eq!(rsi, vec![100.0, 0.0, 0.0, 0.0, 0.0]);
    }

    #[test]
    fn test_rsi_borrow() {
        let rsi: Vec<_> = [1.0, 4.0, 7.0, 4.0, 1.0]
            .iter()
            .rsi(NonZeroUsize::new(3).unwrap())
            .map(round2)
            .collect();
        assert_eq!(rsi, vec![100.0, 100.0, 100.0, 66.67, 33.33]);
    }
}

88
src/lib/qrust/ta/sma.rs Normal file
View File

@@ -0,0 +1,88 @@
use std::{borrow::Borrow, collections::VecDeque, iter::Scan, num::NonZeroUsize};
/// Rolling SMA state: a fixed-size window and its running sum.
pub struct SmaState {
    window: VecDeque<f64>,
    sum: f64,
}

/// One SMA step: slide the window by one element and emit the mean.
/// The window is zero-padded at construction, so early outputs average
/// over those padding zeros (warm-up behavior, pinned by the tests).
fn sma_step<T: Borrow<f64>>(state: &mut SmaState, value: T) -> Option<f64> {
    let current = *value.borrow();
    state.sum -= state.window.pop_front().unwrap();
    state.window.push_back(current);
    state.sum += current;
    Some(state.sum / state.window.len() as f64)
}

/// Simple moving average over an `f64` iterator.
#[allow(clippy::type_complexity)]
pub trait Sma<T>: Iterator + Sized {
    fn sma(self, period: NonZeroUsize)
        -> Scan<Self, SmaState, fn(&mut SmaState, T) -> Option<f64>>;
}

impl<I, T> Sma<T> for I
where
    I: Iterator<Item = T>,
    T: Borrow<f64>,
{
    fn sma(
        self,
        period: NonZeroUsize,
    ) -> Scan<Self, SmaState, fn(&mut SmaState, T) -> Option<f64>> {
        self.scan(
            SmaState {
                window: VecDeque::from(vec![0.0; period.get()]),
                sum: 0.0,
            },
            sma_step::<T>,
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero-padded warm-up: early means include the initial zeros.
    #[test]
    fn test_sma() {
        let sma: Vec<_> = vec![3.0, 6.0, 9.0, 12.0, 15.0]
            .into_iter()
            .sma(NonZeroUsize::new(3).unwrap())
            .collect();
        assert_eq!(sma, vec![1.0, 3.0, 6.0, 9.0, 12.0]);
    }

    #[test]
    fn test_sma_empty() {
        let sma: Vec<_> = Vec::<f64>::new()
            .into_iter()
            .sma(NonZeroUsize::new(3).unwrap())
            .collect();
        assert_eq!(sma, Vec::<f64>::new());
    }

    /// Period 1 is the identity transform.
    #[test]
    fn test_sma_1_period() {
        let sma: Vec<_> = vec![1.0, 2.0, 3.0, 4.0, 5.0]
            .into_iter()
            .sma(NonZeroUsize::new(1).unwrap())
            .collect();
        assert_eq!(sma, vec![1.0, 2.0, 3.0, 4.0, 5.0]);
    }

    #[test]
    fn test_sma_borrow() {
        let sma: Vec<_> = [3.0, 6.0, 9.0, 12.0, 15.0]
            .iter()
            .sma(NonZeroUsize::new(3).unwrap())
            .collect();
        assert_eq!(sma, vec![1.0, 3.0, 6.0, 9.0, 12.0]);
    }
}

View File

@@ -5,6 +5,7 @@ pub mod bar;
pub mod calendar;
pub mod news;
pub mod order;
pub mod ta;
pub use asset::{Asset, Class, Exchange};
pub use backfill::Backfill;

315
src/lib/qrust/types/ta.rs Normal file
View File

@@ -0,0 +1,315 @@
use super::Bar;
use crate::ta::{Bbands, Deriv, Ema, Macd, Obv, Pct, Rsi, Sma};
use rayon::scope;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
// Rows dominated by indicator warm-up; equals the longest window used
// below (72-period SMA/EMA). NOTE(review): not referenced in this file —
// confirm consumers trim this many leading rows.
pub const HEAD_SIZE: usize = 72;
// Total number of fields on `IndicatedBar` (2 calendar + 52 numeric).
pub const FIELD_COUNT: usize = 54;
// `IndicatedBar` fields excluding the two `u8` calendar ones (`hour`, `day`).
pub const NUMERICAL_FIELD_COUNT: usize = FIELD_COUNT - 2;
/// One bar enriched with derived technical indicators.
///
/// Every `f64` field has a matching `*_pct` companion holding its
/// bar-over-bar percentage change (see `ta::Pct`); the first row's
/// `*_pct` values are division-by-zero sentinels, not real changes.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IndicatedBar {
    // Calendar position of the bar.
    pub hour: u8,
    pub day: u8,
    // Raw OHLCV columns and their percentage changes.
    pub open: f64,
    pub open_pct: f64,
    pub high: f64,
    pub high_pct: f64,
    pub low: f64,
    pub low_pct: f64,
    pub close: f64,
    pub close_pct: f64,
    pub volume: f64,
    pub volume_pct: f64,
    pub trades: f64,
    pub trades_pct: f64,
    // First derivative of the close series.
    pub close_deriv: f64,
    pub close_deriv_pct: f64,
    // Simple moving averages of the close over 3..72 bars.
    pub sma_3: f64,
    pub sma_3_pct: f64,
    pub sma_6: f64,
    pub sma_6_pct: f64,
    pub sma_12: f64,
    pub sma_12_pct: f64,
    pub sma_24: f64,
    pub sma_24_pct: f64,
    pub sma_48: f64,
    pub sma_48_pct: f64,
    pub sma_72: f64,
    pub sma_72_pct: f64,
    // Exponential moving averages of the close over 3..72 bars.
    pub ema_3: f64,
    pub ema_3_pct: f64,
    pub ema_6: f64,
    pub ema_6_pct: f64,
    pub ema_12: f64,
    pub ema_12_pct: f64,
    pub ema_24: f64,
    pub ema_24_pct: f64,
    pub ema_48: f64,
    pub ema_48_pct: f64,
    pub ema_72: f64,
    pub ema_72_pct: f64,
    // MACD (12/26) with its 9-period signal line.
    pub macd: f64,
    pub macd_pct: f64,
    pub macd_signal: f64,
    pub macd_signal_pct: f64,
    // On-balance volume.
    pub obv: f64,
    pub obv_pct: f64,
    // 14-period relative strength index.
    pub rsi: f64,
    pub rsi_pct: f64,
    // Bollinger bands (20-period, 2 standard deviations).
    pub bbands_lower: f64,
    pub bbands_lower_pct: f64,
    pub bbands_mean: f64,
    pub bbands_mean_pct: f64,
    pub bbands_upper: f64,
    pub bbands_upper_pct: f64,
}
/// Derives the full `IndicatedBar` feature series for a slice of bars.
///
/// Output is index-aligned with `bars`: element `i` of every indicator
/// column describes `bars[i]`. Early rows reflect the indicators'
/// zero-padded warm-up windows (see the `ta` implementations) and are
/// not trimmed here — presumably callers drop `HEAD_SIZE` rows; confirm.
#[allow(clippy::too_many_lines)]
pub fn calculate_indicators(bars: &[Bar]) -> Vec<IndicatedBar> {
    let length = bars.len();
    // Unzip the raw bar fields into parallel column vectors in one pass.
    let (hour, day, open, high, low, close, volume, trades) = bars.iter().fold(
        (
            Vec::with_capacity(length),
            Vec::with_capacity(length),
            Vec::with_capacity(length),
            Vec::with_capacity(length),
            Vec::with_capacity(length),
            Vec::with_capacity(length),
            Vec::with_capacity(length),
            Vec::with_capacity(length),
        ),
        |(mut hour, mut day, mut open, mut high, mut low, mut close, mut volume, mut trades),
         bar| {
            hour.push(bar.time.hour());
            day.push(bar.time.day());
            open.push(bar.open);
            high.push(bar.high);
            low.push(bar.low);
            close.push(bar.close);
            volume.push(bar.volume);
            trades.push(bar.trades as f64);
            (hour, day, open, high, low, close, volume, trades)
        },
    );
    // One output vector per primary indicator, filled concurrently below.
    let mut close_deriv = Vec::with_capacity(length);
    let mut sma_3 = Vec::with_capacity(length);
    let mut sma_6 = Vec::with_capacity(length);
    let mut sma_12 = Vec::with_capacity(length);
    let mut sma_24 = Vec::with_capacity(length);
    let mut sma_48 = Vec::with_capacity(length);
    let mut sma_72 = Vec::with_capacity(length);
    let mut ema_3 = Vec::with_capacity(length);
    let mut ema_6 = Vec::with_capacity(length);
    let mut ema_12 = Vec::with_capacity(length);
    let mut ema_24 = Vec::with_capacity(length);
    let mut ema_48 = Vec::with_capacity(length);
    let mut ema_72 = Vec::with_capacity(length);
    let mut macd = Vec::with_capacity(length);
    let mut macd_signal = Vec::with_capacity(length);
    let mut obv = Vec::with_capacity(length);
    let mut rsi = Vec::with_capacity(length);
    let mut bbands_upper = Vec::with_capacity(length);
    let mut bbands_mean = Vec::with_capacity(length);
    let mut bbands_lower = Vec::with_capacity(length);
    // Phase 1: compute primary indicators in parallel. Each spawned task
    // mutably borrows a distinct output vector, so the scope is data-race
    // free; the scope blocks until every task finishes.
    scope(|s| {
        s.spawn(|_| close_deriv.extend(close.iter().deriv()));
        s.spawn(|_| sma_3.extend(close.iter().sma(NonZeroUsize::new(3).unwrap())));
        s.spawn(|_| sma_6.extend(close.iter().sma(NonZeroUsize::new(6).unwrap())));
        s.spawn(|_| sma_12.extend(close.iter().sma(NonZeroUsize::new(12).unwrap())));
        s.spawn(|_| sma_24.extend(close.iter().sma(NonZeroUsize::new(24).unwrap())));
        s.spawn(|_| sma_48.extend(close.iter().sma(NonZeroUsize::new(48).unwrap())));
        s.spawn(|_| sma_72.extend(close.iter().sma(NonZeroUsize::new(72).unwrap())));
        s.spawn(|_| ema_3.extend(close.iter().ema(NonZeroUsize::new(3).unwrap())));
        s.spawn(|_| ema_6.extend(close.iter().ema(NonZeroUsize::new(6).unwrap())));
        s.spawn(|_| ema_12.extend(close.iter().ema(NonZeroUsize::new(12).unwrap())));
        s.spawn(|_| ema_24.extend(close.iter().ema(NonZeroUsize::new(24).unwrap())));
        s.spawn(|_| ema_48.extend(close.iter().ema(NonZeroUsize::new(48).unwrap())));
        s.spawn(|_| ema_72.extend(close.iter().ema(NonZeroUsize::new(72).unwrap())));
        // MACD yields (macd, signal) pairs; split into the two columns.
        s.spawn(|_| {
            close
                .iter()
                .macd(
                    NonZeroUsize::new(12).unwrap(),
                    NonZeroUsize::new(26).unwrap(),
                    NonZeroUsize::new(9).unwrap(),
                )
                .for_each(|(macd_val, signal_val)| {
                    macd.push(macd_val);
                    macd_signal.push(signal_val);
                });
        });
        s.spawn(|_| {
            obv.extend(bars.iter().map(|bar| (bar.close, bar.volume)).obv());
        });
        s.spawn(|_: &_| {
            rsi.extend(close.iter().rsi(NonZeroUsize::new(14).unwrap()));
        });
        // Bollinger bands yield (upper, mean, lower) triples.
        s.spawn(|_| {
            close
                .iter()
                .bbands(NonZeroUsize::new(20).unwrap(), 2.0)
                .for_each(|(upper, mean, lower)| {
                    bbands_upper.push(upper);
                    bbands_mean.push(mean);
                    bbands_lower.push(lower);
                });
        })
    });
    // Percentage-change companions for every numeric column above.
    let mut open_pct = Vec::with_capacity(length);
    let mut high_pct = Vec::with_capacity(length);
    let mut low_pct = Vec::with_capacity(length);
    let mut close_pct = Vec::with_capacity(length);
    let mut volume_pct = Vec::with_capacity(length);
    let mut trades_pct = Vec::with_capacity(length);
    let mut close_deriv_pct = Vec::with_capacity(length);
    let mut sma_3_pct = Vec::with_capacity(length);
    let mut sma_6_pct = Vec::with_capacity(length);
    let mut sma_12_pct = Vec::with_capacity(length);
    let mut sma_24_pct = Vec::with_capacity(length);
    let mut sma_48_pct = Vec::with_capacity(length);
    let mut sma_72_pct = Vec::with_capacity(length);
    let mut ema_3_pct = Vec::with_capacity(length);
    let mut ema_6_pct = Vec::with_capacity(length);
    let mut ema_12_pct = Vec::with_capacity(length);
    let mut ema_24_pct = Vec::with_capacity(length);
    let mut ema_48_pct = Vec::with_capacity(length);
    let mut ema_72_pct = Vec::with_capacity(length);
    let mut macd_pct = Vec::with_capacity(length);
    let mut macd_signal_pct = Vec::with_capacity(length);
    let mut obv_pct = Vec::with_capacity(length);
    let mut rsi_pct = Vec::with_capacity(length);
    let mut bbands_upper_pct = Vec::with_capacity(length);
    let mut bbands_mean_pct = Vec::with_capacity(length);
    let mut bbands_lower_pct = Vec::with_capacity(length);
    // Phase 2: percentage changes depend on phase 1's outputs, so they run
    // in a second parallel scope after the first completes.
    scope(|s| {
        s.spawn(|_| open_pct.extend(open.iter().pct()));
        s.spawn(|_| high_pct.extend(high.iter().pct()));
        s.spawn(|_| low_pct.extend(low.iter().pct()));
        s.spawn(|_| close_pct.extend(close.iter().pct()));
        s.spawn(|_| volume_pct.extend(volume.iter().pct()));
        s.spawn(|_| trades_pct.extend(trades.iter().pct()));
        s.spawn(|_| close_deriv_pct.extend(close_deriv.iter().pct()));
        s.spawn(|_| sma_3_pct.extend(sma_3.iter().pct()));
        s.spawn(|_| sma_6_pct.extend(sma_6.iter().pct()));
        s.spawn(|_| sma_12_pct.extend(sma_12.iter().pct()));
        s.spawn(|_| sma_24_pct.extend(sma_24.iter().pct()));
        s.spawn(|_| sma_48_pct.extend(sma_48.iter().pct()));
        s.spawn(|_| sma_72_pct.extend(sma_72.iter().pct()));
        s.spawn(|_| ema_3_pct.extend(ema_3.iter().pct()));
        s.spawn(|_| ema_6_pct.extend(ema_6.iter().pct()));
        s.spawn(|_| ema_12_pct.extend(ema_12.iter().pct()));
        s.spawn(|_| ema_24_pct.extend(ema_24.iter().pct()));
        s.spawn(|_| ema_48_pct.extend(ema_48.iter().pct()));
        s.spawn(|_| ema_72_pct.extend(ema_72.iter().pct()));
        s.spawn(|_| macd_pct.extend(macd.iter().pct()));
        s.spawn(|_| macd_signal_pct.extend(macd_signal.iter().pct()));
        s.spawn(|_| obv_pct.extend(obv.iter().pct()));
        s.spawn(|_| rsi_pct.extend(rsi.iter().pct()));
        s.spawn(|_| bbands_upper_pct.extend(bbands_upper.iter().pct()));
        s.spawn(|_| bbands_mean_pct.extend(bbands_mean.iter().pct()));
        s.spawn(|_| bbands_lower_pct.extend(bbands_lower.iter().pct()));
    });
    // Reassemble the column vectors into one row struct per input bar.
    bars.iter()
        .enumerate()
        .map(|(i, _)| IndicatedBar {
            hour: hour[i],
            day: day[i],
            open: open[i],
            open_pct: open_pct[i],
            high: high[i],
            high_pct: high_pct[i],
            low: low[i],
            low_pct: low_pct[i],
            close: close[i],
            close_pct: close_pct[i],
            volume: volume[i],
            volume_pct: volume_pct[i],
            trades: trades[i],
            trades_pct: trades_pct[i],
            close_deriv: close_deriv[i],
            close_deriv_pct: close_deriv_pct[i],
            sma_3: sma_3[i],
            sma_3_pct: sma_3_pct[i],
            sma_6: sma_6[i],
            sma_6_pct: sma_6_pct[i],
            sma_12: sma_12[i],
            sma_12_pct: sma_12_pct[i],
            sma_24: sma_24[i],
            sma_24_pct: sma_24_pct[i],
            sma_48: sma_48[i],
            sma_48_pct: sma_48_pct[i],
            sma_72: sma_72[i],
            sma_72_pct: sma_72_pct[i],
            ema_3: ema_3[i],
            ema_3_pct: ema_3_pct[i],
            ema_6: ema_6[i],
            ema_6_pct: ema_6_pct[i],
            ema_12: ema_12[i],
            ema_12_pct: ema_12_pct[i],
            ema_24: ema_24[i],
            ema_24_pct: ema_24_pct[i],
            ema_48: ema_48[i],
            ema_48_pct: ema_48_pct[i],
            ema_72: ema_72[i],
            ema_72_pct: ema_72_pct[i],
            macd: macd[i],
            macd_pct: macd_pct[i],
            macd_signal: macd_signal[i],
            macd_signal_pct: macd_signal_pct[i],
            obv: obv[i],
            obv_pct: obv_pct[i],
            rsi: rsi[i],
            rsi_pct: rsi_pct[i],
            bbands_lower: bbands_lower[i],
            bbands_lower_pct: bbands_lower_pct[i],
            bbands_mean: bbands_mean[i],
            bbands_mean_pct: bbands_mean_pct[i],
            bbands_upper: bbands_upper[i],
            bbands_upper_pct: bbands_upper_pct[i],
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use rand::{
        distributions::{Distribution, Uniform},
        Rng,
    };
    use time::OffsetDateTime;

    /// Smoke test: indicator derivation over a large random series stays
    /// index-aligned with the input (only the length is asserted).
    /// NOTE(review): 1M rows makes this test heavy; consider shrinking.
    #[test]
    fn test_calculate_indicators() {
        let length = 1_000_000;
        let mut rng = rand::thread_rng();
        // Prices/volumes drawn uniformly from [1, 100).
        let uniform = Uniform::new(1.0, 100.0);
        let mut bars = Vec::with_capacity(length);
        for _ in 0..length {
            bars.push(Bar {
                symbol: "AAPL".to_string(),
                time: OffsetDateTime::now_utc(),
                open: uniform.sample(&mut rng),
                high: uniform.sample(&mut rng),
                low: uniform.sample(&mut rng),
                close: uniform.sample(&mut rng),
                volume: uniform.sample(&mut rng),
                trades: rng.gen_range(1..100),
            });
        }
        let indicated_bars = calculate_indicators(&bars);
        assert_eq!(indicated_bars.len(), length);
    }
}

View File

@@ -108,7 +108,6 @@ mod tests {
use super::*;
use serde::Deserialize;
use serde_test::{assert_de_tokens, Token};
use time::Time;
#[test]
fn test_add_slash() {