From 2829bb2970cb4ab9df99aa07ebe6bf8c954d05e0 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Fri, 26 Jan 2024 13:26:52 +0000 Subject: [PATCH] Add news content normalization and storing Signed-off-by: Nikolaos Karaolidis --- Cargo.lock | 55 +++++++++++++++++++ Cargo.toml | 2 + src/types/alpaca/api/incoming/news.rs | 17 +++--- src/types/alpaca/websocket/incoming/news.rs | 17 +++--- src/types/news.rs | 8 ++- src/utils/mod.rs | 2 + src/utils/news.rs | 22 ++++++++ .../docker-entrypoint-initdb.d/0000_init.sql | 7 ++- 8 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 src/utils/news.rs diff --git a/Cargo.lock b/Cargo.lock index 30e4677..31285ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aho-corasick" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +dependencies = [ + "memchr", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -686,6 +695,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "html-escape" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d1ad449764d627e22bfd7cd5e8868264fc9236e07c752972b4080cd351cb476" +dependencies = [ + "utf8-width", +] + [[package]] name = "http" version = "0.2.11" @@ -1282,9 +1300,11 @@ dependencies = [ "dotenv", "futures-util", "governor", + "html-escape", "http 1.0.0", "log", "log4rs", + "regex", "reqwest", "serde", "serde_json", @@ -1369,6 +1389,35 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "regex" +version = "1.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + [[package]] name = "reqwest" version = "0.11.23" @@ -2017,6 +2066,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3" + [[package]] name = "uuid" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index 1394c43..e24cf28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,3 +50,5 @@ time = { version = "0.3.31", features = [ backoff = { version = "0.4.0", features = [ "tokio", ] } +regex = "1.10.3" +html-escape = "0.2.13" diff --git a/src/types/alpaca/api/incoming/news.rs b/src/types/alpaca/api/incoming/news.rs index ef45f70..60715a3 100644 --- a/src/types/alpaca/api/incoming/news.rs +++ b/src/types/alpaca/api/incoming/news.rs @@ -1,4 +1,4 @@ -use crate::types; +use crate::{types, utils::normalize_news_content}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use time::OffsetDateTime; @@ -28,8 +28,10 @@ pub struct News { #[serde(rename = "updated_at")] pub time_updated: OffsetDateTime, pub symbols: Vec, - pub headline: String, - pub author: String, + #[serde_as(as = "NoneAsEmptyString")] + pub headline: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub author: Option, #[serde_as(as = "NoneAsEmptyString")] pub source: Option, #[serde_as(as = "NoneAsEmptyString")] @@ -48,10 +50,11 @@ impl From for types::News { time_created: news.time_created, time_updated: news.time_updated, symbols: news.symbols, - headline: news.headline, - author: news.author, - source: news.source, - summary: news.summary, + headline: normalize_news_content(news.headline), + author: normalize_news_content(news.author), + source: normalize_news_content(news.source), + summary: normalize_news_content(news.summary), + content: normalize_news_content(news.content), url: news.url, } } diff --git a/src/types/alpaca/websocket/incoming/news.rs b/src/types/alpaca/websocket/incoming/news.rs index f9565fd..5563cb5 100644 --- a/src/types/alpaca/websocket/incoming/news.rs +++ b/src/types/alpaca/websocket/incoming/news.rs @@ -1,4 +1,4 @@ -use crate::types; +use crate::{types, utils::normalize_news_content}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use time::OffsetDateTime; @@ -14,8 +14,10 @@ pub struct Message { #[serde(rename = "updated_at")] pub time_updated: OffsetDateTime, pub symbols: Vec, - pub headline: String, - pub author: String, + #[serde_as(as = "NoneAsEmptyString")] + pub headline: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub author: Option, #[serde_as(as = "NoneAsEmptyString")] pub source: Option, #[serde_as(as = "NoneAsEmptyString")] @@ -33,10 +35,11 @@ impl From for types::News { time_created: news.time_created, time_updated: news.time_updated, symbols: news.symbols, - headline: news.headline, - author: news.author, - source: news.source, - summary: news.summary, + headline: normalize_news_content(news.headline), + author: normalize_news_content(news.author), + source: normalize_news_content(news.source), + summary: normalize_news_content(news.summary), + content: normalize_news_content(news.content), url: news.url, } } diff --git a/src/types/news.rs b/src/types/news.rs index 9e97cd1..1900329 100644 --- a/src/types/news.rs +++ b/src/types/news.rs @@ -12,12 +12,16 @@ pub struct News { #[serde(with = "clickhouse::serde::time::datetime")] pub time_updated: OffsetDateTime, pub symbols: Vec, - pub headline: String, - pub author: String, + #[serde_as(as = "NoneAsEmptyString")] + pub headline: Option, + #[serde_as(as = "NoneAsEmptyString")] + pub author: Option, #[serde_as(as = "NoneAsEmptyString")] pub source: Option, #[serde_as(as = "NoneAsEmptyString")] pub summary: Option, #[serde_as(as = "NoneAsEmptyString")] + pub content: Option, + #[serde_as(as = "NoneAsEmptyString")] pub url: Option, } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 7918a26..9d111eb 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,7 +1,9 @@ pub mod cleanup; +pub mod news; pub mod time; pub mod websocket; pub use cleanup::cleanup; +pub use news::normalize_news_content; pub use time::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE}; pub use websocket::authenticate; diff --git a/src/utils/news.rs b/src/utils/news.rs new file mode 100644 index 0000000..084e217 --- /dev/null +++ b/src/utils/news.rs @@ -0,0 +1,22 @@ +use html_escape::decode_html_entities; +use regex::Regex; + +pub fn normalize_news_content(content: Option) -> Option { + content.as_ref()?; + let content = content.unwrap(); + + let re_tags = Regex::new("<[^>]+>").unwrap(); + let re_spaces = Regex::new("[\\u00A0\\s]+").unwrap(); + + let content = content.replace('\n', " "); + let content = re_tags.replace_all(&content, ""); + let content = re_spaces.replace_all(&content, " "); + let content = decode_html_entities(&content); + let content = content.trim(); + + if content.is_empty() { + None + } else { + Some(content.to_string()) + } +} diff --git a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql index dc475d8..2426bee 100644 --- a/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql +++ b/support/clickhouse/docker-entrypoint-initdb.d/0000_init.sql @@ -47,9 +47,10 @@ CREATE TABLE IF NOT EXISTS qrust.news ( symbols Array(LowCardinality(String)), headline String, author String, - source Nullable(String), - summary Nullable(String), - url Nullable(String), + source String, + summary String, + content String, + url String, INDEX index_symbols symbols TYPE bloom_filter() ) ENGINE = ReplacingMergeTree()