diff --git a/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json b/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json index 5fd5872..f24ff16 100644 --- a/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json +++ b/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json @@ -17,8 +17,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -39,7 +38,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } diff --git a/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json b/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json index a53e391..d234a02 100644 --- a/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json +++ b/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json @@ -17,8 +17,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -39,7 +38,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } diff --git a/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json b/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json index ef34ae4..a6d4765 100644 --- a/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json +++ b/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json @@ -17,8 +17,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -39,7 +38,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } diff --git a/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json b/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json index 5195d90..985456a 100644 --- a/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json +++ b/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json @@ -17,8 +17,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -39,7 +38,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } @@ -65,8 +64,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -83,7 +81,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } diff --git a/backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json b/backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json similarity index 63% rename from backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json rename to backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json index 7d233f0..e8f260a 100644 --- a/backend/.sqlx/query-e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d.json +++ b/backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume", + "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted", "describe": { "columns": [ { @@ -37,6 +37,16 @@ "ordinal": 6, "name": "volume", "type_info": "Float8" + }, + { + "ordinal": 7, + "name": "num_trades", + "type_info": "Int8" + }, + { + "ordinal": 8, + "name": "volume_weighted", + "type_info": "Float8" } ], "parameters": { @@ -47,6 +57,8 @@ "Float8Array", "Float8Array", "Float8Array", + "Float8Array", + "Int8Array", "Float8Array" ] }, @@ -57,8 +69,10 @@ false, false, false, + false, + false, false ] }, - "hash": "e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d" + "hash": "b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f" } diff --git a/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json b/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json index 174372c..8a64b8b 100644 --- a/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json +++ b/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json @@ -17,8 +17,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -39,7 +38,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } diff --git a/backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json b/backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json similarity index 77% rename from backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json rename to backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json index 4e8f3f3..85365cd 100644 --- a/backend/.sqlx/query-826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e.json +++ b/backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'crypto'", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = $1::CLASS", "describe": { "columns": [ { @@ -17,8 +17,7 @@ "kind": { "Enum": [ "us_equity", - "crypto", - "unknown" + "crypto" ] } } @@ -39,7 +38,7 @@ "NYSE", "NYSEARCA", "OTC", - "unknown" + "CRYPTO" ] } } @@ -57,7 +56,19 @@ } ], "parameters": { - "Left": [] + "Left": [ + { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto" + ] + } + } + } + ] }, "nullable": [ false, @@ -67,5 +78,5 @@ false ] }, - "hash": "826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e" + "hash": "d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214" } diff --git a/backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json b/backend/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json similarity index 61% rename from backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json rename to backend/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json index 67eb4b4..9c45d27 100644 --- a/backend/.sqlx/query-e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad.json +++ b/backend/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING timestamp, asset_symbol, open, high, low, close, volume", + "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted", "describe": { "columns": [ { @@ -37,6 +37,16 @@ "ordinal": 6, "name": "volume", "type_info": "Float8" + }, + { + "ordinal": 7, + "name": "num_trades", + "type_info": "Int8" + }, + { + "ordinal": 8, + "name": "volume_weighted", + "type_info": "Float8" } ], "parameters": { @@ -47,6 +57,8 @@ "Float8", "Float8", "Float8", + "Float8", + "Int8", "Float8" ] }, @@ -57,8 +69,10 @@ false, false, false, + false, + false, false ] }, - "hash": "e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad" + "hash": "ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c" } diff --git a/backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json b/backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json deleted file mode 100644 index 3947cbb..0000000 --- a/backend/.sqlx/query-f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'us_equity'", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "symbol", - "type_info": "Text" - }, - { - "ordinal": 1, - "name": "class: Class", - "type_info": { - "Custom": { - "name": "class", - "kind": { - "Enum": [ - "us_equity", - "crypto", - "unknown" - ] - } - } - } - }, - { - "ordinal": 2, - "name": "exchange: Exchange", - "type_info": { - "Custom": { - "name": "exchange", - "kind": { - "Enum": [ - "AMEX", - "ARCA", - "BATS", - "NASDAQ", - "NYSE", - "NYSEARCA", - "OTC", - "unknown" - ] - } - } - } - }, - { - "ordinal": 3, - "name": "trading", - "type_info": "Bool" - }, - { - "ordinal": 4, - "name": "date_added", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b" -} diff --git a/backend/Cargo.lock b/backend/Cargo.lock index c84aea2..e8a8032 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -56,54 +56,12 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" -[[package]] -name = "apca" -version = "0.27.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bed93cbc521cf474aafc4ae672130d86662117f29d4f74eed7d9a8851502256c" -dependencies = [ - "async-compression", - "async-trait", - "chrono", - "futures", - "http", - "http-endpoint", - "hyper", - "hyper-tls", - "num-decimal", - "serde", - "serde_json", - "serde_urlencoded", - "serde_variant", - "thiserror", - "tokio", - "tokio-tungstenite", - "tracing", - "tracing-futures", - "url", - "uuid", - "websocket-util", -] - [[package]] name = "arc-swap" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" -[[package]] -name = "async-compression" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d495b6dc0184693324491a5ac05f559acc97bf937ab31d7a1c33dd0016be6d2b" -dependencies = [ - "flate2", - "futures-core", - "futures-io", - "memchr", - "pin-project-lite", -] - [[package]] name = "async-trait" version = "0.1.73" @@ -183,21 +141,19 @@ dependencies = [ name = "backend" version = "0.1.0" dependencies = [ - "apca", - "async-trait", "axum", - "deadpool", "dotenv", - "futures", "futures-util", + "http", "log", "log4rs", + "reqwest", "serde", "serde_json", "sqlx", "time 0.3.28", "tokio", - "websocket-util", + "tokio-tungstenite", ] [[package]] @@ -215,12 +171,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.21.3" @@ -300,7 +250,6 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", - "serde", "time 0.1.45", "wasm-bindgen", "windows-targets", @@ -352,15 +301,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" -[[package]] -name = "crc32fast" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-queue" version = "0.3.8" @@ -391,26 +331,10 @@ dependencies = [ ] [[package]] -name = "deadpool" -version = "0.9.5" +name = "data-encoding" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" -dependencies = [ - "async-trait", - "deadpool-runtime", - "num_cpus", - "retain_mut", - "tokio", -] - -[[package]] -name = "deadpool-runtime" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" -dependencies = [ - "tokio", -] +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "der" @@ -482,6 +406,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -532,16 +465,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" -[[package]] -name = "flate2" -version = "1.0.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "flume" version = "0.10.14" @@ -584,21 +507,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.28" @@ -672,7 +580,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ - "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -711,6 +618,25 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +[[package]] +name = "h2" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap 1.9.3", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -806,15 +732,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-endpoint" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5224352a86c0e121f1bf26d1c2f87a344d1978ba4bb94798c831c89f0d427a26" -dependencies = [ - "http", -] - [[package]] name = "httparse" version = "1.8.0" @@ -843,6 +760,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "httparse", @@ -922,6 +840,12 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "ipnet" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" + [[package]] name = "itertools" version = "0.10.5" @@ -1122,17 +1046,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "num-bigint" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1150,18 +1063,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "num-decimal" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8783636b20810a87540f59d19858498a987d7fcdc6555e62f2c99d6ca8a84b61" -dependencies = [ - "num-bigint", - "num-rational", - "num-traits", - "serde", -] - [[package]] name = "num-integer" version = "0.1.45" @@ -1183,18 +1084,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-rational" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" -dependencies = [ - "autocfg", - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.16" @@ -1459,10 +1348,41 @@ dependencies = [ ] [[package]] -name = "retain_mut" -version = "0.1.9" +name = "reqwest" +version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" +checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] [[package]] name = "rsa" @@ -1618,15 +1538,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_variant" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47a8ec0b2fd0506290348d9699c0e3eb2e3e8c0498b5a9a6158b3bd4d6970076" -dependencies = [ - "serde", -] - [[package]] name = "serde_yaml" version = "0.8.26" @@ -1843,7 +1754,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" dependencies = [ "atoi", - "base64 0.21.3", + "base64", "bitflags 2.4.0", "byteorder", "bytes", @@ -1887,7 +1798,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" dependencies = [ "atoi", - "base64 0.21.3", + "base64", "bitflags 2.4.0", "byteorder", "crc", @@ -2138,9 +2049,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.18.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", @@ -2150,6 +2061,20 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -2211,16 +2136,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.4" @@ -2229,13 +2144,13 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "tungstenite" -version = "0.18.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" dependencies = [ - "base64 0.13.1", "byteorder", "bytes", + "data-encoding", "http", "httparse", "log", @@ -2326,9 +2241,6 @@ name = "uuid" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" -dependencies = [ - "serde", -] [[package]] name = "vcpkg" @@ -2388,6 +2300,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -2418,15 +2342,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] -name = "websocket-util" -version = "0.11.2" +name = "web-sys" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ad8d6da9976a197513a4bf34c12b21095cba617dce356cd9e9616ccc5afe0d9" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" dependencies = [ - "futures", - "tokio", - "tokio-tungstenite", - "tracing", + "js-sys", + "wasm-bindgen", ] [[package]] @@ -2532,6 +2454,16 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 90ebad6..7af11fe 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -12,7 +12,6 @@ codegen-units = 1 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -apca = "0.27.2" axum = "0.6.20" dotenv = "0.15.0" sqlx = { version = "0.7.1", features = [ @@ -25,9 +24,6 @@ tokio = { version = "1.32.0", features = [ "macros", "rt-multi-thread", ] } -deadpool = { version = "0.9.5", features = [ - "rt_tokio_1", -] } serde = "1.0.188" log = "0.4.20" serde_json = "1.0.105" @@ -35,7 +31,7 @@ log4rs = "1.2.0" time = { version = "0.3.27", features = [ "serde", ] } -futures = "0.3.28" -websocket-util = "0.11.2" futures-util = "0.3.28" -async-trait = "0.1.73" +reqwest = { version = "0.11.20", features = ["json", "serde_json"] } +tokio-tungstenite = { version = "0.20.0", features = ["tokio-native-tls", "native-tls"] } +http = "0.2.9" diff --git a/backend/src/config.rs b/backend/src/config.rs new file mode 100644 index 0000000..e3ef0e3 --- /dev/null +++ b/backend/src/config.rs @@ -0,0 +1,30 @@ +use reqwest::Client; +use sqlx::{postgres::PgPoolOptions, PgPool}; +use std::{env, sync::Arc}; + +pub struct AppConfig { + pub alpaca_api_key: String, + pub alpaca_api_secret: String, + pub postgres_pool: PgPool, + pub reqwest_client: Client, +} + +const NUM_CLIENTS: usize = 10; + +impl AppConfig { + pub async fn from_env() -> Result> { + Ok(AppConfig { + alpaca_api_key: env::var("APCA_API_KEY_ID").unwrap(), + alpaca_api_secret: env::var("APCA_API_SECRET_KEY").unwrap(), + postgres_pool: PgPoolOptions::new() + .max_connections(NUM_CLIENTS as u32) + .connect(&env::var("DATABASE_URL")?) + .await?, + reqwest_client: Client::new(), + }) + } + + pub async fn arc_from_env() -> Result, Box> { + Ok(Arc::new(AppConfig::from_env().await?)) + } +} diff --git a/backend/src/data/live.rs b/backend/src/data/live.rs new file mode 100644 index 0000000..429a8ac --- /dev/null +++ b/backend/src/data/live.rs @@ -0,0 +1,168 @@ +use crate::{ + config::AppConfig, + database::{assets::get_assets_with_class, bars::add_bar}, + types::{ + websocket::{ + AuthMessage, IncomingMessage, OutgoingMessage, SubscribeMessage, SuccessMessage, + SuccessMessageType, + }, + AssetBroadcastMessage, Bar, Class, + }, +}; +use core::panic; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use log::{debug, error, info, warn}; +use serde_json::{from_str, to_string}; +use std::{error::Error, sync::Arc, time::Duration}; +use tokio::{ + net::TcpStream, + spawn, + sync::{broadcast::Receiver, RwLock}, + time::timeout, +}; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2/iex"; +const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us"; +const TIMEOUT_DURATION: Duration = Duration::from_millis(100); + +pub async fn run_data_live( + class: Class, + app_config: Arc, + asset_broadcast_receiver: Receiver, +) -> Result<(), Box> { + let websocket_url = match class { + Class::UsEquity => ALPACA_STOCK_WEBSOCKET_URL, + Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL, + }; + + let (stream, _) = connect_async(websocket_url).await?; + let (mut sink, mut stream) = stream.split(); + + match stream.next().await { + Some(Ok(Message::Text(data))) + if from_str::>(&data)?.get(0) + == Some(&IncomingMessage::Success(SuccessMessage { + msg: SuccessMessageType::Connected, + })) => {} + _ => panic!(), + } + + sink.send(Message::Text(to_string(&OutgoingMessage::Auth( + AuthMessage::new( + app_config.alpaca_api_key.clone(), + app_config.alpaca_api_secret.clone(), + ), + ))?)) + .await?; + + match stream.next().await { + Some(Ok(Message::Text(data))) + if from_str::>(&data)?.get(0) + == Some(&IncomingMessage::Success(SuccessMessage { + msg: SuccessMessageType::Authenticated, + })) => {} + _ => panic!(), + } + + let symbols = get_assets_with_class(&app_config.postgres_pool, class.clone()) + .await? + .into_iter() + .map(|asset| asset.symbol) + .collect::>(); + + if !symbols.is_empty() { + sink.send(Message::Text(to_string(&OutgoingMessage::Subscribe( + SubscribeMessage::from_vec(symbols), + ))?)) + .await?; + } + + let sink = Arc::new(RwLock::new(sink)); + let stream = Arc::new(RwLock::new(stream)); + + info!("Running live data thread for {:?}.", class); + + spawn(broadcast_handler( + class, + asset_broadcast_receiver, + sink.clone(), + )); + + websocket_handler(app_config, sink, stream).await?; + + unreachable!() +} + +pub async fn websocket_handler( + app_config: Arc, + sink: Arc>, Message>>>, + stream: Arc>>>>, +) -> Result<(), Box> { + loop { + let mut stream = stream.write().await; + + match timeout(TIMEOUT_DURATION, stream.next()).await { + Ok(Some(Ok(Message::Text(data)))) => match from_str::>(&data) { + Ok(parsed_data) => { + for message in parsed_data { + match message { + IncomingMessage::Subscription(subscription_message) => { + info!("Current subscriptions: {:?}", subscription_message.bars); + } + IncomingMessage::Bars(bar_message) + | IncomingMessage::UpdatedBars(bar_message) => { + debug!("Incoming bar: {:?}", bar_message); + add_bar(&app_config.postgres_pool, Bar::from(bar_message)).await?; + } + message => { + warn!("Unhandled incoming message: {:?}", message); + } + } + } + } + Err(e) => { + warn!("Unparsed incoming message: {:?}: {}", data, e); + } + }, + Ok(Some(Ok(Message::Ping(_)))) => { + sink.write().await.send(Message::Pong(vec![])).await? + } + Ok(unknown) => { + error!("Unknown incoming message: {:?}", unknown); + } + Err(_) => {} + } + } +} + +pub async fn broadcast_handler( + class: Class, + mut asset_broadcast_receiver: Receiver, + sink: Arc>, Message>>>, +) -> Result<(), Box> { + loop { + match asset_broadcast_receiver.recv().await? { + AssetBroadcastMessage::Added(asset) if asset.class == class => { + sink.write() + .await + .send(Message::Text(serde_json::to_string( + &OutgoingMessage::Subscribe(SubscribeMessage::new(asset.symbol)), + )?)) + .await?; + } + AssetBroadcastMessage::Deleted(asset) if asset.class == class => { + sink.write() + .await + .send(Message::Text(serde_json::to_string( + &OutgoingMessage::Unsubscribe(SubscribeMessage::new(asset.symbol)), + )?)) + .await?; + } + _ => {} + } + } +} diff --git a/backend/src/data/live/crypto.rs b/backend/src/data/live/crypto.rs deleted file mode 100644 index f72eb06..0000000 --- a/backend/src/data/live/crypto.rs +++ /dev/null @@ -1,57 +0,0 @@ -use super::{AssetMPSC, StockStreamSubscription}; -use crate::{ - database::assets::get_assets_crypto, - pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool}, -}; -use apca::data::v2::stream::{ - drive, Bar, CustomUrl, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX, -}; -use futures_util::FutureExt; -use std::{error::Error, sync::Arc}; -use tokio::sync::{mpsc, Mutex}; - -#[derive(Default)] -pub struct CryptoUrl; - -impl ToString for CryptoUrl { - fn to_string(&self) -> String { - "wss://stream.data.alpaca.markets/v1beta3/crypto/us".into() - } -} - -pub type Crypto = CustomUrl; - -pub async fn init_stream_subscription_mpsc( - postgres_pool: &PostgresPool, -) -> Result<(Arc>>, AssetMPSC), Box> { - let client = create_alpaca_client_from_env().await?; - - let (mut stream, mut subscription) = client - .subscribe::>() - .await?; - - let symbols = get_assets_crypto(postgres_pool) - .await? - .iter() - .map(|asset| asset.symbol.clone()) - .collect::>(); - - if !symbols.is_empty() { - let data = MarketData { - bars: Symbols::List(SymbolList::from(symbols)), - ..Default::default() - }; - - drive(subscription.subscribe(&data).boxed(), &mut stream) - .await - .unwrap() - .unwrap() - .unwrap(); - } - - let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription))); - let (sender, receiver) = mpsc::channel(50); - let asset_mpcs = AssetMPSC { sender, receiver }; - - Ok((stream_subscription_mutex, asset_mpcs)) -} diff --git a/backend/src/data/live/mod.rs b/backend/src/data/live/mod.rs deleted file mode 100644 index 4958c39..0000000 --- a/backend/src/data/live/mod.rs +++ /dev/null @@ -1,133 +0,0 @@ -pub mod crypto; -pub mod stocks; - -use crate::{database::bars::add_bar, pool::postgres::PostgresPool, types::Asset}; -use apca::{ - data::v2::stream::{drive, Data, MarketData, RealtimeData, Source, SymbolList, Symbols}, - Subscribable, -}; -use futures_util::{FutureExt, StreamExt}; -use log::{debug, error, info, warn}; -use std::{any::type_name, error::Error, sync::Arc, time::Duration}; -use time::OffsetDateTime; -use tokio::{ - spawn, - sync::{ - mpsc::{Receiver, Sender}, - Mutex, - }, - time::timeout, -}; - -pub enum AssetMPSCMessage { - Added(Asset), - Removed(Asset), -} - -pub struct AssetMPSC { - pub sender: Sender, - pub receiver: Receiver, -} - -pub type StockStreamSubscription = ( - as Subscribable>::Stream, - as Subscribable>::Subscription, -); - -pub const TIMEOUT_DURATION: Duration = Duration::from_millis(100); - -pub async fn run_data_live( - postgres_pool: PostgresPool, - stream_subscription_mutex: Arc>>, - asset_mpsc_receiver: Receiver, -) -> Result<(), Box> -where - S: Source + 'static, -{ - info!("Running live data thread for {}.", type_name::()); - - spawn(mpsc_handler::( - stream_subscription_mutex.clone(), - asset_mpsc_receiver, - )); - - loop { - let (stream, _) = &mut *stream_subscription_mutex.lock().await; - match timeout(TIMEOUT_DURATION, stream.next()).await { - Ok(Some(Ok(Ok(Data::Bar(bar))))) => { - let bar = add_bar( - &postgres_pool, - crate::types::Bar { - timestamp: match OffsetDateTime::from_unix_timestamp( - bar.timestamp.timestamp(), - ) { - Ok(timestamp) => timestamp, - Err(_) => { - warn!( - "Failed to parse timestamp for {}: {}.", - bar.symbol, bar.timestamp - ); - continue; - } - }, - asset_symbol: bar.symbol, - open: bar.open_price.to_f64().unwrap_or_default(), - high: bar.high_price.to_f64().unwrap_or_default(), - low: bar.low_price.to_f64().unwrap_or_default(), - close: bar.close_price.to_f64().unwrap_or_default(), - volume: bar.volume.to_f64().unwrap_or_default(), - }, - ) - .await?; - debug!( - "Saved timestamp for {}: {}.", - bar.asset_symbol, bar.timestamp - ); - } - Ok(Some(Ok(Ok(_)))) | Ok(Some(Ok(Err(_)))) | Err(_) => continue, - _ => panic!(), - } - } -} - -pub async fn mpsc_handler( - stream_subscription_mutex: Arc>>, - mut asset_mpsc_receiver: Receiver, -) -> Result<(), Box> { - while let Some(message) = asset_mpsc_receiver.recv().await { - let (stream, subscription) = &mut *stream_subscription_mutex.lock().await; - - match message { - AssetMPSCMessage::Added(asset) => { - let data = MarketData { - bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])), - ..Default::default() - }; - - match drive(subscription.subscribe(&data).boxed(), stream).await { - Ok(_) => info!("Successfully subscribed to {}", asset.symbol), - Err(e) => { - error!("Failed to subscribe to {}: {:?}", asset.symbol, e); - continue; - } - } - } - AssetMPSCMessage::Removed(asset) => { - let data = MarketData { - bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])), - ..Default::default() - }; - - match drive(subscription.unsubscribe(&data).boxed(), stream).await { - Ok(_) => info!("Successfully unsubscribed from {}", asset.symbol), - Err(e) => { - error!("Failed to unsubscribe from {}: {:?}", asset.symbol, e); - continue; - } - } - } - } - } - - Ok(()) -} diff --git a/backend/src/data/live/stocks.rs b/backend/src/data/live/stocks.rs deleted file mode 100644 index dae6891..0000000 --- a/backend/src/data/live/stocks.rs +++ /dev/null @@ -1,46 +0,0 @@ -use super::{AssetMPSC, StockStreamSubscription}; -use crate::{ - database::assets::get_assets_stocks, - pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool}, -}; -use apca::data::v2::stream::{ - drive, Bar, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX, -}; -use futures_util::FutureExt; -use std::{error::Error, sync::Arc}; -use tokio::sync::{mpsc, Mutex}; - -pub async fn init_stream_subscription_mpsc( - postgres_pool: &PostgresPool, -) -> Result<(Arc>>, AssetMPSC), Box> { - let client = create_alpaca_client_from_env().await?; - - let (mut stream, mut subscription) = client - .subscribe::>() - .await?; - - let symbols = get_assets_stocks(postgres_pool) - .await? - .iter() - .map(|asset| asset.symbol.clone()) - .collect::>(); - - if !symbols.is_empty() { - let data = MarketData { - bars: Symbols::List(SymbolList::from(symbols)), - ..Default::default() - }; - - drive(subscription.subscribe(&data).boxed(), &mut stream) - .await - .unwrap() - .unwrap() - .unwrap(); - } - - let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription))); - let (sender, receiver) = mpsc::channel(50); - let asset_mpcs = AssetMPSC { sender, receiver }; - - Ok((stream_subscription_mutex, asset_mpcs)) -} diff --git a/backend/src/database/assets.rs b/backend/src/database/assets.rs index 3da257a..ec51015 100644 --- a/backend/src/database/assets.rs +++ b/backend/src/database/assets.rs @@ -1,12 +1,9 @@ -use crate::{ - pool::postgres::PostgresPool, - types::{Asset, Class, Exchange}, -}; -use sqlx::query_as; +use crate::types::{Asset, Class, Exchange}; +use sqlx::{query_as, PgPool}; use std::error::Error; pub async fn get_assets( - postgres_pool: &PostgresPool, + postgres_pool: &PgPool, ) -> Result, Box> { query_as!( Asset, @@ -17,24 +14,13 @@ pub async fn get_assets( .map_err(|e| e.into()) } -pub async fn get_assets_stocks( - postgres_pool: &PostgresPool, +pub async fn get_assets_with_class( + postgres_pool: &PgPool, + class: Class, ) -> Result, Box> { query_as!( Asset, - r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'us_equity'"# - ) - .fetch_all(postgres_pool) - .await - .map_err(|e| e.into()) -} - -pub async fn get_assets_crypto( - postgres_pool: &PostgresPool, -) -> Result, Box> { - query_as!( - Asset, - r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'crypto'"# + r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, class as Class ) .fetch_all(postgres_pool) .await @@ -42,7 +28,7 @@ pub async fn get_assets_crypto( } pub async fn get_asset( - postgres_pool: &PostgresPool, + postgres_pool: &PgPool, symbol: &str, ) -> Result, Box> { query_as!( @@ -55,7 +41,7 @@ pub async fn get_asset( } pub async fn add_asset( - postgres_pool: &PostgresPool, + postgres_pool: &PgPool, asset: Asset, ) -> Result> { query_as!( @@ -70,7 +56,7 @@ pub async fn add_asset( } pub async fn update_asset_trading( - postgres_pool: &PostgresPool, + postgres_pool: &PgPool, symbol: &str, trading: bool, ) -> Result, Box> { @@ -86,7 +72,7 @@ pub async fn update_asset_trading( } pub async fn delete_asset( - postgres_pool: &PostgresPool, + postgres_pool: &PgPool, symbol: &str, ) -> Result, Box> { Ok(query_as!( diff --git a/backend/src/database/bars.rs b/backend/src/database/bars.rs index 483a6cd..19b88af 100644 --- a/backend/src/database/bars.rs +++ b/backend/src/database/bars.rs @@ -8,9 +8,10 @@ pub async fn add_bar( ) -> Result> { query_as!( Bar, - r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7) - RETURNING timestamp, asset_symbol, open, high, low, close, volume"#, - bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume + r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9 + RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#, + bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted ) .fetch_one(postgres_pool) .await @@ -29,6 +30,8 @@ pub async fn add_bars( let mut lows = Vec::with_capacity(bars.len()); let mut closes = Vec::with_capacity(bars.len()); let mut volumes = Vec::with_capacity(bars.len()); + let mut num_trades = Vec::with_capacity(bars.len()); + let mut volumes_weighted = Vec::with_capacity(bars.len()); for bar in bars { timestamps.push(bar.timestamp); @@ -38,14 +41,16 @@ pub async fn add_bars( lows.push(bar.low); closes.push(bar.close); volumes.push(bar.volume); + num_trades.push(bar.num_trades); + volumes_weighted.push(bar.volume_weighted); } query_as!( Bar, - r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) - SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[]) - RETURNING timestamp, asset_symbol, open, high, low, close, volume"#, - ×tamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes + r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) + SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[]) + RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#, + ×tamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes, &num_trades, &volumes_weighted ) .fetch_all(postgres_pool) .await diff --git a/backend/src/main.rs b/backend/src/main.rs index 637aae1..d09d7f1 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,61 +1,43 @@ +mod config; mod data; mod database; -mod pool; mod routes; mod types; -use apca::data::v2::stream::IEX; -use data::live::{ - crypto::{self, Crypto}, - run_data_live, stocks, -}; +use config::AppConfig; +use data::live::run_data_live; use dotenv::dotenv; -use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env}; use routes::run_api; -use std::{error::Error, sync::Arc}; -use tokio::spawn; - -const NUM_CLIENTS: usize = 10; +use std::error::Error; +use tokio::{spawn, sync::broadcast}; +use types::{AssetBroadcastMessage, Class}; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); + let app_config = AppConfig::arc_from_env().await.unwrap(); let mut threads = Vec::new(); - let postgres_pool = create_postgres_pool_from_env(NUM_CLIENTS).await?; - let alpaca_pool = create_alpaca_pool_from_env(NUM_CLIENTS).await?; + let (asset_broadcast_sender, _) = broadcast::channel::(100); // Stock Live Data - let (stock_live_stream_subscription_mutex, stock_live_mpsc) = - stocks::init_stream_subscription_mpsc(&postgres_pool).await?; - let stock_live_mpsc_sender_arc = Arc::new(stock_live_mpsc.sender); - - threads.push(spawn(run_data_live::( - postgres_pool.clone(), - stock_live_stream_subscription_mutex.clone(), - stock_live_mpsc.receiver, + threads.push(spawn(run_data_live( + Class::UsEquity, + app_config.clone(), + asset_broadcast_sender.subscribe(), ))); // Crypto Live Data - let (crypto_stream_subscription_mutex, crypto_live_mpsc) = - crypto::init_stream_subscription_mpsc(&postgres_pool).await?; - let crypto_live_mpsc_sender_arc = Arc::new(crypto_live_mpsc.sender); - - threads.push(spawn(run_data_live::( - postgres_pool.clone(), - crypto_stream_subscription_mutex.clone(), - crypto_live_mpsc.receiver, + threads.push(spawn(run_data_live( + Class::Crypto, + app_config.clone(), + asset_broadcast_sender.subscribe(), ))); // REST API - threads.push(spawn(run_api( - postgres_pool.clone(), - alpaca_pool.clone(), - stock_live_mpsc_sender_arc.clone(), - crypto_live_mpsc_sender_arc.clone(), - ))); + threads.push(spawn(run_api(app_config.clone(), asset_broadcast_sender))); for thread in threads { let _ = thread.await?; diff --git a/backend/src/pool/alpaca.rs b/backend/src/pool/alpaca.rs deleted file mode 100644 index a8d77a5..0000000 --- a/backend/src/pool/alpaca.rs +++ /dev/null @@ -1,98 +0,0 @@ -use apca::{ApiInfo, Client}; -use async_trait::async_trait; -use deadpool::managed::{BuildError, Manager, Pool, RecycleResult}; -use std::{env, error::Error}; - -pub struct AlpacaManager { - apca_api_base_url: String, - apca_api_key_id: String, - apca_api_secret_key: String, -} - -impl AlpacaManager { - pub fn new( - apca_api_base_url: String, - apca_api_key_id: String, - apca_api_secret_key: String, - ) -> Self { - Self { - apca_api_base_url, - apca_api_key_id, - apca_api_secret_key, - } - } -} - -pub type AlpacaPool = Pool; - -#[async_trait] -impl Manager for AlpacaManager { - type Type = Client; - type Error = Box; - - async fn create(&self) -> Result { - let client = Client::new(ApiInfo::from_parts( - &self.apca_api_base_url, - &self.apca_api_key_id, - &self.apca_api_secret_key, - )?); - Ok(client) - } - - async fn recycle(&self, _: &mut Self::Type) -> RecycleResult { - Ok(()) - } -} - -pub async fn create_alpaca_client( - apca_api_base_url: &str, - apca_api_key_id: &str, - apca_api_secret_key: &str, -) -> Result> { - Ok(Client::new(ApiInfo::from_parts( - apca_api_base_url, - apca_api_key_id, - apca_api_secret_key, - )?)) -} - -pub async fn create_alpaca_client_from_env() -> Result> { - create_alpaca_client( - &env::var("APCA_API_BASE_URL")?, - &env::var("APCA_API_KEY_ID")?, - &env::var("APCA_API_SECRET_KEY")?, - ) - .await -} - -pub async fn create_alpaca_pool( - apca_api_base_url: &str, - apca_api_key_id: &str, - apca_api_secret_key: &str, - num_clients: usize, -) -> Result> { - let manager = AlpacaManager::new( - apca_api_base_url.to_owned(), - apca_api_key_id.to_owned(), - apca_api_secret_key.to_owned(), - ); - Pool::builder(manager) - .max_size(num_clients) - .build() - .map_err(|e| match e { - BuildError::Backend(e) => e, - BuildError::NoRuntimeSpecified(_) => unreachable!(), - }) -} - -pub async fn create_alpaca_pool_from_env( - num_clients: usize, -) -> Result> { - create_alpaca_pool( - &env::var("APCA_API_BASE_URL")?, - &env::var("APCA_API_KEY_ID")?, - &env::var("APCA_API_SECRET_KEY")?, - num_clients, - ) - .await -} diff --git a/backend/src/pool/mod.rs b/backend/src/pool/mod.rs deleted file mode 100644 index d0cd51c..0000000 --- a/backend/src/pool/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod alpaca; -pub mod postgres; diff --git a/backend/src/pool/postgres.rs b/backend/src/pool/postgres.rs deleted file mode 100644 index e745aaa..0000000 --- a/backend/src/pool/postgres.rs +++ /dev/null @@ -1,21 +0,0 @@ -use sqlx::{postgres::PgPoolOptions, PgPool}; -use std::{env, error::Error}; - -pub type PostgresPool = PgPool; - -pub async fn create_postgres_pool( - database_url: &str, - num_clients: usize, -) -> Result> { - PgPoolOptions::new() - .max_connections(num_clients as u32) - .connect(database_url) - .await - .map_err(|e| e.into()) -} - -pub async fn create_postgres_pool_from_env( - num_clients: usize, -) -> Result> { - create_postgres_pool(&env::var("DATABASE_URL")?, num_clients).await -} diff --git a/backend/src/routes/assets.rs b/backend/src/routes/assets.rs index 4be5f0d..76d34f1 100644 --- a/backend/src/routes/assets.rs +++ b/backend/src/routes/assets.rs @@ -1,22 +1,22 @@ -use crate::data::live::AssetMPSCMessage; +use std::sync::Arc; + +use crate::config::AppConfig; use crate::database; use crate::database::assets::update_asset_trading; -use crate::pool::alpaca::AlpacaPool; -use crate::pool::postgres::PostgresPool; -use crate::types::{Asset, Class, Exchange}; -use apca::api::v2::asset::{self, Symbol}; -use apca::RequestError; +use crate::types::api; +use crate::types::{Asset, AssetBroadcastMessage, Status}; use axum::{extract::Path, http::StatusCode, Extension, Json}; +use http::Method; use log::info; use serde::Deserialize; -use sqlx::types::time::OffsetDateTime; -use std::sync::Arc; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::Sender; + +const ALPACA_API_URL: &str = "https://api.alpaca.markets/v2"; pub async fn get_assets( - Extension(postgres_pool): Extension, + Extension(app_config): Extension>, ) -> Result<(StatusCode, Json>), StatusCode> { - let assets = database::assets::get_assets(&postgres_pool) + let assets = database::assets::get_assets(&app_config.postgres_pool) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -24,10 +24,10 @@ pub async fn get_assets( } pub async fn get_asset( - Extension(postgres_pool): Extension, + Extension(app_config): Extension>, Path(symbol): Path, ) -> Result<(StatusCode, Json), StatusCode> { - let asset = database::assets::get_asset(&postgres_pool, &symbol) + let asset = database::assets::get_asset(&app_config.postgres_pool, &symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -37,7 +37,6 @@ pub async fn get_asset( } } -#[allow(dead_code)] #[derive(Deserialize)] pub struct AddAssetRequest { symbol: String, @@ -45,13 +44,11 @@ pub struct AddAssetRequest { } pub async fn add_asset( - Extension(postgres_pool): Extension, - Extension(alpaca_pool): Extension, - Extension(stock_live_mpsc_sender): Extension>>, - Extension(crypto_live_mpsc_sender): Extension>>, + Extension(app_config): Extension>, + Extension(asset_broadcast_sender): Extension>, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { - if database::assets::get_asset(&postgres_pool, &request.symbol) + if database::assets::get_asset(&app_config.postgres_pool, &request.symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .is_some() @@ -59,44 +56,40 @@ pub async fn add_asset( return Err(StatusCode::CONFLICT); } - let asset = alpaca_pool - .get() + let asset = app_config + .reqwest_client + .request( + Method::GET, + &format!("{}/assets/{}", ALPACA_API_URL, request.symbol), + ) + .header("APCA-API-KEY-ID", &app_config.alpaca_api_key) + .header("APCA-API-SECRET-KEY", &app_config.alpaca_api_secret) + .send() .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .issue::(&Symbol::Sym(request.symbol)) - .await - .map_err(|e| match e { - RequestError::Endpoint(_) => StatusCode::NOT_FOUND, + .map_err(|e| match e.status() { + Some(StatusCode::NOT_FOUND) => StatusCode::NOT_FOUND, + Some(StatusCode::FORBIDDEN) => panic!(), _ => StatusCode::INTERNAL_SERVER_ERROR, })?; - let asset = Asset { - symbol: asset.symbol, - class: Class::from(asset.class) as Class, - exchange: Exchange::from(asset.exchange) as Exchange, - trading: request.trading.unwrap_or(false), - date_added: OffsetDateTime::now_utc(), - }; + let asset = asset.json::().await.unwrap(); - let asset = database::assets::add_asset(&postgres_pool, asset) + if asset.status != Status::Active || !asset.tradable || !asset.fractionable { + return Err(StatusCode::FORBIDDEN); + } + + let mut asset = Asset::from(asset); + if let Some(trading) = request.trading { + asset.trading = trading; + } + + let asset = database::assets::add_asset(&app_config.postgres_pool, asset) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - match asset.class { - Class(asset::Class::UsEquity) => { - stock_live_mpsc_sender - .send(AssetMPSCMessage::Added(asset.clone())) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - } - Class(asset::Class::Crypto) => { - crypto_live_mpsc_sender - .send(AssetMPSCMessage::Added(asset.clone())) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - } - _ => {} - } + asset_broadcast_sender + .send(AssetBroadcastMessage::Added(asset.clone())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; info!("Added asset {}.", asset.symbol); Ok((StatusCode::CREATED, Json(asset))) @@ -109,16 +102,21 @@ pub struct UpdateAssetRequest { } pub async fn update_asset( - Extension(postgres_pool): Extension, + Extension(app_config): Extension>, + Extension(asset_broadcast_sender): Extension>, Path(symbol): Path, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { - let asset = update_asset_trading(&postgres_pool, &symbol, request.trading) + let asset = update_asset_trading(&app_config.postgres_pool, &symbol, request.trading) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; match asset { Some(asset) => { + asset_broadcast_sender + .send(AssetBroadcastMessage::Updated(asset.clone())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + info!("Updated asset {}.", symbol); Ok((StatusCode::OK, Json(asset))) } @@ -127,32 +125,19 @@ pub async fn update_asset( } pub async fn delete_asset( - Extension(postgres_pool): Extension, - Extension(stock_live_mpsc_sender): Extension>>, - Extension(crypto_live_mpsc_sender): Extension>>, + Extension(app_config): Extension>, + Extension(asset_broadcast_sender): Extension>, Path(symbol): Path, ) -> Result { - let asset = database::assets::delete_asset(&postgres_pool, &symbol) + let asset = database::assets::delete_asset(&app_config.postgres_pool, &symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; match asset { Some(asset) => { - match asset.class { - Class(asset::Class::UsEquity) => { - stock_live_mpsc_sender - .send(AssetMPSCMessage::Removed(asset.clone())) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - } - Class(asset::Class::Crypto) => { - crypto_live_mpsc_sender - .send(AssetMPSCMessage::Removed(asset.clone())) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - } - _ => {} - } + asset_broadcast_sender + .send(AssetBroadcastMessage::Deleted(asset.clone())) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; info!("Deleted asset {}.", symbol); Ok(StatusCode::NO_CONTENT) diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index a1e4c9c..cb90879 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -1,22 +1,17 @@ -use crate::{ - data::live::AssetMPSCMessage, - pool::{alpaca::AlpacaPool, postgres::PostgresPool}, -}; +use crate::{config::AppConfig, types::AssetBroadcastMessage}; use axum::{ routing::{delete, get, post}, Extension, Router, Server, }; use log::info; use std::{net::SocketAddr, sync::Arc}; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::Sender; pub mod assets; pub async fn run_api( - postgres_pool: PostgresPool, - alpaca_pool: AlpacaPool, - stock_live_mpsc_sender: Arc>, - crypto_live_mpsc_sender: Arc>, + app_config: Arc, + asset_broadcast_sender: Sender, ) -> Result<(), Box> { let app = Router::new() .route("/assets", get(assets::get_assets)) @@ -24,10 +19,8 @@ pub async fn run_api( .route("/assets", post(assets::add_asset)) .route("/assets/:symbol", post(assets::update_asset)) .route("/assets/:symbol", delete(assets::delete_asset)) - .layer(Extension(postgres_pool)) - .layer(Extension(alpaca_pool)) - .layer(Extension(stock_live_mpsc_sender)) - .layer(Extension(crypto_live_mpsc_sender)); + .layer(Extension(app_config)) + .layer(Extension(asset_broadcast_sender)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); info!("Listening on {}...", addr); diff --git a/backend/src/types.rs b/backend/src/types.rs deleted file mode 100644 index fe592d7..0000000 --- a/backend/src/types.rs +++ /dev/null @@ -1,88 +0,0 @@ -use serde::{Deserialize, Serialize}; -use sqlx::{error::BoxDynError, Decode, Encode, FromRow, Postgres, Type}; -use std::ops::Deref; -use time::OffsetDateTime; - -macro_rules! impl_apca_sqlx_traits { - ($outer_type:ident, $inner_type:path, $fallback:expr) => { - #[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)] - pub struct $outer_type(pub $inner_type); - - impl Deref for $outer_type { - type Target = $inner_type; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } - - impl From<$inner_type> for $outer_type { - fn from(inner: $inner_type) -> Self { - $outer_type(inner) - } - } - - impl From for $outer_type { - fn from(s: String) -> Self { - s.parse().unwrap_or($fallback).into() - } - } - - impl Decode<'_, Postgres> for $outer_type { - fn decode( - value: >::ValueRef, - ) -> Result { - Ok($outer_type::from(>::decode( - value, - )?)) - } - } - - impl Encode<'_, Postgres> for $outer_type { - fn encode_by_ref( - &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { - >::encode_by_ref(&self.0.as_ref().into(), buf) - } - } - - impl Type for $outer_type { - fn type_info() -> ::TypeInfo { - >::type_info() - } - } - }; -} - -impl_apca_sqlx_traits!( - Class, - apca::api::v2::asset::Class, - apca::api::v2::asset::Class::Unknown -); - -impl_apca_sqlx_traits!( - Exchange, - apca::api::v2::asset::Exchange, - apca::api::v2::asset::Exchange::Unknown -); - -#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] -pub struct Asset { - pub symbol: String, - pub class: Class, - pub exchange: Exchange, - pub trading: bool, - pub date_added: OffsetDateTime, -} - -#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] -pub struct Bar { - pub timestamp: OffsetDateTime, - pub asset_symbol: String, - pub open: f64, - pub high: f64, - pub low: f64, - pub close: f64, - pub volume: f64, -} diff --git a/backend/src/types/api/asset.rs b/backend/src/types/api/asset.rs new file mode 100644 index 0000000..6717ad2 --- /dev/null +++ b/backend/src/types/api/asset.rs @@ -0,0 +1,19 @@ +use crate::types::{Class, Exchange, Status}; +use serde::Deserialize; + +#[derive(Deserialize)] +pub struct Asset { + pub id: String, + pub class: Class, + pub exchange: Exchange, + pub symbol: String, + pub name: String, + pub status: Status, + pub tradable: bool, + pub marginable: bool, + pub shortable: bool, + pub easy_to_borrow: bool, + pub fractionable: bool, + pub maintenance_margin_requirement: Option, + pub attributes: Option>, +} diff --git a/backend/src/types/api/mod.rs b/backend/src/types/api/mod.rs new file mode 100644 index 0000000..00c8a86 --- /dev/null +++ b/backend/src/types/api/mod.rs @@ -0,0 +1,3 @@ +pub mod asset; + +pub use asset::*; diff --git a/backend/src/types/asset.rs b/backend/src/types/asset.rs new file mode 100644 index 0000000..ec753a4 --- /dev/null +++ b/backend/src/types/asset.rs @@ -0,0 +1,33 @@ +use super::{api, class::Class, exchange::Exchange}; +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] +pub struct Asset { + pub symbol: String, + pub class: Class, + pub exchange: Exchange, + pub trading: bool, + pub date_added: OffsetDateTime, +} + +impl From for Asset { + fn from(asset: api::Asset) -> Self { + Self { + symbol: asset.symbol, + class: asset.class, + exchange: asset.exchange, + trading: asset.tradable, + date_added: OffsetDateTime::now_utc(), + } + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum AssetBroadcastMessage { + Added(Asset), + Updated(Asset), + Deleted(Asset), + Reset(Asset), +} diff --git a/backend/src/types/bar.rs b/backend/src/types/bar.rs new file mode 100644 index 0000000..83548cb --- /dev/null +++ b/backend/src/types/bar.rs @@ -0,0 +1,33 @@ +use super::websocket::BarMessage; +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use time::OffsetDateTime; + +#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] +pub struct Bar { + pub timestamp: OffsetDateTime, + pub asset_symbol: String, + pub open: f64, + pub high: f64, + pub low: f64, + pub close: f64, + pub volume: f64, + pub num_trades: i64, + pub volume_weighted: f64, +} + +impl From for Bar { + fn from(bar_message: BarMessage) -> Self { + Self { + timestamp: bar_message.timestamp, + asset_symbol: bar_message.symbol, + open: bar_message.open, + high: bar_message.high, + low: bar_message.low, + close: bar_message.close, + volume: bar_message.volume, + num_trades: bar_message.num_trades, + volume_weighted: bar_message.volume_weighted, + } + } +} diff --git a/backend/src/types/class.rs b/backend/src/types/class.rs new file mode 100644 index 0000000..ecd398b --- /dev/null +++ b/backend/src/types/class.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; +use sqlx::Type; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)] +pub enum Class { + #[sqlx(rename = "us_equity")] + #[serde(rename = "us_equity")] + UsEquity, + #[sqlx(rename = "crypto")] + #[serde(rename = "crypto")] + Crypto, +} diff --git a/backend/src/types/exchange.rs b/backend/src/types/exchange.rs new file mode 100644 index 0000000..aaba7b2 --- /dev/null +++ b/backend/src/types/exchange.rs @@ -0,0 +1,30 @@ +use serde::{Deserialize, Serialize}; +use sqlx::Type; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)] +pub enum Exchange { + #[sqlx(rename = "AMEX")] + #[serde(rename = "AMEX")] + Amex, + #[sqlx(rename = "ARCA")] + #[serde(rename = "ARCA")] + Arca, + #[sqlx(rename = "BATS")] + #[serde(rename = "BATS")] + Bats, + #[sqlx(rename = "NYSE")] + #[serde(rename = "NYSE")] + Nyse, + #[sqlx(rename = "NASDAQ")] + #[serde(rename = "NASDAQ")] + Nasdaq, + #[sqlx(rename = "NYSEARCA")] + #[serde(rename = "NYSEARCA")] + Nysearca, + #[sqlx(rename = "OTC")] + #[serde(rename = "OTC")] + Otc, + #[sqlx(rename = "CRYPTO")] + #[serde(rename = "CRYPTO")] + Crypto, +} diff --git a/backend/src/types/mod.rs b/backend/src/types/mod.rs new file mode 100644 index 0000000..d55847a --- /dev/null +++ b/backend/src/types/mod.rs @@ -0,0 +1,13 @@ +pub mod api; +pub mod asset; +pub mod bar; +pub mod class; +pub mod exchange; +pub mod status; +pub mod websocket; + +pub use asset::*; +pub use bar::*; +pub use class::*; +pub use exchange::*; +pub use status::*; diff --git a/backend/src/types/status.rs b/backend/src/types/status.rs new file mode 100644 index 0000000..e6f651e --- /dev/null +++ b/backend/src/types/status.rs @@ -0,0 +1,12 @@ +use serde::{Deserialize, Serialize}; +use sqlx::Type; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)] +pub enum Status { + #[sqlx(rename = "active")] + #[serde(rename = "active")] + Active, + #[sqlx(rename = "inactive")] + #[serde(rename = "inactive")] + Inactive, +} diff --git a/backend/src/types/websocket/incoming.rs b/backend/src/types/websocket/incoming.rs new file mode 100644 index 0000000..6f75a2e --- /dev/null +++ b/backend/src/types/websocket/incoming.rs @@ -0,0 +1,63 @@ +use serde::Deserialize; +use time::OffsetDateTime; + +#[derive(Debug, Deserialize, PartialEq)] +#[serde(tag = "T")] +pub enum IncomingMessage { + #[serde(rename = "success")] + Success(SuccessMessage), + #[serde(rename = "subscription")] + Subscription(SubscriptionMessage), + #[serde(rename = "b")] + Bars(BarMessage), + #[serde(rename = "u")] + UpdatedBars(BarMessage), +} + +#[derive(Debug, PartialEq, Deserialize)] +pub enum SuccessMessageType { + #[serde(rename = "connected")] + Connected, + #[serde(rename = "authenticated")] + Authenticated, +} + +#[derive(Debug, PartialEq, Deserialize)] +pub struct SuccessMessage { + pub msg: SuccessMessageType, +} + +#[derive(Debug, PartialEq, Deserialize)] +pub struct SubscriptionMessage { + pub trades: Vec, + pub quotes: Vec, + pub orderbooks: Vec, + pub bars: Vec, + #[serde(rename = "updatedBars")] + pub updated_bars: Vec, + #[serde(rename = "dailyBars")] + pub daily_bars: Vec, +} + +#[derive(Debug, PartialEq, Deserialize)] +pub struct BarMessage { + #[serde(rename = "t")] + #[serde(with = "time::serde::rfc3339")] + pub timestamp: OffsetDateTime, + #[serde(rename = "S")] + pub symbol: String, + #[serde(rename = "o")] + pub open: f64, + #[serde(rename = "h")] + pub high: f64, + #[serde(rename = "l")] + pub low: f64, + #[serde(rename = "c")] + pub close: f64, + #[serde(rename = "v")] + pub volume: f64, + #[serde(rename = "n")] + pub num_trades: i64, + #[serde(rename = "vw")] + pub volume_weighted: f64, +} diff --git a/backend/src/types/websocket/mod.rs b/backend/src/types/websocket/mod.rs new file mode 100644 index 0000000..58f5a05 --- /dev/null +++ b/backend/src/types/websocket/mod.rs @@ -0,0 +1,5 @@ +pub mod incoming; +pub mod outgoing; + +pub use incoming::*; +pub use outgoing::*; diff --git a/backend/src/types/websocket/outgoing.rs b/backend/src/types/websocket/outgoing.rs new file mode 100644 index 0000000..e6bfa2c --- /dev/null +++ b/backend/src/types/websocket/outgoing.rs @@ -0,0 +1,47 @@ +use serde::Serialize; + +#[derive(Debug, Serialize)] +#[serde(tag = "action")] +pub enum OutgoingMessage { + #[serde(rename = "auth")] + Auth(AuthMessage), + #[serde(rename = "subscribe")] + Subscribe(SubscribeMessage), + #[serde(rename = "unsubscribe")] + Unsubscribe(SubscribeMessage), +} + +#[derive(Debug, Serialize)] +pub struct AuthMessage { + key: String, + secret: String, +} + +impl AuthMessage { + pub fn new(key: String, secret: String) -> Self { + Self { key, secret } + } +} + +#[derive(Debug, Serialize)] +pub struct SubscribeMessage { + bars: Vec, + #[serde(rename = "updatedBars")] + updated_bars: Vec, +} + +impl SubscribeMessage { + pub fn new(symbol: String) -> Self { + Self { + bars: vec![symbol.clone()], + updated_bars: vec![symbol], + } + } + + pub fn from_vec(symbols: Vec) -> Self { + Self { + bars: symbols.clone(), + updated_bars: symbols, + } + } +} diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh index e6249f3..ec8c5de 100644 --- a/support/timescaledb/999_init.sh +++ b/support/timescaledb/999_init.sh @@ -1,7 +1,7 @@ #!/bin/bash psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL - CREATE TYPE CLASS AS ENUM ('us_equity', 'crypto', 'unknown'); + CREATE TYPE CLASS AS ENUM ('us_equity', 'crypto'); CREATE TYPE EXCHANGE AS ENUM ( 'AMEX', @@ -11,7 +11,7 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL 'NYSE', 'NYSEARCA', 'OTC', - 'unknown' + 'CRYPTO' ); CREATE TABLE assets ( @@ -30,6 +30,8 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL low DOUBLE PRECISION NOT NULL, close DOUBLE PRECISION NOT NULL, volume DOUBLE PRECISION NOT NULL, + num_trades BIGINT NOT NULL, + volume_weighted DOUBLE PRECISION NOT NULL, PRIMARY KEY (asset_symbol, timestamp) );