diff --git a/Cargo.lock b/Cargo.lock index 314bfe2..1008425 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,6 +195,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bitflags" version = "1.3.2" @@ -1649,6 +1655,7 @@ version = "0.1.0" dependencies = [ "axum", "backoff", + "bimap", "clickhouse", "dotenv", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index d75b292..e439373 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,3 +52,4 @@ backoff = { version = "0.4.0", features = [ regex = "1.10.3" html-escape = "0.2.13" rust-bert = "0.22.0" +bimap = "0.6.3" diff --git a/src/config.rs b/src/config.rs index 8a03e63..a19c4ac 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,12 +11,8 @@ use rust_bert::{ }, resources::LocalResource, }; -use std::{ - env, - num::NonZeroU32, - path::PathBuf, - sync::{Arc, Mutex}, -}; +use std::{env, num::NonZeroU32, path::PathBuf, sync::Arc}; +use tokio::sync::Mutex; pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets"; pub const ALPACA_CLOCK_API_URL: &str = "https://api.alpaca.markets/v2/clock"; diff --git a/src/database/assets.rs b/src/database/assets.rs index e5b908e..38d052c 100644 --- a/src/database/assets.rs +++ b/src/database/assets.rs @@ -15,8 +15,7 @@ where T: AsRef + Serialize + Send + Sync, { clickhouse_client - .query("SELECT ?fields FROM assets FINAL WHERE symbol = ? OR abbreviation = ?") - .bind(symbol) + .query("SELECT ?fields FROM assets FINAL WHERE symbol = ?") .bind(symbol) .fetch_optional::() .await diff --git a/src/database/backfills.rs b/src/database/backfills.rs index 2ef38fe..0639cb4 100644 --- a/src/database/backfills.rs +++ b/src/database/backfills.rs @@ -1,4 +1,4 @@ -use crate::{database::assets, threads::data::ThreadType, types::Backfill}; +use crate::{threads::data::ThreadType, types::Backfill}; use clickhouse::Client; use serde::Serialize; use tokio::join; @@ -58,23 +58,9 @@ pub async fn delete_where_symbols( } pub async fn cleanup(clickhouse_client: &Client) { - let assets = assets::select(clickhouse_client).await; - - let bars_symbols = assets - .clone() - .into_iter() - .map(|asset| asset.symbol) - .collect::>(); - - let news_symbols = assets - .into_iter() - .map(|asset| asset.abbreviation) - .collect::>(); - let delete_bars_future = async { clickhouse_client - .query("DELETE FROM backfills_bars WHERE symbol NOT IN ?") - .bind(bars_symbols) + .query("DELETE FROM backfills_bars WHERE symbol NOT IN (SELECT symbol FROM assets)") .execute() .await .unwrap(); @@ -82,8 +68,7 @@ pub async fn cleanup(clickhouse_client: &Client) { let delete_news_future = async { clickhouse_client - .query("DELETE FROM backfills_news WHERE symbol NOT IN ?") - .bind(news_symbols) + .query("DELETE FROM backfills_news WHERE symbol NOT IN (SELECT symbol FROM assets)") .execute() .await .unwrap(); diff --git a/src/database/bars.rs b/src/database/bars.rs index 7f739da..137f58e 100644 --- a/src/database/bars.rs +++ b/src/database/bars.rs @@ -1,4 +1,3 @@ -use super::assets; use crate::types::Bar; use clickhouse::Client; use serde::Serialize; @@ -34,16 +33,8 @@ where } pub async fn cleanup(clickhouse_client: &Client) { - let assets = assets::select(clickhouse_client).await; - - let symbols = assets - .into_iter() - .map(|asset| asset.symbol) - .collect::>(); - clickhouse_client - .query("DELETE FROM bars WHERE symbol NOT IN ?") - .bind(symbols) + .query("DELETE FROM bars WHERE symbol NOT IN (SELECT symbol FROM assets)") .execute() .await .unwrap(); diff --git a/src/database/news.rs b/src/database/news.rs index 6b0903b..c89164b 100644 --- a/src/database/news.rs +++ b/src/database/news.rs @@ -1,4 +1,3 @@ -use super::assets; use crate::types::News; use clickhouse::Client; use serde::Serialize; @@ -25,31 +24,19 @@ pub async fn delete_where_symbols(clickhouse_client: &Client, symbols: &[T]) where T: AsRef + Serialize + Send + Sync, { - let remaining_symbols = assets::select(clickhouse_client) - .await - .into_iter() - .map(|asset| asset.abbreviation) - .collect::>(); - clickhouse_client - .query("DELETE FROM news WHERE hasAny(symbols, ?) AND NOT hasAny(symbols, ?)") + .query("DELETE FROM news WHERE hasAny(symbols, ?) AND NOT hasAny(symbols, (SELECT groupArray(symbol) FROM assets))") .bind(symbols) - .bind(remaining_symbols) .execute() .await .unwrap(); } pub async fn cleanup(clickhouse_client: &Client) { - let remaining_symbols = assets::select(clickhouse_client) - .await - .into_iter() - .map(|asset| asset.abbreviation) - .collect::>(); - clickhouse_client - .query("DELETE FROM news WHERE NOT hasAny(symbols, ?)") - .bind(remaining_symbols) + .query( + "DELETE FROM news WHERE NOT hasAny(symbols, (SELECT groupArray(symbol) FROM assets))", + ) .execute() .await .unwrap(); diff --git a/src/threads/data/asset_status.rs b/src/threads/data/asset_status.rs index be79a4a..b93360a 100644 --- a/src/threads/data/asset_status.rs +++ b/src/threads/data/asset_status.rs @@ -78,20 +78,20 @@ async fn handle_asset_status_message( .assets .clone() .into_iter() - .map(|asset| match thread_type { - ThreadType::Bars(_) => asset.symbol, - ThreadType::News => asset.abbreviation, - }) + .map(|asset| asset.symbol) .collect::>(); match message.action { Action::Add => { let mut guard = guard.write().await; - guard.symbols.extend(symbols.clone()); - guard - .pending_subscriptions - .extend(symbols.clone().into_iter().zip(message.assets.clone())); + guard.assets.extend( + message + .assets + .iter() + .map(|asset| (asset.clone(), asset.symbol.clone())), + ); + guard.pending_subscriptions.extend(message.assets.clone()); info!("{:?} - Added {:?}.", thread_type, symbols); @@ -108,7 +108,7 @@ async fn handle_asset_status_message( .await .send(tungstenite::Message::Text( to_string(&websocket::outgoing::Message::Subscribe( - websocket_market_message_factory(thread_type, symbols), + create_websocket_market_message(thread_type, symbols), )) .unwrap(), )) @@ -121,10 +121,10 @@ async fn handle_asset_status_message( Action::Remove => { let mut guard = guard.write().await; - guard.symbols.retain(|symbol| !symbols.contains(symbol)); guard - .pending_unsubscriptions - .extend(symbols.clone().into_iter().zip(message.assets.clone())); + .assets + .retain(|asset, _| !message.assets.contains(asset)); + guard.pending_unsubscriptions.extend(message.assets); info!("{:?} - Removed {:?}.", thread_type, symbols); @@ -140,7 +140,7 @@ async fn handle_asset_status_message( .await .send(tungstenite::Message::Text( to_string(&websocket::outgoing::Message::Unsubscribe( - websocket_market_message_factory(thread_type, symbols), + create_websocket_market_message(thread_type, symbols), )) .unwrap(), )) @@ -155,7 +155,7 @@ async fn handle_asset_status_message( message.response.send(()).unwrap(); } -fn websocket_market_message_factory( +fn create_websocket_market_message( thread_type: ThreadType, symbols: Vec, ) -> websocket::outgoing::subscribe::Message { diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs index a6cda38..d02ceac 100644 --- a/src/threads/data/backfill.rs +++ b/src/threads/data/backfill.rs @@ -10,13 +10,14 @@ use crate::{ utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}, }; use backoff::{future::retry, ExponentialBackoff}; +use futures_util::future::join_all; use log::{error, info, warn}; use std::{collections::HashMap, sync::Arc}; use time::OffsetDateTime; use tokio::{ join, spawn, sync::{mpsc, oneshot, Mutex, RwLock}, - task::{spawn_blocking, JoinHandle}, + task::JoinHandle, time::sleep, }; @@ -87,16 +88,18 @@ async fn handle_backfill_message( let mut backfill_jobs = backfill_jobs.lock().await; let symbols = match message.assets { - Subset::All => guard.symbols.clone().into_iter().collect::>(), + Subset::All => guard + .assets + .clone() + .into_iter() + .map(|(_, symbol)| symbol) + .collect(), Subset::Some(assets) => assets .into_iter() - .map(|asset| match thread_type { - ThreadType::Bars(_) => asset.symbol, - ThreadType::News => asset.abbreviation, - }) + .map(|asset| asset.symbol) .filter(|symbol| match message.action { - Action::Backfill => guard.symbols.contains(symbol), - Action::Purge => !guard.symbols.contains(symbol), + Action::Backfill => guard.assets.contains_right(symbol), + Action::Purge => !guard.assets.contains_right(symbol), }) .collect::>(), }; @@ -365,33 +368,30 @@ async fn execute_backfill_news( return; } - let app_config_clone = app_config.clone(); let inputs = news .iter() .map(|news| format!("{}\n\n{}", news.headline, news.content)) .collect::>(); - let predictions: Vec = spawn_blocking(move || { - inputs - .chunks(app_config_clone.max_bert_inputs) - .flat_map(|inputs| { - app_config_clone - .sequence_classifier - .lock() - .unwrap() - .predict(inputs.iter().map(String::as_str).collect::>()) - .into_iter() - .map(|label| Prediction::try_from(label).unwrap()) - .collect::>() - }) - .collect() - }) + let predictions = join_all(inputs.chunks(app_config.max_bert_inputs).map(|inputs| { + let sequence_classifier = app_config.sequence_classifier.clone(); + async move { + sequence_classifier + .lock() + .await + .predict(inputs.iter().map(String::as_str).collect::>()) + .into_iter() + .map(|label| Prediction::try_from(label).unwrap()) + .collect::>() + } + })) .await - .unwrap(); + .into_iter() + .flatten(); let news = news .into_iter() - .zip(predictions.into_iter()) + .zip(predictions) .map(|(news, prediction)| News { sentiment: prediction.sentiment, confidence: prediction.confidence, diff --git a/src/threads/data/mod.rs b/src/threads/data/mod.rs index 8cd8e6d..cf37c1f 100644 --- a/src/threads/data/mod.rs +++ b/src/threads/data/mod.rs @@ -2,31 +2,22 @@ pub mod asset_status; pub mod backfill; pub mod websocket; -use super::clock; +use super::{clock, guard::Guard}; use crate::{ config::{ Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_NEWS_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL, }, - types::{Asset, Class, Subset}, + types::{Class, Subset}, utils::authenticate, }; use futures_util::StreamExt; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::sync::Arc; use tokio::{ join, select, spawn, sync::{mpsc, Mutex, RwLock}, }; use tokio_tungstenite::connect_async; -pub struct Guard { - pub symbols: HashSet, - pub pending_subscriptions: HashMap, - pub pending_unsubscriptions: HashMap, -} - #[derive(Clone, Copy, Debug)] pub enum ThreadType { Bars(Class), @@ -76,11 +67,7 @@ async fn init_thread( mpsc::Sender, mpsc::Sender, ) { - let guard = Arc::new(RwLock::new(Guard { - symbols: HashSet::new(), - pending_subscriptions: HashMap::new(), - pending_unsubscriptions: HashMap::new(), - })); + let guard = Arc::new(RwLock::new(Guard::new())); let websocket_url = match thread_type { ThreadType::Bars(Class::UsEquity) => format!( diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs index 4a72c0e..806ce64 100644 --- a/src/threads/data/websocket.rs +++ b/src/threads/data/websocket.rs @@ -3,6 +3,7 @@ use crate::{ config::Config, database, types::{alpaca::websocket, news::Prediction, Bar, News, Subset}, + utils::add_slash_to_pair, }; use futures_util::{ stream::{SplitSink, SplitStream}, @@ -10,16 +11,12 @@ use futures_util::{ }; use log::{error, info, warn}; use serde_json::from_str; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::{collections::HashSet, sync::Arc}; use tokio::{ join, net::TcpStream, spawn, sync::{mpsc, Mutex, RwLock}, - task::spawn_blocking, }; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; @@ -106,20 +103,24 @@ async fn handle_parsed_websocket_message( websocket::incoming::Message::Subscription(message) => { let symbols = match message { websocket::incoming::subscription::Message::Market(message) => message.bars, - websocket::incoming::subscription::Message::News(message) => message.news, + websocket::incoming::subscription::Message::News(message) => message + .news + .into_iter() + .map(|symbol| add_slash_to_pair(&symbol)) + .collect(), }; let mut guard = guard.write().await; let newly_subscribed = guard .pending_subscriptions - .extract_if(|symbol, _| symbols.contains(symbol)) - .collect::>(); + .extract_if(|asset| symbols.contains(&asset.symbol)) + .collect::>(); let newly_unsubscribed = guard .pending_unsubscriptions - .extract_if(|symbol, _| !symbols.contains(symbol)) - .collect::>(); + .extract_if(|asset| !symbols.contains(&asset.symbol)) + .collect::>(); drop(guard); @@ -128,12 +129,15 @@ async fn handle_parsed_websocket_message( info!( "{:?} - Subscribed to {:?}.", thread_type, - newly_subscribed.keys().collect::>() + newly_subscribed + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>() ); let (backfill_message, backfill_receiver) = backfill::Message::new( backfill::Action::Backfill, - Subset::Some(newly_subscribed.into_values().collect::>()), + Subset::Some(newly_subscribed.into_iter().collect::>()), ); backfill_sender.send(backfill_message).await.unwrap(); @@ -146,12 +150,15 @@ async fn handle_parsed_websocket_message( info!( "{:?} - Unsubscribed from {:?}.", thread_type, - newly_unsubscribed.keys().collect::>() + newly_unsubscribed + .iter() + .map(|asset| asset.symbol.clone()) + .collect::>() ); let (purge_message, purge_receiver) = backfill::Message::new( backfill::Action::Purge, - Subset::Some(newly_unsubscribed.into_values().collect::>()), + Subset::Some(newly_unsubscribed.into_iter().collect::>()), ); backfill_sender.send(purge_message).await.unwrap(); @@ -166,7 +173,7 @@ async fn handle_parsed_websocket_message( let bar = Bar::from(message); let guard = guard.read().await; - if guard.symbols.get(&bar.symbol).is_none() { + if !guard.assets.contains_right(&bar.symbol) { warn!( "{:?} - Race condition: received bar for unsubscribed symbol: {:?}.", thread_type, bar.symbol @@ -182,10 +189,13 @@ async fn handle_parsed_websocket_message( } websocket::incoming::Message::News(message) => { let news = News::from(message); - let symbols = news.symbols.clone().into_iter().collect::>(); let guard = guard.read().await; - if !guard.symbols.iter().any(|symbol| symbols.contains(symbol)) { + if !news + .symbols + .iter() + .any(|symbol| guard.assets.contains_right(symbol)) + { warn!( "{:?} - Race condition: received news for unsubscribed symbols: {:?}.", thread_type, news.symbols @@ -198,21 +208,16 @@ async fn handle_parsed_websocket_message( thread_type, news.symbols, news.time_created ); - let app_config_clone = app_config.clone(); let input = format!("{}\n\n{}", news.headline, news.content); - let prediction = spawn_blocking(move || { - app_config_clone - .sequence_classifier - .lock() - .unwrap() - .predict(vec![input.as_str()]) - .into_iter() - .map(|label| Prediction::try_from(label).unwrap()) - .collect::>()[0] - }) - .await - .unwrap(); + let prediction = app_config + .sequence_classifier + .lock() + .await + .predict(vec![input.as_str()]) + .into_iter() + .map(|label| Prediction::try_from(label).unwrap()) + .collect::>()[0]; let news = News { sentiment: prediction.sentiment, diff --git a/src/threads/guard.rs b/src/threads/guard.rs new file mode 100644 index 0000000..6e67641 --- /dev/null +++ b/src/threads/guard.rs @@ -0,0 +1,19 @@ +use crate::types::Asset; +use bimap::BiMap; +use std::collections::HashSet; + +pub struct Guard { + pub assets: BiMap, + pub pending_subscriptions: HashSet, + pub pending_unsubscriptions: HashSet, +} + +impl Guard { + pub fn new() -> Self { + Self { + assets: BiMap::new(), + pending_subscriptions: HashSet::new(), + pending_unsubscriptions: HashSet::new(), + } + } +} diff --git a/src/threads/mod.rs b/src/threads/mod.rs index 5f09b94..227caf0 100644 --- a/src/threads/mod.rs +++ b/src/threads/mod.rs @@ -1,2 +1,3 @@ pub mod clock; pub mod data; +pub mod guard; diff --git a/src/types/alpaca/api/incoming/asset.rs b/src/types/alpaca/api/incoming/asset.rs index ebc8edd..93bc193 100644 --- a/src/types/alpaca/api/incoming/asset.rs +++ b/src/types/alpaca/api/incoming/asset.rs @@ -43,6 +43,15 @@ pub enum Status { Inactive, } +impl From for bool { + fn from(item: Status) -> Self { + match item { + Status::Active => true, + Status::Inactive => false, + } + } +} + #[allow(clippy::struct_excessive_bools)] #[derive(Clone, Debug, PartialEq, Deserialize)] pub struct Asset { @@ -64,8 +73,7 @@ pub struct Asset { impl From for types::Asset { fn from(item: Asset) -> Self { Self { - symbol: item.symbol.clone(), - abbreviation: item.symbol.replace('/', ""), + symbol: item.symbol, class: item.class.into(), exchange: item.exchange.into(), time_added: time::OffsetDateTime::now_utc(), diff --git a/src/types/alpaca/api/incoming/news.rs b/src/types/alpaca/api/incoming/news.rs index 67742ec..4cd32e4 100644 --- a/src/types/alpaca/api/incoming/news.rs +++ b/src/types/alpaca/api/incoming/news.rs @@ -1,4 +1,7 @@ -use crate::{types, utils::normalize_news_content}; +use crate::{ + types, + utils::{add_slash_to_pair, normalize_news_content}, +}; use serde::Deserialize; use time::OffsetDateTime; @@ -41,7 +44,11 @@ impl From for types::News { id: news.id, time_created: news.time_created, time_updated: news.time_updated, - symbols: news.symbols, + symbols: news + .symbols + .into_iter() + .map(|symbol| add_slash_to_pair(&symbol)) + .collect(), headline: normalize_news_content(&news.headline), author: normalize_news_content(&news.author), source: normalize_news_content(&news.source), diff --git a/src/types/alpaca/api/outgoing/news.rs b/src/types/alpaca/api/outgoing/news.rs index 8bc64c2..5a3c2ee 100644 --- a/src/types/alpaca/api/outgoing/news.rs +++ b/src/types/alpaca/api/outgoing/news.rs @@ -1,4 +1,5 @@ use super::serialize_symbols; +use crate::utils::remove_slash_from_pair; use serde::Serialize; use time::OffsetDateTime; @@ -18,7 +19,7 @@ pub struct News { } impl News { - pub const fn new( + pub fn new( symbols: Vec, start: OffsetDateTime, end: OffsetDateTime, @@ -28,7 +29,10 @@ impl News { page_token: Option, ) -> Self { Self { - symbols, + symbols: symbols + .into_iter() + .map(|symbol| remove_slash_from_pair(&symbol)) + .collect(), start, end, limit, diff --git a/src/types/alpaca/websocket/incoming/news.rs b/src/types/alpaca/websocket/incoming/news.rs index 682c3b5..1d3b009 100644 --- a/src/types/alpaca/websocket/incoming/news.rs +++ b/src/types/alpaca/websocket/incoming/news.rs @@ -1,4 +1,7 @@ -use crate::{types, utils::normalize_news_content}; +use crate::{ + types, + utils::{add_slash_to_pair, normalize_news_content}, +}; use serde::Deserialize; use time::OffsetDateTime; @@ -26,7 +29,11 @@ impl From for types::News { id: news.id, time_created: news.time_created, time_updated: news.time_updated, - symbols: news.symbols, + symbols: news + .symbols + .into_iter() + .map(|symbol| add_slash_to_pair(&symbol)) + .collect(), headline: normalize_news_content(&news.headline), author: normalize_news_content(&news.author), source: normalize_news_content(&news.source), diff --git a/src/types/alpaca/websocket/outgoing/subscribe.rs b/src/types/alpaca/websocket/outgoing/subscribe.rs index 311d888..465ab62 100644 --- a/src/types/alpaca/websocket/outgoing/subscribe.rs +++ b/src/types/alpaca/websocket/outgoing/subscribe.rs @@ -1,3 +1,4 @@ +use crate::utils::remove_slash_from_pair; use serde::Serialize; #[derive(Serialize)] @@ -24,7 +25,12 @@ pub struct NewsMessage { impl NewsMessage { pub fn new(symbols: Vec) -> Self { - Self { news: symbols } + Self { + news: symbols + .into_iter() + .map(|symbol| remove_slash_from_pair(&symbol)) + .collect(), + } } } diff --git a/src/types/asset.rs b/src/types/asset.rs index 67bebac..7a3335c 100644 --- a/src/types/asset.rs +++ b/src/types/asset.rs @@ -1,6 +1,7 @@ use clickhouse::Row; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; +use std::hash::{Hash, Hasher}; use time::OffsetDateTime; #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize_repr, Deserialize_repr)] @@ -26,9 +27,14 @@ pub enum Exchange { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Row)] pub struct Asset { pub symbol: String, - pub abbreviation: String, pub class: Class, pub exchange: Exchange, #[serde(with = "clickhouse::serde::time::datetime")] pub time_added: OffsetDateTime, } + +impl Hash for Asset { + fn hash(&self, state: &mut H) { + self.symbol.hash(state); + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 9d111eb..4ed449d 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -4,6 +4,6 @@ pub mod time; pub mod websocket; pub use cleanup::cleanup; -pub use news::normalize_news_content; +pub use news::{add_slash_to_pair, normalize_news_content, remove_slash_from_pair}; pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}; pub use websocket::authenticate; diff --git a/src/utils/news.rs b/src/utils/news.rs index c5559c6..799d361 100644 --- a/src/utils/news.rs +++ b/src/utils/news.rs @@ -13,3 +13,16 @@ pub fn normalize_news_content(content: &str) -> String { content.to_string() } + +pub fn add_slash_to_pair(pair: &str) -> String { + let regex = Regex::new(r"^(.+)(BTC|USD.?)$").unwrap(); + + regex.captures(pair).map_or_else( + || pair.to_string(), + |caps| format!("{}/{}", &caps[1], &caps[2]), + ) +} + +pub fn remove_slash_from_pair(pair: &str) -> String { + pair.replace('/', "") +} diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index ed4ff7d..c8650fe 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -1,6 +1,5 @@ CREATE TABLE IF NOT EXISTS qrust.assets ( symbol LowCardinality(String), - abbreviation LowCardinality(String), class Enum('us_equity' = 1, 'crypto' = 2), exchange Enum( 'AMEX' = 1, @@ -13,7 +12,6 @@ CREATE TABLE IF NOT EXISTS qrust.assets ( 'CRYPTO' = 8 ), time_added DateTime DEFAULT now(), - CONSTRAINT abbreviation ASSUME replace(symbol, '/', '') = abbreviation ) ENGINE = ReplacingMergeTree() PRIMARY KEY symbol;