diff --git a/Cargo.lock b/Cargo.lock index 4500d25..22e3124 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "once_cell", @@ -72,15 +72,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" [[package]] name = "arc-swap" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +checksum = "7b3d0060af21e8d11a926981cc00c6c1541aa91dd64b9f881985c3da1094425f" [[package]] name = "async-trait" @@ -90,7 +90,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -109,10 +109,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.1.0", + "hyper 1.2.0", "hyper-util", "itoa", "matchit", @@ -142,7 +142,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "http-body-util", "mime", @@ -218,18 +218,18 @@ dependencies = [ [[package]] name = "bstr" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc" +checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" dependencies = [ "memchr", ] [[package]] name = "bumpalo" -version = "3.15.0" +version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d32a994c2b3ca201d9b263612a374263f05e7adde37c4707f693dcd375076d1f" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" [[package]] name = "byteorder" @@ -288,9 +288,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.83" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" dependencies = [ "jobserver", "libc", @@ -304,15 +304,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.34" +version = "0.4.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", "serde", - "windows-targets 0.52.0", + "windows-targets 0.52.4", ] [[package]] @@ -488,9 +488,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.6" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c376d08ea6aa96aafe61237c7200d1241cb177b7d3a542d791f2d118e9cbb955" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" dependencies = [ "darling_core", "darling_macro", @@ -498,27 +498,27 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.6" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33043dcd19068b8192064c704b3f83eb464f91f1ff527b44a4e2b08d9cdb8855" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", "strsim", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] name = "darling_macro" -version = "0.20.6" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5a91391accf613803c2a9bf9abccdbaa07c54b4244a5b64883f9c3c137c86be" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -766,7 +766,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -783,9 +783,9 @@ checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" @@ -869,8 +869,8 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.11", - "indexmap 2.2.3", + "http 0.2.12", + "indexmap 2.2.5", "slab", "tokio", "tokio-util", @@ -888,8 +888,8 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 1.0.0", - "indexmap 2.2.3", + "http 1.1.0", + "indexmap 2.2.5", "slab", "tokio", "tokio-util", @@ -898,9 +898,9 @@ dependencies = [ [[package]] name = "half" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" +checksum = "b5eceaaeec696539ddaf7b333340f1af35a5aa87ae3e4f3ead0532f72affab2e" dependencies = [ "cfg-if", "crunchy", @@ -933,9 +933,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.6" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -963,9 +963,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ "bytes", "fnv", @@ -974,9 +974,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -990,7 +990,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http 0.2.11", + "http 0.2.12", "pin-project-lite", ] @@ -1001,7 +1001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http 1.1.0", ] [[package]] @@ -1012,7 +1012,7 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "pin-project-lite", ] @@ -1046,7 +1046,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.24", - "http 0.2.11", + "http 0.2.12", "http-body 0.4.6", "httparse", "httpdate", @@ -1061,20 +1061,21 @@ dependencies = [ [[package]] name = "hyper" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" dependencies = [ "bytes", "futures-channel", "futures-util", "h2 0.4.2", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", "httparse", "httpdate", "itoa", "pin-project-lite", + "smallvec", "tokio", ] @@ -1099,9 +1100,9 @@ checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", "futures-util", - "http 1.0.0", + "http 1.1.0", "http-body 1.0.0", - "hyper 1.1.0", + "hyper 1.2.0", "pin-project-lite", "socket2", "tokio", @@ -1159,9 +1160,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.3" +version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" +checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -1239,9 +1240,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.68" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" dependencies = [ "wasm-bindgen", ] @@ -1287,9 +1288,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" dependencies = [ "serde", ] @@ -1387,9 +1388,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi", @@ -1484,9 +1485,9 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" dependencies = [ "libc", ] @@ -1514,9 +1515,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.63" +version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ "bitflags 2.4.2", "cfg-if", @@ -1535,7 +1536,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -1546,9 +1547,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.99" +version = "0.9.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" +checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" dependencies = [ "cc", "libc", @@ -1634,22 +1635,22 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -1715,7 +1716,7 @@ dependencies = [ "futures-util", "governor", "html-escape", - "http 1.0.0", + "http 1.1.0", "itertools 0.12.1", "lazy_static", "log", @@ -1805,9 +1806,9 @@ checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" [[package]] name = "rayon" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" +checksum = "e4963ed1bc86e4f3ee217022bd855b297cef07fb9eac5dfa1f788b220b49b3bd" dependencies = [ "either", "rayon-core", @@ -1857,9 +1858,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", @@ -1874,9 +1875,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.24" +version = "0.11.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" +checksum = "0eea5a9eb898d3783f17c6407670e3592fd174cb81a10e51d4c37f49450b9946" dependencies = [ "base64", "bytes", @@ -1884,7 +1885,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.24", - "http 0.2.11", + "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", "hyper-tls", @@ -1988,9 +1989,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" [[package]] name = "safetensors" @@ -2054,18 +2055,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde-aux" -version = "4.4.0" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a86348501c129f3ad50c2f4635a01971f76974cd8a3f335988a0f1581c082765" +checksum = "0d2e8bfba469d06512e11e3311d4d051a4a387a5b42d010404fecf3200321c95" dependencies = [ "chrono", "serde", @@ -2084,13 +2085,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -2106,9 +2107,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.113" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ "itoa", "ryu", @@ -2117,9 +2118,9 @@ dependencies = [ [[package]] name = "serde_path_to_error" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd154a240de39fdebcf5775d2675c204d7c13cf39a4c697be6493c8e734337c" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" dependencies = [ "itoa", "serde", @@ -2133,7 +2134,7 @@ checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -2158,7 +2159,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.2.3", + "indexmap 2.2.5", "serde", "serde_derive", "serde_json", @@ -2175,16 +2176,16 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] name = "serde_yaml" -version = "0.9.31" +version = "0.9.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e" +checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" dependencies = [ - "indexmap 2.2.3", + "indexmap 2.2.5", "itoa", "ryu", "serde", @@ -2241,12 +2242,12 @@ checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2289,9 +2290,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.49" +version = "2.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915aea9e586f80826ee59f8453c1101f9d1c4b3964cd2460185ee8e299ada496" +checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" dependencies = [ "proc-macro2", "quote", @@ -2306,20 +2307,20 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "system-configuration" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +checksum = "658bc6ee10a9b4fcf576e9b0819d95ec16f4d2c02d39fd83ac1c8789785c4a42" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.2", "core-foundation", "system-configuration-sys", ] [[package]] name = "system-configuration-sys" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" dependencies = [ "core-foundation-sys", "libc", @@ -2355,9 +2356,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.0" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", "fastrand", @@ -2382,7 +2383,7 @@ checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -2468,7 +2469,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] @@ -2584,7 +2585,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http 1.0.0", + "http 1.1.0", "httparse", "log", "native-tls", @@ -2624,9 +2625,9 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" dependencies = [ "tinyvec", ] @@ -2723,9 +2724,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2733,24 +2734,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" dependencies = [ "cfg-if", "js-sys", @@ -2760,9 +2761,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2770,28 +2771,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.91" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "web-sys" -version = "0.3.68" +version = "0.3.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" dependencies = [ "js-sys", "wasm-bindgen", @@ -2825,7 +2826,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.4", ] [[package]] @@ -2843,7 +2844,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.4", ] [[package]] @@ -2863,17 +2864,17 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" dependencies = [ - "windows_aarch64_gnullvm 0.52.0", - "windows_aarch64_msvc 0.52.0", - "windows_i686_gnu 0.52.0", - "windows_i686_msvc 0.52.0", - "windows_x86_64_gnu 0.52.0", - "windows_x86_64_gnullvm 0.52.0", - "windows_x86_64_msvc 0.52.0", + "windows_aarch64_gnullvm 0.52.4", + "windows_aarch64_msvc 0.52.4", + "windows_i686_gnu 0.52.4", + "windows_i686_msvc 0.52.4", + "windows_x86_64_gnu 0.52.4", + "windows_x86_64_gnullvm 0.52.4", + "windows_x86_64_msvc 0.52.4", ] [[package]] @@ -2884,9 +2885,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" [[package]] name = "windows_aarch64_msvc" @@ -2896,9 +2897,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" [[package]] name = "windows_i686_gnu" @@ -2908,9 +2909,9 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" [[package]] name = "windows_i686_msvc" @@ -2920,9 +2921,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" [[package]] name = "windows_x86_64_gnu" @@ -2932,9 +2933,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" [[package]] name = "windows_x86_64_gnullvm" @@ -2944,9 +2945,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" [[package]] name = "windows_x86_64_msvc" @@ -2956,9 +2957,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.0" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" [[package]] name = "winreg" @@ -2998,7 +2999,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.49", + "syn 2.0.52", ] [[package]] diff --git a/src/threads/data/backfill.rs b/src/threads/data/backfill.rs deleted file mode 100644 index a4b587b..0000000 --- a/src/threads/data/backfill.rs +++ /dev/null @@ -1,596 +0,0 @@ -use super::ThreadType; -use crate::{ - config::{ - Config, ALPACA_CRYPTO_DATA_API_URL, ALPACA_SOURCE, ALPACA_STOCK_DATA_API_URL, - BERT_MAX_INPUTS, - }, - database, - types::{ - alpaca::{ - self, - shared::{Sort, Source}, - }, - news::Prediction, - Backfill, Bar, Class, News, - }, - utils::{duration_until, last_minute, FIFTEEN_MINUTES, ONE_MINUTE, ONE_SECOND}, -}; -use async_trait::async_trait; -use futures_util::future::join_all; -use itertools::{Either, Itertools}; -use log::{error, info, warn}; -use std::{collections::HashMap, sync::Arc}; -use time::OffsetDateTime; -use tokio::{ - spawn, - sync::{mpsc, oneshot, Mutex}, - task::{block_in_place, JoinHandle}, - time::sleep, - try_join, -}; -use uuid::Uuid; - -pub enum Action { - Backfill, - Purge, -} - -pub struct Message { - pub action: Action, - pub symbols: Vec, - pub response: oneshot::Sender<()>, -} - -impl Message { - pub fn new(action: Action, symbols: Vec) -> (Self, oneshot::Receiver<()>) { - let (sender, receiver) = oneshot::channel::<()>(); - ( - Self { - action, - symbols, - response: sender, - }, - receiver, - ) - } -} - -#[derive(Clone)] -pub struct Job { - pub fetch_from: OffsetDateTime, - pub fetch_to: OffsetDateTime, -} - -#[async_trait] -pub trait Handler: Send + Sync { - async fn select_latest_backfills( - &self, - symbols: &[String], - ) -> Result, clickhouse::error::Error>; - async fn delete_backfills(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>; - async fn delete_data(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>; - async fn queue_backfill(&self, jobs: &HashMap); - async fn backfill(&self, jobs: HashMap); - fn max_limit(&self) -> i64; - fn log_string(&self) -> &'static str; -} - -pub struct Jobs { - pub symbol_to_uuid: HashMap, - pub uuid_to_job: HashMap>, -} - -impl Jobs { - pub fn insert(&mut self, jobs: Vec, fut: JoinHandle<()>) { - let uuid = Uuid::new_v4(); - for symbol in jobs { - self.symbol_to_uuid.insert(symbol.clone(), uuid); - } - self.uuid_to_job.insert(uuid, fut); - } - - pub fn get(&self, symbol: &str) -> Option<&JoinHandle<()>> { - self.symbol_to_uuid - .get(symbol) - .and_then(|uuid| self.uuid_to_job.get(uuid)) - } - - pub fn remove(&mut self, symbol: &str) -> Option> { - self.symbol_to_uuid - .remove(symbol) - .and_then(|uuid| self.uuid_to_job.remove(&uuid)) - } -} - -pub async fn run(handler: Arc>, mut receiver: mpsc::Receiver) { - let backfill_jobs = Arc::new(Mutex::new(Jobs { - symbol_to_uuid: HashMap::new(), - uuid_to_job: HashMap::new(), - })); - - loop { - let message = receiver.recv().await.unwrap(); - spawn(handle_backfill_message( - handler.clone(), - backfill_jobs.clone(), - message, - )); - } -} - -async fn handle_backfill_message( - handler: Arc>, - backfill_jobs: Arc>, - message: Message, -) { - let mut backfill_jobs = backfill_jobs.lock().await; - - match message.action { - Action::Backfill => { - let log_string = handler.log_string(); - let max_limit = handler.max_limit(); - - let backfills = handler - .select_latest_backfills(&message.symbols) - .await - .unwrap() - .into_iter() - .map(|backfill| (backfill.symbol.clone(), backfill)) - .collect::>(); - - let mut jobs = vec![]; - - for symbol in message.symbols { - if let Some(job) = backfill_jobs.get(&symbol) { - if !job.is_finished() { - warn!( - "Backfill for {} {} is already running, skipping.", - symbol, log_string - ); - continue; - } - } - - let fetch_from = backfills - .get(&symbol) - .map_or(OffsetDateTime::UNIX_EPOCH, |backfill| { - backfill.time + ONE_SECOND - }); - - let fetch_to = last_minute(); - - if fetch_from > fetch_to { - info!("No need to backfill {} {}.", symbol, log_string,); - return; - } - - jobs.push(( - symbol, - Job { - fetch_from, - fetch_to, - }, - )); - } - - let jobs = jobs - .into_iter() - .sorted_by_key(|job| job.1.fetch_from) - .collect::>(); - - let mut job_groups = vec![HashMap::new()]; - let mut current_minutes = 0; - - for job in jobs { - let minutes = (job.1.fetch_to - job.1.fetch_from).whole_minutes(); - - if job_groups.last().unwrap().is_empty() || (current_minutes + minutes) <= max_limit - { - let job_group = job_groups.last_mut().unwrap(); - job_group.insert(job.0, job.1); - current_minutes += minutes; - } else { - let mut job_group = HashMap::new(); - job_group.insert(job.0, job.1); - job_groups.push(job_group); - current_minutes = minutes; - } - } - - for job_group in job_groups { - let symbols = job_group.keys().cloned().collect::>(); - - let handler = handler.clone(); - let fut = spawn(async move { - handler.queue_backfill(&job_group).await; - handler.backfill(job_group).await; - }); - - backfill_jobs.insert(symbols, fut); - } - } - Action::Purge => { - for symbol in &message.symbols { - if let Some(job) = backfill_jobs.remove(symbol) { - if !job.is_finished() { - job.abort(); - } - let _ = job.await; - } - } - - try_join!( - handler.delete_backfills(&message.symbols), - handler.delete_data(&message.symbols) - ) - .unwrap(); - } - } - - message.response.send(()).unwrap(); -} - -struct BarHandler { - config: Arc, - data_url: &'static str, - api_query_constructor: fn( - symbols: Vec, - fetch_from: OffsetDateTime, - fetch_to: OffsetDateTime, - next_page_token: Option, - ) -> alpaca::api::outgoing::bar::Bar, -} - -fn us_equity_query_constructor( - symbols: Vec, - fetch_from: OffsetDateTime, - fetch_to: OffsetDateTime, - next_page_token: Option, -) -> alpaca::api::outgoing::bar::Bar { - alpaca::api::outgoing::bar::Bar::UsEquity(alpaca::api::outgoing::bar::UsEquity { - symbols, - start: Some(fetch_from), - end: Some(fetch_to), - page_token: next_page_token, - sort: Some(Sort::Asc), - ..Default::default() - }) -} - -fn crypto_query_constructor( - symbols: Vec, - fetch_from: OffsetDateTime, - fetch_to: OffsetDateTime, - next_page_token: Option, -) -> alpaca::api::outgoing::bar::Bar { - alpaca::api::outgoing::bar::Bar::Crypto(alpaca::api::outgoing::bar::Crypto { - symbols, - start: Some(fetch_from), - end: Some(fetch_to), - page_token: next_page_token, - sort: Some(Sort::Asc), - ..Default::default() - }) -} - -#[async_trait] -impl Handler for BarHandler { - async fn select_latest_backfills( - &self, - symbols: &[String], - ) -> Result, clickhouse::error::Error> { - database::backfills_bars::select_where_symbols( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - symbols, - ) - .await - } - - async fn delete_backfills(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { - database::backfills_bars::delete_where_symbols( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - symbols, - ) - .await - } - - async fn delete_data(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { - database::bars::delete_where_symbols( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - symbols, - ) - .await - } - - async fn queue_backfill(&self, jobs: &HashMap) { - if *ALPACA_SOURCE == Source::Sip { - return; - } - - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); - let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); - let symbols = jobs.keys().collect::>(); - - info!("Queing bar backfill for {:?} in {:?}.", symbols, run_delay); - sleep(run_delay).await; - } - - async fn backfill(&self, jobs: HashMap) { - let symbols = jobs.keys().cloned().collect::>(); - let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); - - info!("Backfilling bars for {:?}.", symbols); - - let mut bars = vec![]; - let mut last_time = symbols - .iter() - .map(|symbol| (symbol.clone(), None)) - .collect::>(); - let mut next_page_token = None; - - loop { - let Ok(message) = alpaca::api::incoming::bar::get( - &self.config.alpaca_client, - &self.config.alpaca_rate_limiter, - self.data_url, - &(self.api_query_constructor)( - symbols.clone(), - fetch_from, - fetch_to, - next_page_token.clone(), - ), - None, - ) - .await - else { - error!("Failed to backfill bars for {:?}.", symbols); - return; - }; - - for (symbol, bar_vec) in message.bars { - if let Some(last) = bar_vec.last() { - last_time.insert(symbol.clone(), Some(last.time)); - } - - for bar in bar_vec { - bars.push(Bar::from((bar, symbol.clone()))); - } - } - - if bars.len() >= database::bars::BATCH_FLUSH_SIZE || message.next_page_token.is_none() { - database::bars::upsert_batch( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &bars, - ) - .await - .unwrap(); - bars = vec![]; - } - - if message.next_page_token.is_none() { - break; - } - next_page_token = message.next_page_token; - } - - let (backfilled, skipped): (Vec<_>, Vec<_>) = - last_time.into_iter().partition_map(|(symbol, time)| { - if let Some(time) = time { - Either::Left(Backfill { symbol, time }) - } else { - Either::Right(symbol) - } - }); - - database::backfills_bars::upsert_batch( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &backfilled, - ) - .await - .unwrap(); - - info!("No bars to backfill for {:?}.", skipped); - info!("Backfilled bars for {:?}.", backfilled); - } - - fn max_limit(&self) -> i64 { - alpaca::api::outgoing::bar::MAX_LIMIT - } - - fn log_string(&self) -> &'static str { - "bars" - } -} - -struct NewsHandler { - config: Arc, -} - -#[async_trait] -impl Handler for NewsHandler { - async fn select_latest_backfills( - &self, - symbols: &[String], - ) -> Result, clickhouse::error::Error> { - database::backfills_news::select_where_symbols( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - symbols, - ) - .await - } - - async fn delete_backfills(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { - database::backfills_news::delete_where_symbols( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - symbols, - ) - .await - } - - async fn delete_data(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { - database::news::delete_where_symbols( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - symbols, - ) - .await - } - - async fn queue_backfill(&self, jobs: &HashMap) { - if *ALPACA_SOURCE == Source::Sip { - return; - } - - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); - let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); - let symbols = jobs.keys().cloned().collect::>(); - - info!("Queing news backfill for {:?} in {:?}.", symbols, run_delay); - sleep(run_delay).await; - } - - async fn backfill(&self, jobs: HashMap) { - let symbols = jobs.keys().cloned().collect::>(); - let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); - let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); - - info!("Backfilling news for {:?}.", symbols); - - let mut news = vec![]; - let mut last_time = symbols - .iter() - .map(|symbol| (symbol.clone(), None)) - .collect::>(); - let mut next_page_token = None; - - loop { - let Ok(message) = alpaca::api::incoming::news::get( - &self.config.alpaca_client, - &self.config.alpaca_rate_limiter, - &alpaca::api::outgoing::news::News { - symbols: symbols.clone(), - start: Some(fetch_from), - end: Some(fetch_to), - page_token: next_page_token.clone(), - ..Default::default() - }, - None, - ) - .await - else { - error!("Failed to backfill news for {:?}.", symbols); - return; - }; - - for news_item in message.news { - let news_item = News::from(news_item); - - for symbol in &news_item.symbols { - last_time.insert(symbol.clone(), Some(news_item.time_created)); - } - - news.push(news_item); - } - - if news.len() >= *BERT_MAX_INPUTS || message.next_page_token.is_none() { - let inputs = news - .iter() - .map(|news| format!("{}\n\n{}", news.headline, news.content)) - .collect::>(); - - let predictions = - join_all(inputs.chunks(*BERT_MAX_INPUTS).map(|inputs| async move { - let sequence_classifier = self.config.sequence_classifier.lock().await; - block_in_place(|| { - sequence_classifier - .predict(inputs.iter().map(String::as_str).collect::>()) - .into_iter() - .map(|label| Prediction::try_from(label).unwrap()) - .collect::>() - }) - })) - .await - .into_iter() - .flatten(); - - news = news - .into_iter() - .zip(predictions) - .map(|(news, prediction)| News { - sentiment: prediction.sentiment, - confidence: prediction.confidence, - ..news - }) - .collect::>(); - } - - if news.len() >= database::news::BATCH_FLUSH_SIZE || message.next_page_token.is_none() { - database::news::upsert_batch( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &news, - ) - .await - .unwrap(); - news = vec![]; - } - - if message.next_page_token.is_none() { - break; - } - next_page_token = message.next_page_token; - } - - let (backfilled, skipped): (Vec<_>, Vec<_>) = - last_time.into_iter().partition_map(|(symbol, time)| { - if let Some(time) = time { - Either::Left(Backfill { symbol, time }) - } else { - Either::Right(symbol) - } - }); - - database::backfills_news::upsert_batch( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &backfilled, - ) - .await - .unwrap(); - - info!("No news to backfill for {:?}.", skipped); - info!("Backfilled news for {:?}.", backfilled); - } - - fn max_limit(&self) -> i64 { - alpaca::api::outgoing::news::MAX_LIMIT - } - - fn log_string(&self) -> &'static str { - "news" - } -} - -pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { - match thread_type { - ThreadType::Bars(Class::UsEquity) => Box::new(BarHandler { - config, - data_url: ALPACA_STOCK_DATA_API_URL, - api_query_constructor: us_equity_query_constructor, - }), - ThreadType::Bars(Class::Crypto) => Box::new(BarHandler { - config, - data_url: ALPACA_CRYPTO_DATA_API_URL, - api_query_constructor: crypto_query_constructor, - }), - ThreadType::News => Box::new(NewsHandler { config }), - } -} diff --git a/src/threads/data/backfill/bars.rs b/src/threads/data/backfill/bars.rs new file mode 100644 index 0000000..aba5845 --- /dev/null +++ b/src/threads/data/backfill/bars.rs @@ -0,0 +1,207 @@ +use super::Job; +use crate::{ + config::{Config, ALPACA_SOURCE}, + database, + types::{ + alpaca::{ + self, + shared::{Sort, Source}, + }, + Backfill, Bar, + }, + utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, +}; +use async_trait::async_trait; +use itertools::{Either, Itertools}; +use log::{error, info}; +use std::{collections::HashMap, sync::Arc}; +use time::OffsetDateTime; +use tokio::time::sleep; + +pub struct Handler { + pub config: Arc, + pub data_url: &'static str, + pub api_query_constructor: fn( + symbols: Vec, + fetch_from: OffsetDateTime, + fetch_to: OffsetDateTime, + next_page_token: Option, + ) -> alpaca::api::outgoing::bar::Bar, +} + +pub fn us_equity_query_constructor( + symbols: Vec, + fetch_from: OffsetDateTime, + fetch_to: OffsetDateTime, + next_page_token: Option, +) -> alpaca::api::outgoing::bar::Bar { + alpaca::api::outgoing::bar::Bar::UsEquity(alpaca::api::outgoing::bar::UsEquity { + symbols, + start: Some(fetch_from), + end: Some(fetch_to), + page_token: next_page_token, + sort: Some(Sort::Asc), + ..Default::default() + }) +} + +pub fn crypto_query_constructor( + symbols: Vec, + fetch_from: OffsetDateTime, + fetch_to: OffsetDateTime, + next_page_token: Option, +) -> alpaca::api::outgoing::bar::Bar { + alpaca::api::outgoing::bar::Bar::Crypto(alpaca::api::outgoing::bar::Crypto { + symbols, + start: Some(fetch_from), + end: Some(fetch_to), + page_token: next_page_token, + sort: Some(Sort::Asc), + ..Default::default() + }) +} + +#[async_trait] +impl super::Handler for Handler { + async fn select_latest_backfills( + &self, + symbols: &[String], + ) -> Result, clickhouse::error::Error> { + database::backfills_bars::select_where_symbols( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + symbols, + ) + .await + } + + async fn delete_backfills(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { + database::backfills_bars::delete_where_symbols( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + symbols, + ) + .await + } + + async fn delete_data(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { + database::bars::delete_where_symbols( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + symbols, + ) + .await + } + + async fn queue_backfill(&self, jobs: &HashMap) { + if *ALPACA_SOURCE == Source::Sip { + return; + } + + let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); + let symbols = jobs.keys().collect::>(); + + info!("Queing bar backfill for {:?} in {:?}.", symbols, run_delay); + sleep(run_delay).await; + } + + async fn backfill(&self, jobs: HashMap) { + let symbols = jobs.keys().cloned().collect::>(); + let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); + let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + + info!("Backfilling bars for {:?}.", symbols); + + let mut bars = vec![]; + let mut last_times = symbols + .iter() + .map(|symbol| (symbol.clone(), None)) + .collect::>(); + let mut next_page_token = None; + + loop { + let Ok(message) = alpaca::api::incoming::bar::get( + &self.config.alpaca_client, + &self.config.alpaca_rate_limiter, + self.data_url, + &(self.api_query_constructor)( + symbols.clone(), + fetch_from, + fetch_to, + next_page_token.clone(), + ), + None, + ) + .await + else { + error!("Failed to backfill bars for {:?}.", symbols); + return; + }; + + for (symbol, bar_vec) in message.bars { + if let Some(last) = bar_vec.last() { + last_times.insert(symbol.clone(), Some(last.time)); + } + + for bar in bar_vec { + bars.push(Bar::from((bar, symbol.clone()))); + } + } + + if bars.len() >= database::bars::BATCH_FLUSH_SIZE || message.next_page_token.is_none() { + database::bars::upsert_batch( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &bars, + ) + .await + .unwrap(); + bars = vec![]; + } + + if message.next_page_token.is_none() { + break; + } + next_page_token = message.next_page_token; + } + + let (backfilled, skipped): (Vec<_>, Vec<_>) = + last_times.into_iter().partition_map(|(symbol, time)| { + if let Some(time) = time { + Either::Left(Backfill { symbol, time }) + } else { + Either::Right(symbol) + } + }); + + database::backfills_bars::upsert_batch( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &backfilled, + ) + .await + .unwrap(); + + let backfilled = backfilled + .into_iter() + .map(|backfill| backfill.symbol) + .collect::>(); + + if !skipped.is_empty() { + info!("No bars to backfill for {:?}.", skipped); + } + + if !backfilled.is_empty() { + info!("Backfilled bars for {:?}.", backfilled); + } + } + + fn max_limit(&self) -> i64 { + alpaca::api::outgoing::bar::MAX_LIMIT + } + + fn log_string(&self) -> &'static str { + "bars" + } +} diff --git a/src/threads/data/backfill/mod.rs b/src/threads/data/backfill/mod.rs new file mode 100644 index 0000000..a483f40 --- /dev/null +++ b/src/threads/data/backfill/mod.rs @@ -0,0 +1,237 @@ +mod bars; +mod news; + +use super::ThreadType; +use crate::{ + config::{Config, ALPACA_CRYPTO_DATA_API_URL, ALPACA_STOCK_DATA_API_URL}, + types::{Backfill, Class}, + utils::{last_minute, ONE_SECOND}, +}; +use async_trait::async_trait; +use itertools::Itertools; +use log::{info, warn}; +use std::{collections::HashMap, sync::Arc}; +use time::OffsetDateTime; +use tokio::{ + spawn, + sync::{mpsc, oneshot, Mutex}, + task::JoinHandle, + try_join, +}; +use uuid::Uuid; + +pub enum Action { + Backfill, + Purge, +} + +pub struct Message { + pub action: Action, + pub symbols: Vec, + pub response: oneshot::Sender<()>, +} + +impl Message { + pub fn new(action: Action, symbols: Vec) -> (Self, oneshot::Receiver<()>) { + let (sender, receiver) = oneshot::channel::<()>(); + ( + Self { + action, + symbols, + response: sender, + }, + receiver, + ) + } +} + +#[derive(Clone)] +pub struct Job { + pub fetch_from: OffsetDateTime, + pub fetch_to: OffsetDateTime, +} + +#[async_trait] +pub trait Handler: Send + Sync { + async fn select_latest_backfills( + &self, + symbols: &[String], + ) -> Result, clickhouse::error::Error>; + async fn delete_backfills(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>; + async fn delete_data(&self, symbol: &[String]) -> Result<(), clickhouse::error::Error>; + async fn queue_backfill(&self, jobs: &HashMap); + async fn backfill(&self, jobs: HashMap); + fn max_limit(&self) -> i64; + fn log_string(&self) -> &'static str; +} + +pub struct Jobs { + pub symbol_to_uuid: HashMap, + pub uuid_to_job: HashMap>, +} + +impl Jobs { + pub fn insert(&mut self, jobs: Vec, fut: JoinHandle<()>) { + let uuid = Uuid::new_v4(); + for symbol in jobs { + self.symbol_to_uuid.insert(symbol.clone(), uuid); + } + self.uuid_to_job.insert(uuid, fut); + } + + pub fn get(&self, symbol: &str) -> Option<&JoinHandle<()>> { + self.symbol_to_uuid + .get(symbol) + .and_then(|uuid| self.uuid_to_job.get(uuid)) + } + + pub fn remove(&mut self, symbol: &str) -> Option> { + self.symbol_to_uuid + .remove(symbol) + .and_then(|uuid| self.uuid_to_job.remove(&uuid)) + } +} + +pub async fn run(handler: Arc>, mut receiver: mpsc::Receiver) { + let backfill_jobs = Arc::new(Mutex::new(Jobs { + symbol_to_uuid: HashMap::new(), + uuid_to_job: HashMap::new(), + })); + + loop { + let message = receiver.recv().await.unwrap(); + spawn(handle_backfill_message( + handler.clone(), + backfill_jobs.clone(), + message, + )); + } +} + +async fn handle_backfill_message( + handler: Arc>, + backfill_jobs: Arc>, + message: Message, +) { + let mut backfill_jobs = backfill_jobs.lock().await; + + match message.action { + Action::Backfill => { + let log_string = handler.log_string(); + let max_limit = handler.max_limit(); + + let backfills = handler + .select_latest_backfills(&message.symbols) + .await + .unwrap() + .into_iter() + .map(|backfill| (backfill.symbol.clone(), backfill)) + .collect::>(); + + let mut jobs = vec![]; + + for symbol in message.symbols { + if let Some(job) = backfill_jobs.get(&symbol) { + if !job.is_finished() { + warn!( + "Backfill for {} {} is already running, skipping.", + symbol, log_string + ); + continue; + } + } + + let fetch_from = backfills + .get(&symbol) + .map_or(OffsetDateTime::UNIX_EPOCH, |backfill| { + backfill.time + ONE_SECOND + }); + + let fetch_to = last_minute(); + + if fetch_from > fetch_to { + info!("No need to backfill {} {}.", symbol, log_string,); + return; + } + + jobs.push(( + symbol, + Job { + fetch_from, + fetch_to, + }, + )); + } + + let jobs = jobs + .into_iter() + .sorted_by_key(|job| job.1.fetch_from) + .collect::>(); + + let mut job_groups = vec![HashMap::new()]; + let mut current_minutes = 0; + + for job in jobs { + let minutes = (job.1.fetch_to - job.1.fetch_from).whole_minutes(); + + if job_groups.last().unwrap().is_empty() || (current_minutes + minutes) <= max_limit + { + let job_group = job_groups.last_mut().unwrap(); + job_group.insert(job.0, job.1); + current_minutes += minutes; + } else { + let mut job_group = HashMap::new(); + job_group.insert(job.0, job.1); + job_groups.push(job_group); + current_minutes = minutes; + } + } + + for job_group in job_groups { + let symbols = job_group.keys().cloned().collect::>(); + + let handler = handler.clone(); + let fut = spawn(async move { + handler.queue_backfill(&job_group).await; + handler.backfill(job_group).await; + }); + + backfill_jobs.insert(symbols, fut); + } + } + Action::Purge => { + for symbol in &message.symbols { + if let Some(job) = backfill_jobs.remove(symbol) { + if !job.is_finished() { + job.abort(); + } + let _ = job.await; + } + } + + try_join!( + handler.delete_backfills(&message.symbols), + handler.delete_data(&message.symbols) + ) + .unwrap(); + } + } + + message.response.send(()).unwrap(); +} + +pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { + match thread_type { + ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler { + config, + data_url: ALPACA_STOCK_DATA_API_URL, + api_query_constructor: bars::us_equity_query_constructor, + }), + ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { + config, + data_url: ALPACA_CRYPTO_DATA_API_URL, + api_query_constructor: bars::crypto_query_constructor, + }), + ThreadType::News => Box::new(news::Handler { config }), + } +} diff --git a/src/threads/data/backfill/news.rs b/src/threads/data/backfill/news.rs new file mode 100644 index 0000000..34a7745 --- /dev/null +++ b/src/threads/data/backfill/news.rs @@ -0,0 +1,205 @@ +use super::Job; +use crate::{ + config::{Config, ALPACA_SOURCE, BERT_MAX_INPUTS}, + database, + types::{ + alpaca::{ + self, + shared::{Sort, Source}, + }, + news::Prediction, + Backfill, News, + }, + utils::{duration_until, FIFTEEN_MINUTES, ONE_MINUTE}, +}; +use async_trait::async_trait; +use futures_util::future::join_all; +use itertools::{Either, Itertools}; +use log::{error, info}; +use std::{collections::HashMap, sync::Arc}; +use tokio::{task::block_in_place, time::sleep}; + +pub struct Handler { + pub config: Arc, +} + +#[async_trait] +impl super::Handler for Handler { + async fn select_latest_backfills( + &self, + symbols: &[String], + ) -> Result, clickhouse::error::Error> { + database::backfills_news::select_where_symbols( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + symbols, + ) + .await + } + + async fn delete_backfills(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { + database::backfills_news::delete_where_symbols( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + symbols, + ) + .await + } + + async fn delete_data(&self, symbols: &[String]) -> Result<(), clickhouse::error::Error> { + database::news::delete_where_symbols( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + symbols, + ) + .await + } + + async fn queue_backfill(&self, jobs: &HashMap) { + if *ALPACA_SOURCE == Source::Sip { + return; + } + + let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + let run_delay = duration_until(fetch_to + FIFTEEN_MINUTES + ONE_MINUTE); + let symbols = jobs.keys().cloned().collect::>(); + + info!("Queing news backfill for {:?} in {:?}.", symbols, run_delay); + sleep(run_delay).await; + } + + #[allow(clippy::too_many_lines)] + async fn backfill(&self, jobs: HashMap) { + let symbols = jobs.keys().cloned().collect::>(); + let fetch_from = jobs.values().map(|job| job.fetch_from).min().unwrap(); + let fetch_to = jobs.values().map(|job| job.fetch_to).max().unwrap(); + + info!("Backfilling news for {:?}.", symbols); + + let mut news = vec![]; + let mut last_times = symbols + .iter() + .map(|symbol| (symbol.clone(), None)) + .collect::>(); + let mut next_page_token = None; + + loop { + let Ok(message) = alpaca::api::incoming::news::get( + &self.config.alpaca_client, + &self.config.alpaca_rate_limiter, + &alpaca::api::outgoing::news::News { + symbols: symbols.clone(), + start: Some(fetch_from), + end: Some(fetch_to), + page_token: next_page_token.clone(), + sort: Some(Sort::Asc), + ..Default::default() + }, + None, + ) + .await + else { + error!("Failed to backfill news for {:?}.", symbols); + return; + }; + + for news_item in message.news { + let news_item = News::from(news_item); + + for symbol in &news_item.symbols { + if last_times.contains_key(symbol) { + last_times.insert(symbol.clone(), Some(news_item.time_created)); + } + } + + news.push(news_item); + } + + if news.len() >= *BERT_MAX_INPUTS || message.next_page_token.is_none() { + let inputs = news + .iter() + .map(|news| format!("{}\n\n{}", news.headline, news.content)) + .collect::>(); + + let predictions = + join_all(inputs.chunks(*BERT_MAX_INPUTS).map(|inputs| async move { + let sequence_classifier = self.config.sequence_classifier.lock().await; + block_in_place(|| { + sequence_classifier + .predict(inputs.iter().map(String::as_str).collect::>()) + .into_iter() + .map(|label| Prediction::try_from(label).unwrap()) + .collect::>() + }) + })) + .await + .into_iter() + .flatten(); + + news = news + .into_iter() + .zip(predictions) + .map(|(news, prediction)| News { + sentiment: prediction.sentiment, + confidence: prediction.confidence, + ..news + }) + .collect::>(); + } + + if news.len() >= database::news::BATCH_FLUSH_SIZE || message.next_page_token.is_none() { + database::news::upsert_batch( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &news, + ) + .await + .unwrap(); + news = vec![]; + } + + if message.next_page_token.is_none() { + break; + } + next_page_token = message.next_page_token; + } + + let (backfilled, skipped): (Vec<_>, Vec<_>) = + last_times.into_iter().partition_map(|(symbol, time)| { + if let Some(time) = time { + Either::Left(Backfill { symbol, time }) + } else { + Either::Right(symbol) + } + }); + + database::backfills_news::upsert_batch( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &backfilled, + ) + .await + .unwrap(); + + let backfilled = backfilled + .into_iter() + .map(|backfill| backfill.symbol) + .collect::>(); + + if !skipped.is_empty() { + info!("No news to backfill for {:?}.", skipped); + } + + if !backfilled.is_empty() { + info!("Backfilled news for {:?}.", backfilled); + } + } + + fn max_limit(&self) -> i64 { + alpaca::api::outgoing::news::MAX_LIMIT + } + + fn log_string(&self) -> &'static str { + "news" + } +} diff --git a/src/threads/data/websocket.rs b/src/threads/data/websocket.rs deleted file mode 100644 index 1d4b256..0000000 --- a/src/threads/data/websocket.rs +++ /dev/null @@ -1,437 +0,0 @@ -use super::ThreadType; -use crate::{ - config::Config, - database, - types::{alpaca::websocket, news::Prediction, Bar, Class, News}, -}; -use async_trait::async_trait; -use futures_util::{ - future::join_all, - stream::{SplitSink, SplitStream}, - SinkExt, StreamExt, -}; -use log::{debug, error, info}; -use serde_json::{from_str, to_string}; -use std::{collections::HashMap, sync::Arc}; -use tokio::{ - net::TcpStream, - select, spawn, - sync::{mpsc, oneshot, Mutex, RwLock}, - task::block_in_place, -}; -use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; - -pub enum Action { - Subscribe, - Unsubscribe, -} - -impl From for Option { - fn from(action: super::Action) -> Self { - match action { - super::Action::Add | super::Action::Enable => Some(Action::Subscribe), - super::Action::Remove | super::Action::Disable => Some(Action::Unsubscribe), - } - } -} - -pub struct Message { - pub action: Option, - pub symbols: Vec, - pub response: oneshot::Sender<()>, -} - -impl Message { - pub fn new(action: Option, symbols: Vec) -> (Self, oneshot::Receiver<()>) { - let (sender, receiver) = oneshot::channel(); - ( - Self { - action, - symbols, - response: sender, - }, - receiver, - ) - } -} - -pub struct Pending { - pub subscriptions: HashMap>, - pub unsubscriptions: HashMap>, -} - -#[async_trait] -pub trait Handler: Send + Sync { - fn create_subscription_message( - &self, - symbols: Vec, - ) -> websocket::data::outgoing::subscribe::Message; - async fn handle_websocket_message( - &self, - pending: Arc>, - message: websocket::data::incoming::Message, - ); -} - -pub async fn run( - handler: Arc>, - mut receiver: mpsc::Receiver, - mut websocket_stream: SplitStream>>, - websocket_sink: SplitSink>, tungstenite::Message>, -) { - let pending = Arc::new(RwLock::new(Pending { - subscriptions: HashMap::new(), - unsubscriptions: HashMap::new(), - })); - let websocket_sink = Arc::new(Mutex::new(websocket_sink)); - - loop { - select! { - Some(message) = receiver.recv() => { - spawn(handle_message( - handler.clone(), - pending.clone(), - websocket_sink.clone(), - message, - )); - } - Some(Ok(message)) = websocket_stream.next() => { - match message { - tungstenite::Message::Text(message) => { - let parsed_message = from_str::>(&message); - - if parsed_message.is_err() { - error!("Failed to deserialize websocket message: {:?}", message); - continue; - } - - for message in parsed_message.unwrap() { - let handler = handler.clone(); - let pending = pending.clone(); - spawn(async move { - handler.handle_websocket_message(pending, message).await; - }); - } - } - tungstenite::Message::Ping(_) => {} - _ => error!("Unexpected websocket message: {:?}", message), - } - } - else => panic!("Communication channel unexpectedly closed.") - } - } -} - -async fn handle_message( - handler: Arc>, - pending: Arc>, - sink: Arc>, tungstenite::Message>>>, - message: Message, -) { - if message.symbols.is_empty() { - message.response.send(()).unwrap(); - return; - } - - match message.action { - Some(Action::Subscribe) => { - let (pending_subscriptions, receivers): (Vec<_>, Vec<_>) = message - .symbols - .iter() - .map(|symbol| { - let (sender, receiver) = oneshot::channel(); - ((symbol.clone(), sender), receiver) - }) - .unzip(); - - pending - .write() - .await - .subscriptions - .extend(pending_subscriptions); - - sink.lock() - .await - .send(tungstenite::Message::Text( - to_string(&websocket::data::outgoing::Message::Subscribe( - handler.create_subscription_message(message.symbols), - )) - .unwrap(), - )) - .await - .unwrap(); - - join_all(receivers).await; - } - Some(Action::Unsubscribe) => { - let (pending_unsubscriptions, receivers): (Vec<_>, Vec<_>) = message - .symbols - .iter() - .map(|symbol| { - let (sender, receiver) = oneshot::channel(); - ((symbol.clone(), sender), receiver) - }) - .unzip(); - - pending - .write() - .await - .unsubscriptions - .extend(pending_unsubscriptions); - - sink.lock() - .await - .send(tungstenite::Message::Text( - to_string(&websocket::data::outgoing::Message::Unsubscribe( - handler.create_subscription_message(message.symbols.clone()), - )) - .unwrap(), - )) - .await - .unwrap(); - - join_all(receivers).await; - } - None => {} - } - - message.response.send(()).unwrap(); -} - -struct BarsHandler { - config: Arc, - subscription_message_constructor: - fn(Vec) -> websocket::data::outgoing::subscribe::Message, -} - -#[async_trait] -impl Handler for BarsHandler { - fn create_subscription_message( - &self, - symbols: Vec, - ) -> websocket::data::outgoing::subscribe::Message { - (self.subscription_message_constructor)(symbols) - } - - async fn handle_websocket_message( - &self, - pending: Arc>, - message: websocket::data::incoming::Message, - ) { - match message { - websocket::data::incoming::Message::Subscription(message) => { - let websocket::data::incoming::subscription::Message::Market { - bars: symbols, .. - } = message - else { - unreachable!() - }; - - let mut pending = pending.write().await; - - let newly_subscribed = pending - .subscriptions - .extract_if(|symbol, _| symbols.contains(symbol)) - .collect::>(); - - let newly_unsubscribed = pending - .unsubscriptions - .extract_if(|symbol, _| !symbols.contains(symbol)) - .collect::>(); - - drop(pending); - - if !newly_subscribed.is_empty() { - info!( - "Subscribed to bars for {:?}.", - newly_subscribed.keys().collect::>() - ); - - for sender in newly_subscribed.into_values() { - sender.send(()).unwrap(); - } - } - - if !newly_unsubscribed.is_empty() { - info!( - "Unsubscribed from bars for {:?}.", - newly_unsubscribed.keys().collect::>() - ); - - for sender in newly_unsubscribed.into_values() { - sender.send(()).unwrap(); - } - } - } - websocket::data::incoming::Message::Bar(message) - | websocket::data::incoming::Message::UpdatedBar(message) => { - let bar = Bar::from(message); - debug!("Received bar for {}: {}.", bar.symbol, bar.time); - - database::bars::upsert( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &bar, - ) - .await - .unwrap(); - } - websocket::data::incoming::Message::Status(message) => { - debug!( - "Received status message for {}: {:?}.", - message.symbol, message.status - ); - - match message.status { - websocket::data::incoming::status::Status::TradingHalt(_) - | websocket::data::incoming::status::Status::VolatilityTradingPause(_) => { - database::assets::update_status_where_symbol( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &message.symbol, - false, - ) - .await - .unwrap(); - } - websocket::data::incoming::status::Status::Resume(_) - | websocket::data::incoming::status::Status::TradingResumption(_) => { - database::assets::update_status_where_symbol( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &message.symbol, - true, - ) - .await - .unwrap(); - } - _ => {} - } - } - websocket::data::incoming::Message::Error(message) => { - error!("Received error message: {}.", message.message); - } - _ => unreachable!(), - } - } -} - -struct NewsHandler { - config: Arc, -} - -#[async_trait] -impl Handler for NewsHandler { - fn create_subscription_message( - &self, - symbols: Vec, - ) -> websocket::data::outgoing::subscribe::Message { - websocket::data::outgoing::subscribe::Message::new_news(symbols) - } - - async fn handle_websocket_message( - &self, - pending: Arc>, - message: websocket::data::incoming::Message, - ) { - match message { - websocket::data::incoming::Message::Subscription(message) => { - let websocket::data::incoming::subscription::Message::News { news: symbols } = - message - else { - unreachable!() - }; - - let mut pending = pending.write().await; - - let newly_subscribed = pending - .subscriptions - .extract_if(|symbol, _| symbols.contains(symbol)) - .collect::>(); - - let newly_unsubscribed = pending - .unsubscriptions - .extract_if(|symbol, _| !symbols.contains(symbol)) - .collect::>(); - - drop(pending); - - if !newly_subscribed.is_empty() { - info!( - "Subscribed to news for {:?}.", - newly_subscribed.keys().collect::>() - ); - - for sender in newly_subscribed.into_values() { - sender.send(()).unwrap(); - } - } - - if !newly_unsubscribed.is_empty() { - info!( - "Unsubscribed from news for {:?}.", - newly_unsubscribed.keys().collect::>() - ); - - for sender in newly_unsubscribed.into_values() { - sender.send(()).unwrap(); - } - } - } - websocket::data::incoming::Message::News(message) => { - let news = News::from(message); - - debug!( - "Received news for {:?}: {}.", - news.symbols, news.time_created - ); - - let input = format!("{}\n\n{}", news.headline, news.content); - - let sequence_classifier = self.config.sequence_classifier.lock().await; - let prediction = block_in_place(|| { - sequence_classifier - .predict(vec![input.as_str()]) - .into_iter() - .map(|label| Prediction::try_from(label).unwrap()) - .collect::>()[0] - }); - drop(sequence_classifier); - - let news = News { - sentiment: prediction.sentiment, - confidence: prediction.confidence, - ..news - }; - - database::news::upsert( - &self.config.clickhouse_client, - &self.config.clickhouse_concurrency_limiter, - &news, - ) - .await - .unwrap(); - } - websocket::data::incoming::Message::Error(message) => { - error!("Received error message: {}.", message.message); - } - _ => unreachable!(), - } - } -} - -pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { - match thread_type { - ThreadType::Bars(Class::UsEquity) => Box::new(BarsHandler { - config, - subscription_message_constructor: - websocket::data::outgoing::subscribe::Message::new_market_us_equity, - }), - ThreadType::Bars(Class::Crypto) => Box::new(BarsHandler { - config, - subscription_message_constructor: - websocket::data::outgoing::subscribe::Message::new_market_crypto, - }), - ThreadType::News => Box::new(NewsHandler { config }), - } -} diff --git a/src/threads/data/websocket/bars.rs b/src/threads/data/websocket/bars.rs new file mode 100644 index 0000000..4413467 --- /dev/null +++ b/src/threads/data/websocket/bars.rs @@ -0,0 +1,128 @@ +use super::Pending; +use crate::{ + config::Config, + database, + types::{alpaca::websocket, Bar}, +}; +use async_trait::async_trait; +use log::{debug, error, info}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; + +pub struct Handler { + pub config: Arc, + pub subscription_message_constructor: + fn(Vec) -> websocket::data::outgoing::subscribe::Message, +} + +#[async_trait] +impl super::Handler for Handler { + fn create_subscription_message( + &self, + symbols: Vec, + ) -> websocket::data::outgoing::subscribe::Message { + (self.subscription_message_constructor)(symbols) + } + + async fn handle_websocket_message( + &self, + pending: Arc>, + message: websocket::data::incoming::Message, + ) { + match message { + websocket::data::incoming::Message::Subscription(message) => { + let websocket::data::incoming::subscription::Message::Market { + bars: symbols, .. + } = message + else { + unreachable!() + }; + + let mut pending = pending.write().await; + + let newly_subscribed = pending + .subscriptions + .extract_if(|symbol, _| symbols.contains(symbol)) + .collect::>(); + + let newly_unsubscribed = pending + .unsubscriptions + .extract_if(|symbol, _| !symbols.contains(symbol)) + .collect::>(); + + drop(pending); + + if !newly_subscribed.is_empty() { + info!( + "Subscribed to bars for {:?}.", + newly_subscribed.keys().collect::>() + ); + + for sender in newly_subscribed.into_values() { + sender.send(()).unwrap(); + } + } + + if !newly_unsubscribed.is_empty() { + info!( + "Unsubscribed from bars for {:?}.", + newly_unsubscribed.keys().collect::>() + ); + + for sender in newly_unsubscribed.into_values() { + sender.send(()).unwrap(); + } + } + } + websocket::data::incoming::Message::Bar(message) + | websocket::data::incoming::Message::UpdatedBar(message) => { + let bar = Bar::from(message); + debug!("Received bar for {}: {}.", bar.symbol, bar.time); + + database::bars::upsert( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &bar, + ) + .await + .unwrap(); + } + websocket::data::incoming::Message::Status(message) => { + debug!( + "Received status message for {}: {:?}.", + message.symbol, message.status + ); + + match message.status { + websocket::data::incoming::status::Status::TradingHalt(_) + | websocket::data::incoming::status::Status::VolatilityTradingPause(_) => { + database::assets::update_status_where_symbol( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &message.symbol, + false, + ) + .await + .unwrap(); + } + websocket::data::incoming::status::Status::Resume(_) + | websocket::data::incoming::status::Status::TradingResumption(_) => { + database::assets::update_status_where_symbol( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &message.symbol, + true, + ) + .await + .unwrap(); + } + _ => {} + } + } + websocket::data::incoming::Message::Error(message) => { + error!("Received error message: {}.", message.message); + } + _ => unreachable!(), + } + } +} diff --git a/src/threads/data/websocket/mod.rs b/src/threads/data/websocket/mod.rs new file mode 100644 index 0000000..5181a0c --- /dev/null +++ b/src/threads/data/websocket/mod.rs @@ -0,0 +1,216 @@ +mod bars; +mod news; + +use super::ThreadType; +use crate::{ + config::Config, + types::{alpaca::websocket, Class}, +}; +use async_trait::async_trait; +use futures_util::{ + future::join_all, + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use log::error; +use serde_json::{from_str, to_string}; +use std::{collections::HashMap, sync::Arc}; +use tokio::{ + net::TcpStream, + select, spawn, + sync::{mpsc, oneshot, Mutex, RwLock}, +}; +use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; + +pub enum Action { + Subscribe, + Unsubscribe, +} + +impl From for Option { + fn from(action: super::Action) -> Self { + match action { + super::Action::Add | super::Action::Enable => Some(Action::Subscribe), + super::Action::Remove | super::Action::Disable => Some(Action::Unsubscribe), + } + } +} + +pub struct Message { + pub action: Option, + pub symbols: Vec, + pub response: oneshot::Sender<()>, +} + +impl Message { + pub fn new(action: Option, symbols: Vec) -> (Self, oneshot::Receiver<()>) { + let (sender, receiver) = oneshot::channel(); + ( + Self { + action, + symbols, + response: sender, + }, + receiver, + ) + } +} + +pub struct Pending { + pub subscriptions: HashMap>, + pub unsubscriptions: HashMap>, +} + +#[async_trait] +pub trait Handler: Send + Sync { + fn create_subscription_message( + &self, + symbols: Vec, + ) -> websocket::data::outgoing::subscribe::Message; + async fn handle_websocket_message( + &self, + pending: Arc>, + message: websocket::data::incoming::Message, + ); +} + +pub async fn run( + handler: Arc>, + mut receiver: mpsc::Receiver, + mut websocket_stream: SplitStream>>, + websocket_sink: SplitSink>, tungstenite::Message>, +) { + let pending = Arc::new(RwLock::new(Pending { + subscriptions: HashMap::new(), + unsubscriptions: HashMap::new(), + })); + let websocket_sink = Arc::new(Mutex::new(websocket_sink)); + + loop { + select! { + Some(message) = receiver.recv() => { + spawn(handle_message( + handler.clone(), + pending.clone(), + websocket_sink.clone(), + message, + )); + } + Some(Ok(message)) = websocket_stream.next() => { + match message { + tungstenite::Message::Text(message) => { + let parsed_message = from_str::>(&message); + + if parsed_message.is_err() { + error!("Failed to deserialize websocket message: {:?}", message); + continue; + } + + for message in parsed_message.unwrap() { + let handler = handler.clone(); + let pending = pending.clone(); + spawn(async move { + handler.handle_websocket_message(pending, message).await; + }); + } + } + tungstenite::Message::Ping(_) => {} + _ => error!("Unexpected websocket message: {:?}", message), + } + } + else => panic!("Communication channel unexpectedly closed.") + } + } +} + +async fn handle_message( + handler: Arc>, + pending: Arc>, + sink: Arc>, tungstenite::Message>>>, + message: Message, +) { + if message.symbols.is_empty() { + message.response.send(()).unwrap(); + return; + } + + match message.action { + Some(Action::Subscribe) => { + let (pending_subscriptions, receivers): (Vec<_>, Vec<_>) = message + .symbols + .iter() + .map(|symbol| { + let (sender, receiver) = oneshot::channel(); + ((symbol.clone(), sender), receiver) + }) + .unzip(); + + pending + .write() + .await + .subscriptions + .extend(pending_subscriptions); + + sink.lock() + .await + .send(tungstenite::Message::Text( + to_string(&websocket::data::outgoing::Message::Subscribe( + handler.create_subscription_message(message.symbols), + )) + .unwrap(), + )) + .await + .unwrap(); + + join_all(receivers).await; + } + Some(Action::Unsubscribe) => { + let (pending_unsubscriptions, receivers): (Vec<_>, Vec<_>) = message + .symbols + .iter() + .map(|symbol| { + let (sender, receiver) = oneshot::channel(); + ((symbol.clone(), sender), receiver) + }) + .unzip(); + + pending + .write() + .await + .unsubscriptions + .extend(pending_unsubscriptions); + + sink.lock() + .await + .send(tungstenite::Message::Text( + to_string(&websocket::data::outgoing::Message::Unsubscribe( + handler.create_subscription_message(message.symbols.clone()), + )) + .unwrap(), + )) + .await + .unwrap(); + + join_all(receivers).await; + } + None => {} + } + + message.response.send(()).unwrap(); +} + +pub fn create_handler(thread_type: ThreadType, config: Arc) -> Box { + match thread_type { + ThreadType::Bars(Class::UsEquity) => Box::new(bars::Handler { + config, + subscription_message_constructor: + websocket::data::outgoing::subscribe::Message::new_market_us_equity, + }), + ThreadType::Bars(Class::Crypto) => Box::new(bars::Handler { + config, + subscription_message_constructor: + websocket::data::outgoing::subscribe::Message::new_market_crypto, + }), + ThreadType::News => Box::new(news::Handler { config }), + } +} diff --git a/src/threads/data/websocket/news.rs b/src/threads/data/websocket/news.rs new file mode 100644 index 0000000..7512614 --- /dev/null +++ b/src/threads/data/websocket/news.rs @@ -0,0 +1,114 @@ +use super::Pending; +use crate::{ + config::Config, + database, + types::{alpaca::websocket, news::Prediction, News}, +}; +use async_trait::async_trait; +use log::{debug, error, info}; +use std::{collections::HashMap, sync::Arc}; +use tokio::{sync::RwLock, task::block_in_place}; + +pub struct Handler { + pub config: Arc, +} + +#[async_trait] +impl super::Handler for Handler { + fn create_subscription_message( + &self, + symbols: Vec, + ) -> websocket::data::outgoing::subscribe::Message { + websocket::data::outgoing::subscribe::Message::new_news(symbols) + } + + async fn handle_websocket_message( + &self, + pending: Arc>, + message: websocket::data::incoming::Message, + ) { + match message { + websocket::data::incoming::Message::Subscription(message) => { + let websocket::data::incoming::subscription::Message::News { news: symbols } = + message + else { + unreachable!() + }; + + let mut pending = pending.write().await; + + let newly_subscribed = pending + .subscriptions + .extract_if(|symbol, _| symbols.contains(symbol)) + .collect::>(); + + let newly_unsubscribed = pending + .unsubscriptions + .extract_if(|symbol, _| !symbols.contains(symbol)) + .collect::>(); + + drop(pending); + + if !newly_subscribed.is_empty() { + info!( + "Subscribed to news for {:?}.", + newly_subscribed.keys().collect::>() + ); + + for sender in newly_subscribed.into_values() { + sender.send(()).unwrap(); + } + } + + if !newly_unsubscribed.is_empty() { + info!( + "Unsubscribed from news for {:?}.", + newly_unsubscribed.keys().collect::>() + ); + + for sender in newly_unsubscribed.into_values() { + sender.send(()).unwrap(); + } + } + } + websocket::data::incoming::Message::News(message) => { + let news = News::from(message); + + debug!( + "Received news for {:?}: {}.", + news.symbols, news.time_created + ); + + let input = format!("{}\n\n{}", news.headline, news.content); + + let sequence_classifier = self.config.sequence_classifier.lock().await; + let prediction = block_in_place(|| { + sequence_classifier + .predict(vec![input.as_str()]) + .into_iter() + .map(|label| Prediction::try_from(label).unwrap()) + .collect::>()[0] + }); + drop(sequence_classifier); + + let news = News { + sentiment: prediction.sentiment, + confidence: prediction.confidence, + ..news + }; + + database::news::upsert( + &self.config.clickhouse_client, + &self.config.clickhouse_concurrency_limiter, + &news, + ) + .await + .unwrap(); + } + websocket::data::incoming::Message::Error(message) => { + error!("Received error message: {}.", message.message); + } + _ => unreachable!(), + } + } +}