From 9d3d51f23c0690273c89107cd5dbe80db6c5790b Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Mon, 28 Aug 2023 13:45:51 +0300 Subject: [PATCH] Add RabbitMQ messaging Signed-off-by: Nikolaos Karaolidis --- ...1dc6c522d6e2b28feccd9fc49bcf10299033e.json | 15 - ...88c8355e8cc7718239dfec6e4a52d99e4e7bf.json | 80 ++ ...26d3867c8f56564b9c49793c688c707a772c8.json | 79 ++ ...c7420b716a4529da589c6698874f8836f89aa.json | 111 +++ ...71831bffd7fec02c622d60308471be02b98c7.json | 2 +- ...9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794.json | 45 -- ...503b677327b5fd77acb19edd3440e26402fb7.json | 14 - ...c28a79ed91d04216a847dd5d358df3e6e24ee.json | 2 +- backend/Cargo.lock | 734 +++++++++++++++++- backend/Dockerfile | 1 + backend/assets/Cargo.toml | 3 + backend/assets/src/main.rs | 45 +- backend/assets/src/routes.rs | 86 +- backend/common/Cargo.toml | 18 +- backend/common/src/alpaca.rs | 23 - backend/common/src/database.rs | 12 +- backend/common/src/lib.rs | 2 +- backend/common/src/pool.rs | 79 ++ backend/log4rs.yaml | 8 + docker-compose.yml | 6 - support/timescaledb/999_init.sh | 16 +- support/timescaledb/docker-compose.yml | 2 +- 22 files changed, 1232 insertions(+), 151 deletions(-) delete mode 100644 backend/.sqlx/query-2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e.json create mode 100644 backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json create mode 100644 backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json create mode 100644 backend/.sqlx/query-60473446809d8d5a8d13ad0fe94c7420b716a4529da589c6698874f8836f89aa.json delete mode 100644 backend/.sqlx/query-82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794.json delete mode 100644 backend/.sqlx/query-919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7.json delete mode 100644 backend/common/src/alpaca.rs create mode 100644 backend/common/src/pool.rs create mode 100644 backend/log4rs.yaml diff --git a/backend/.sqlx/query-2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e.json b/backend/.sqlx/query-2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e.json deleted file mode 100644 index 619a0db..0000000 --- a/backend/.sqlx/query-2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE assets SET trading = $1 WHERE symbol = $2", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bool", - "Text" - ] - }, - "nullable": [] - }, - "hash": "2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e" -} diff --git a/backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json b/backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json new file mode 100644 index 0000000..fbb4770 --- /dev/null +++ b/backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json @@ -0,0 +1,80 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "symbol", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "class: Class", + "type_info": { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto", + "unknown" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "exchange: Exchange", + "type_info": { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "unknown" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "trading", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "date_added", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Bool", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf" +} diff --git a/backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json b/backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json new file mode 100644 index 0000000..a398026 --- /dev/null +++ b/backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json @@ -0,0 +1,79 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "symbol", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "class: Class", + "type_info": { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto", + "unknown" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "exchange: Exchange", + "type_info": { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "unknown" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "trading", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "date_added", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8" +} diff --git a/backend/.sqlx/query-60473446809d8d5a8d13ad0fe94c7420b716a4529da589c6698874f8836f89aa.json b/backend/.sqlx/query-60473446809d8d5a8d13ad0fe94c7420b716a4529da589c6698874f8836f89aa.json new file mode 100644 index 0000000..3638a03 --- /dev/null +++ b/backend/.sqlx/query-60473446809d8d5a8d13ad0fe94c7420b716a4529da589c6698874f8836f89aa.json @@ -0,0 +1,111 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5, $6) RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "symbol", + "type_info": "Varchar" + }, + { + "ordinal": 2, + "name": "class: Class", + "type_info": { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto", + "unknown" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "exchange: Exchange", + "type_info": { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "unknown" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "trading", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "date_added", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Varchar", + { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto", + "unknown" + ] + } + } + }, + { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "unknown" + ] + } + } + }, + "Bool", + "Timestamptz" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "60473446809d8d5a8d13ad0fe94c7420b716a4529da589c6698874f8836f89aa" +} diff --git a/backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json b/backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json index 85b036f..0856918 100644 --- a/backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json +++ b/backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json @@ -58,7 +58,7 @@ { "ordinal": 5, "name": "date_added", - "type_info": "Timestamp" + "type_info": "Timestamptz" } ], "parameters": { diff --git a/backend/.sqlx/query-82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794.json b/backend/.sqlx/query-82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794.json deleted file mode 100644 index ff34e35..0000000 --- a/backend/.sqlx/query-82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794.json +++ /dev/null @@ -1,45 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO assets (id, symbol, class, exchange, trading) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Varchar", - { - "Custom": { - "name": "class", - "kind": { - "Enum": [ - "us_equity", - "crypto", - "unknown" - ] - } - } - }, - { - "Custom": { - "name": "exchange", - "kind": { - "Enum": [ - "AMEX", - "ARCA", - "BATS", - "NASDAQ", - "NYSE", - "NYSEARCA", - "OTC", - "unknown" - ] - } - } - }, - "Bool" - ] - }, - "nullable": [] - }, - "hash": "82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794" -} diff --git a/backend/.sqlx/query-919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7.json b/backend/.sqlx/query-919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7.json deleted file mode 100644 index 3da7766..0000000 --- a/backend/.sqlx/query-919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM assets WHERE symbol = $1", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [] - }, - "hash": "919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7" -} diff --git a/backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json b/backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json index 09027d5..7b20e48 100644 --- a/backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json +++ b/backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json @@ -58,7 +58,7 @@ { "ordinal": 5, "name": "date_added", - "type_info": "Timestamp" + "type_info": "Timestamptz" } ], "parameters": { diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 2f7082e..290dea6 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -35,6 +35,54 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" +[[package]] +name = "amq-protocol" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d40d8b2465c7959dd40cee32ba6ac334b5de57e9fca0cc756759894a4152a5d" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cb2100adae7da61953a2c3a01935d86caae13329fadce3333f524d6d6ce12e2" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "156ff13c8a3ced600b4e54ed826a2ae6242b6069d00dd98466827cef07d3daff" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "751bbd7d440576066233e740576f1b31fdc6ab86cfabfbd48c548de77eca73e4" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -50,6 +98,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" + [[package]] name = "apca" version = "0.27.2" @@ -79,6 +133,12 @@ dependencies = [ "websocket-util", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "assets" version = "0.1.0" @@ -88,12 +148,26 @@ dependencies = [ "common", "deadpool", "dotenv", + "lapin", "log", + "log4rs", "serde", + "serde_json", "sqlx", "tokio", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.1" @@ -107,6 +181,93 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 1.9.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-global-executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.23", + "slab", + "socket2 0.4.9", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io", + "async-trait", + "futures-core", + "reactor-trait", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + [[package]] name = "async-trait" version = "0.1.73" @@ -127,6 +288,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "autocfg" version = "1.1.0" @@ -239,6 +406,30 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand 1.9.0", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -257,6 +448,15 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.83" @@ -288,23 +488,50 @@ dependencies = [ "winapi", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "common" version = "0.1.0" dependencies = [ "apca", "deadpool", + "deadpool-lapin", + "lapin", "serde", "sqlx", "time 0.3.27", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "cookie-factory" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" + [[package]] name = "core-foundation" version = "0.9.3" @@ -396,6 +623,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "deadpool-lapin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20ed9da47e51ead8acbf364b4c5c311fb4d7dd3efa9dab91ee41cac05b59f313" +dependencies = [ + "deadpool", + "lapin", + "tokio-executor-trait", +] + [[package]] name = "deadpool-runtime" version = "0.1.2" @@ -425,6 +663,32 @@ dependencies = [ "serde", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + +[[package]] +name = "destructure_traitobject" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c877555693c14d2f84191cfd3ad8582790fc52b5e2274b40b59cf5f5cea25c7" + [[package]] name = "digest" version = "0.10.7" @@ -437,6 +701,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dotenv" version = "0.15.0" @@ -502,6 +772,24 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "executor-trait" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08" +dependencies = [ + "async-trait", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -618,6 +906,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -686,6 +989,12 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.0" @@ -702,7 +1011,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "312f66718a2d7789ffef4f4b7b213138ed9f1eb3aa1d0d82fc99f88fb3ffd26f" dependencies = [ - "hashbrown", + "hashbrown 0.14.0", ] [[package]] @@ -796,6 +1105,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -865,6 +1180,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.0.0" @@ -872,7 +1197,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.0", +] + +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", ] [[package]] @@ -899,6 +1254,28 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lapin" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f3067a1fcfbc3fc46455809c023e69b8f6602463201010f4ae5a3b572adb9dc" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -931,6 +1308,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.5" @@ -952,6 +1341,41 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "serde", +] + +[[package]] +name = "log-mdc" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a94d21414c1f4a51209ad204c1776a3d0765002c76c6abcb602a6f09f1e881c7" + +[[package]] +name = "log4rs" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d36ca1786d9e79b8193a68d480a0907b612f109537115c6ff655a3a1967533fd" +dependencies = [ + "anyhow", + "arc-swap", + "chrono", + "derivative", + "fnv", + "humantime", + "libc", + "log", + "log-mdc", + "parking_lot", + "serde", + "serde-value", + "serde_json", + "serde_yaml", + "thiserror", + "thread-id", + "typemap-ors", + "winapi", +] [[package]] name = "matchit" @@ -1186,6 +1610,38 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + +[[package]] +name = "p12" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224" +dependencies = [ + "cbc", + "cipher", + "des", + "getrandom", + "hmac", + "lazy_static", + "rc2", + "sha1", + "yasna", +] + +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1204,7 +1660,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "smallvec", "windows-targets", ] @@ -1262,6 +1718,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac" +dependencies = [ + "doc-comment", + "flume", + "parking_lot", + "tracing", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -1289,6 +1757,22 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1343,6 +1827,35 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1358,6 +1871,21 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rsa" version = "0.9.2" @@ -1386,6 +1914,20 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.37.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + [[package]] name = "rustix" version = "0.38.9" @@ -1395,10 +1937,65 @@ dependencies = [ "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.5", "windows-sys", ] +[[package]] +name = "rustls" +version = "0.21.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-connector" +version = "0.18.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "060bcc1795b840d0e56d78f3293be5f652aa1611d249b0e63ffe19f4a8c9ae23" +dependencies = [ + "log", + "rustls", + "rustls-native-certs", + "rustls-webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64 0.21.3", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d93931baf2d282fff8d3a532bbfd7653f734643161b87e3e01e59a04439bf0d" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -1426,6 +2023,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -1458,6 +2065,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.188" @@ -1511,6 +2128,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578a7433b776b56a35785ed5ce9a7e777ac0598aac5a6dd1b4b18a307c7fc71b" +dependencies = [ + "indexmap 1.9.3", + "ryu", + "serde", + "yaml-rust", +] + [[package]] name = "sha1" version = "0.10.5" @@ -1649,7 +2278,7 @@ dependencies = [ "futures-util", "hashlink", "hex", - "indexmap", + "indexmap 2.0.0", "log", "memchr", "once_cell", @@ -1861,6 +2490,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tcp-stream" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4da30af7998f51ee1aa48ab24276fe303a697b004e31ff542b192c088d5630a5" +dependencies = [ + "cfg-if", + "p12", + "rustls-connector", + "rustls-pemfile", +] + [[package]] name = "tempfile" version = "3.8.0" @@ -1868,9 +2509,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand", - "redox_syscall", - "rustix", + "fastrand 2.0.0", + "redox_syscall 0.3.5", + "rustix 0.38.9", "windows-sys", ] @@ -1894,6 +2535,17 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "thread-id" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79474f573561cdc4871a0de34a51c92f7f5a56039113fbb5b9c9f96bdb756669" +dependencies = [ + "libc", + "redox_syscall 0.2.16", + "winapi", +] + [[package]] name = "time" version = "0.1.45" @@ -1965,6 +2617,17 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tokio-executor-trait" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "802ccf58e108fe16561f35348fabe15ff38218968f033d587e399a84937533cc" +dependencies = [ + "async-trait", + "executor-trait", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -2108,6 +2771,15 @@ dependencies = [ "utf-8", ] +[[package]] +name = "typemap-ors" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a68c24b707f02dd18f1e4ccceb9d49f2058c2fb86384ef9972592904d7a28867" +dependencies = [ + "unsafe-any-ors", +] + [[package]] name = "typenum" version = "1.16.0" @@ -2147,6 +2819,21 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unsafe-any-ors" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a303d30665362d9680d7d91d78b23f5f899504d4f08b3c4cf08d055d87c0ad" +dependencies = [ + "destructure_traitobject", +] + +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.4.0" @@ -2185,6 +2872,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.1" @@ -2260,6 +2953,16 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "web-sys" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "websocket-util" version = "0.11.2" @@ -2375,6 +3078,21 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" + [[package]] name = "zeroize" version = "1.6.0" diff --git a/backend/Dockerfile b/backend/Dockerfile index de1fb2f..a0f1760 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -7,5 +7,6 @@ RUN cargo build --release FROM frolvlad/alpine-glibc AS assets WORKDIR /usr/src/assets COPY --from=builder /usr/src/qrust/target/release/assets . +COPY log4rs.yaml . EXPOSE 7878 CMD ["./assets"] diff --git a/backend/assets/Cargo.toml b/backend/assets/Cargo.toml index 2fbf742..5d2f43b 100644 --- a/backend/assets/Cargo.toml +++ b/backend/assets/Cargo.toml @@ -25,3 +25,6 @@ deadpool = { version = "0.9.5", features = [ ] } serde = "1.0.188" log = "0.4.20" +lapin = "2.3.1" +serde_json = "1.0.105" +log4rs = "1.2.0" diff --git a/backend/assets/src/main.rs b/backend/assets/src/main.rs index db74972..9f6ae9c 100644 --- a/backend/assets/src/main.rs +++ b/backend/assets/src/main.rs @@ -2,26 +2,46 @@ use axum::{ routing::{delete, get, post}, Extension, Router, Server, }; -use common::alpaca::create_alpaca_pool; +use common::pool::{ + create_alpaca_pool_from_env, create_database_pool_from_env, create_rabbitmq_pool_from_env, +}; +use deadpool::managed::{Hook, HookError, HookErrorCause}; use dotenv::dotenv; +use lapin::ExchangeKind; use log::info; -use sqlx::PgPool; -use std::{env, error::Error, net::SocketAddr}; +use std::{error::Error, net::SocketAddr}; mod routes; #[tokio::main] async fn main() -> Result<(), Box> { dotenv().ok(); + log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); - let database_pool = PgPool::connect(&env::var("DATABASE_URL").unwrap()).await?; - - let alpaca_pool = create_alpaca_pool( - &env::var("APCA_API_BASE_URL").unwrap(), - &env::var("APCA_API_KEY_ID").unwrap(), - &env::var("APCA_API_SECRET_KEY").unwrap(), - 10, - )?; + let num_clients = 10; + let database_pool = create_database_pool_from_env(num_clients).await?; + let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?; + let rabbitmq_pool = create_rabbitmq_pool_from_env( + num_clients, + Hook::async_fn(|connection: &mut lapin::Connection, _| { + Box::pin(async move { + connection + .create_channel() + .await + .map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))? + .exchange_declare( + "assets", + ExchangeKind::Topic, + Default::default(), + Default::default(), + ) + .await + .map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))?; + Ok(()) + }) + }), + ) + .await?; let app = Router::new() .route("/assets", get(routes::get_assets)) @@ -30,7 +50,8 @@ async fn main() -> Result<(), Box> { .route("/assets/:symbol", post(routes::update_asset)) .route("/assets/:symbol", delete(routes::delete_asset)) .layer(Extension(database_pool)) - .layer(Extension(alpaca_pool)); + .layer(Extension(alpaca_pool)) + .layer(Extension(rabbitmq_pool)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); info!("Listening on {}...", addr); diff --git a/backend/assets/src/routes.rs b/backend/assets/src/routes.rs index a059800..d0b4b97 100644 --- a/backend/assets/src/routes.rs +++ b/backend/assets/src/routes.rs @@ -1,11 +1,15 @@ use apca::api::v2::asset::{self, Symbol}; use axum::{extract::Path, http::StatusCode, Extension, Json}; use common::{ - alpaca::AlpacaPool, database::{Asset, Class, Exchange}, + pool::{AlpacaPool, RabbitmqPool}, }; use serde::Deserialize; -use sqlx::{query, query_as, types::Uuid, PgPool}; +use sqlx::{ + query_as, + types::{time::OffsetDateTime, Uuid}, + PgPool, +}; pub async fn get_assets( Extension(database_pool): Extension, @@ -40,6 +44,7 @@ pub struct AddAssetRequest { pub async fn add_asset( Extension(database_pool): Extension, Extension(alpaca_pool): Extension, + Extension(rabbitmq_pool): Extension, Json(request): Json, ) -> Result { if query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, request.symbol) @@ -58,17 +63,36 @@ pub async fn add_asset( .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - query!( - r#"INSERT INTO assets (id, symbol, class, exchange, trading) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5)"#, - Uuid::parse_str(&asset.id.to_string()).unwrap(), + let asset = query_as!( + Asset, + r#"INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5, $6) RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, + Uuid::parse_str(&asset.id.to_string()).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, asset.symbol, Class::from(asset.class) as Class, Exchange::from(asset.exchange) as Exchange, - request.trading + request.trading.unwrap_or(false), + OffsetDateTime::now_utc(), ) - .execute(&database_pool) + .fetch_one(&database_pool) .await - .unwrap(); + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + rabbitmq_pool + .get() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .create_channel() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .basic_publish( + "assets", + "assets.added", + Default::default(), + &serde_json::to_vec(&asset).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, + Default::default(), + ) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(StatusCode::CREATED) } @@ -81,28 +105,64 @@ pub struct UpdateAssetRequest { pub async fn update_asset( Extension(database_pool): Extension, + Extension(rabbitmq_pool): Extension, Path(symbol): Path, Json(request): Json, ) -> Result { - query_as!( + let asset = query_as!( Asset, - r#"UPDATE assets SET trading = $1 WHERE symbol = $2"#, + r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, request.trading, symbol ) - .execute(&database_pool) + .fetch_one(&database_pool) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + rabbitmq_pool + .get() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .create_channel() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .basic_publish( + "assets", + "assets.updated", + Default::default(), + &serde_json::to_vec(&asset).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, + Default::default(), + ) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(StatusCode::OK) } pub async fn delete_asset( Extension(database_pool): Extension, + Extension(rabbitmq_pool): Extension, Path(symbol): Path, ) -> Result { - query!(r#"DELETE FROM assets WHERE symbol = $1"#, symbol) - .execute(&database_pool) + let asset = query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol) + .fetch_one(&database_pool) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + rabbitmq_pool + .get() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .create_channel() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .basic_publish( + "assets", + "assets.deleted", + Default::default(), + &serde_json::to_vec(&asset).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, + Default::default(), + ) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml index 597c097..f52223e 100644 --- a/backend/common/Cargo.toml +++ b/backend/common/Cargo.toml @@ -1,11 +1,21 @@ [package] -name = "common" +name = "common" version = "0.1.0" edition = "2021" [dependencies] +lapin = "2.3.1" apca = "0.27.2" deadpool = "0.9.5" -serde = { version = "1.0.188", features = ["derive"] } -sqlx = { version = "0.7.1", features = ["uuid", "time", "postgres"] } -time = { version = "0.3.27", features = ["serde"] } +deadpool-lapin = "0.10.0" +serde = { version = "1.0.188", features = [ + "derive", +] } +sqlx = { version = "0.7.1", features = [ + "uuid", + "time", + "postgres", +] } +time = { version = "0.3.27", features = [ + "serde", +] } diff --git a/backend/common/src/alpaca.rs b/backend/common/src/alpaca.rs deleted file mode 100644 index 76b65e1..0000000 --- a/backend/common/src/alpaca.rs +++ /dev/null @@ -1,23 +0,0 @@ -use apca::{ApiInfo, Client}; -use deadpool::unmanaged::Pool; -use std::error::Error; - -pub type AlpacaPool = Pool; - -pub fn create_alpaca_pool( - apca_api_base_url: &str, - apca_api_key_id: &str, - apca_api_secret_key: &str, - num_clients: usize, -) -> Result, Box> { - let mut alpaca_clients = Vec::new(); - for _ in 0..num_clients { - let client = Client::new(ApiInfo::from_parts( - apca_api_base_url, - apca_api_key_id, - apca_api_secret_key, - )?); - alpaca_clients.push(client); - } - Ok(Pool::from(alpaca_clients)) -} diff --git a/backend/common/src/database.rs b/backend/common/src/database.rs index 7abbe3c..f733f16 100644 --- a/backend/common/src/database.rs +++ b/backend/common/src/database.rs @@ -1,11 +1,11 @@ use serde::{Deserialize, Serialize}; use sqlx::{error::BoxDynError, types::Uuid, Decode, Encode, FromRow, Postgres, Type}; use std::ops::Deref; -use time::PrimitiveDateTime; +use time::OffsetDateTime; macro_rules! impl_apca_sqlx_traits { ($outer_type:ident, $inner_type:path, $fallback:expr) => { - #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] + #[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)] pub struct $outer_type($inner_type); impl Deref for $outer_type { @@ -43,7 +43,7 @@ macro_rules! impl_apca_sqlx_traits { &self, buf: &mut >::ArgumentBuffer, ) -> sqlx::encode::IsNull { - >::encode_by_ref(&self.0.as_ref().to_owned(), buf) + >::encode_by_ref(&self.0.as_ref().into(), buf) } } @@ -74,13 +74,13 @@ pub struct Asset { pub class: Class, pub exchange: Exchange, pub trading: bool, - pub date_added: PrimitiveDateTime, + pub date_added: OffsetDateTime, } #[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] pub struct Bar { - pub timestamp: PrimitiveDateTime, - pub symbol_id: Uuid, + pub timestamp: OffsetDateTime, + pub asset_id: Uuid, pub open: f64, pub high: f64, pub low: f64, diff --git a/backend/common/src/lib.rs b/backend/common/src/lib.rs index 773e791..6dc7443 100644 --- a/backend/common/src/lib.rs +++ b/backend/common/src/lib.rs @@ -1,2 +1,2 @@ -pub mod alpaca; +pub mod pool; pub mod database; diff --git a/backend/common/src/pool.rs b/backend/common/src/pool.rs new file mode 100644 index 0000000..2376428 --- /dev/null +++ b/backend/common/src/pool.rs @@ -0,0 +1,79 @@ +use apca::{ApiInfo, Client}; +use deadpool::{unmanaged::Pool, Runtime}; +use deadpool_lapin::Hook; +use sqlx::postgres::PgPoolOptions; +use std::{env, error::Error}; + +pub type AlpacaPool = Pool; + +pub async fn create_alpaca_pool( + apca_api_base_url: &str, + apca_api_key_id: &str, + apca_api_secret_key: &str, + num_clients: usize, +) -> Result, Box> { + let mut alpaca_clients = Vec::new(); + for _ in 0..num_clients { + let client = Client::new(ApiInfo::from_parts( + apca_api_base_url, + apca_api_key_id, + apca_api_secret_key, + )?); + alpaca_clients.push(client); + } + Ok(Pool::from(alpaca_clients)) +} + +pub async fn create_alpaca_pool_from_env( + num_clients: usize, +) -> Result, Box> { + create_alpaca_pool( + &env::var("APCA_API_BASE_URL")?, + &env::var("APCA_API_KEY_ID")?, + &env::var("APCA_API_SECRET_KEY")?, + num_clients, + ) + .await +} + +pub async fn create_database_pool( + database_url: &str, + num_clients: usize, +) -> Result> { + PgPoolOptions::new() + .max_connections(num_clients as u32) + .connect(database_url) + .await + .map_err(|e| e.into()) +} + +pub async fn create_database_pool_from_env( + num_clients: usize, +) -> Result> { + create_database_pool(&env::var("DATABASE_URL")?, num_clients).await +} + +pub type RabbitmqPool = deadpool_lapin::Pool; + +pub async fn create_rabbitmq_pool( + rabbitmq_url: &str, + num_clients: usize, + post_create: impl Into, +) -> Result> { + deadpool_lapin::Config { + url: Some(rabbitmq_url.into()), + ..Default::default() + } + .builder(Some(Runtime::Tokio1)) + .max_size(num_clients) + .post_create(post_create) + .build() + .map_err(|e| e.into()) +} + +pub async fn create_rabbitmq_pool_from_env( + num_clients: usize, + post_create: impl Into, +) -> Result> { + create_rabbitmq_pool(&env::var("RABBITMQ_URL")?, num_clients, post_create).await +} diff --git a/backend/log4rs.yaml b/backend/log4rs.yaml new file mode 100644 index 0000000..6d1312e --- /dev/null +++ b/backend/log4rs.yaml @@ -0,0 +1,8 @@ +appenders: + stdout: + kind: console + +root: + level: info + appenders: + - stdout diff --git a/docker-compose.yml b/docker-compose.yml index c3566b8..8376745 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,22 +3,16 @@ services: extends: file: support/timescaledb/docker-compose.yml service: timescaledb - profiles: - - support rabbitmq: extends: file: support/rabbitmq/docker-compose.yml service: rabbitmq - profiles: - - support nginx: extends: file: support/nginx/docker-compose.yml service: nginx - profiles: - - support assets: extends: diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh index 3481c86..a741f97 100644 --- a/support/timescaledb/999_init.sh +++ b/support/timescaledb/999_init.sh @@ -20,8 +20,22 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL class CLASS NOT NULL, exchange EXCHANGE NOT NULL, trading BOOLEAN NOT NULL DEFAULT FALSE, - date_added TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW() + date_added TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX assets_symbol_idx ON assets (symbol); + + CREATE TABLE bars ( + timestamp TIMESTAMPTZ NOT NULL, + asset_id UUID NOT NULL REFERENCES assets(id), + open DOUBLE PRECISION NOT NULL, + high DOUBLE PRECISION NOT NULL, + low DOUBLE PRECISION NOT NULL, + close DOUBLE PRECISION NOT NULL, + volume DOUBLE PRECISION NOT NULL, + PRIMARY KEY (asset_id, timestamp), + FOREIGN KEY (asset_id) REFERENCES assets(id) + ); + + SELECT create_hypertable('bars', 'timestamp', 'asset_id', 2); EOSQL diff --git a/support/timescaledb/docker-compose.yml b/support/timescaledb/docker-compose.yml index cb8ebfa..5a79a59 100644 --- a/support/timescaledb/docker-compose.yml +++ b/support/timescaledb/docker-compose.yml @@ -8,7 +8,7 @@ services: volumes: - timescaledb-data:/home/postgres/pgdata/data - timescaledb-logs:/home/postgres/pg_log - - ./9999-init.sh:/docker-entrypoint-initdb.d/9999-init.sh + - ./999_init.sh:/docker-entrypoint-initdb.d/999_init.sh environment: - TIMESCALEDB_TELEMETRY=off - POSTGRES_USER=${POSTGRES_USER}