From e26d2b95e7df87ab6588d90ac780a8c4199c15be Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Sat, 9 Sep 2023 17:48:49 +0300 Subject: [PATCH] Add market data backfilling Signed-off-by: Nikolaos Karaolidis --- backend/.dockerignore => .dockerignore | 1 + ...f4ce3a9318721bed46666cd4cd74542bc24ba.json | 71 ++++ ...dcbf852e3e30364c3ea23269f29a331f807be.json | 22 ++ ...3059aa8ddab93e52c95761de2de022af91ad8.json | 78 +++++ ...0712731ae3449ac49d1f278ca4a4b8a2c9497.json | 20 +- ...b495fb273194a8278435cf4ed84028a55dbc0.json | 15 +- ...94b9515cedf85d4b7432142a859638772aaf7.json | 18 +- ...715642e95b6d0a2982792c6ee6853d33c6c5a.json | 18 +- ...1554ca40f35d2c87f3acf67c0d284c693cc8b.json | 18 +- ...857e73ae69df9152ece55a8569698a5b13f8b.json | 18 +- ...8a16634fe25ac2d8ef86084fe5b554e7ce832.json | 18 +- ...b163c3beb6709fbefd8039b9b82f343d5a6c2.json | 22 ++ ...01ac5ca7ccfc1414fc39b28f66beff87e328e.json | 85 +++++ ...e71bf99e5d92a7ee1e5c13090706afde9147c.json | 8 +- backend/Cargo.lock => Cargo.lock | 148 ++++----- backend/Cargo.toml => Cargo.toml | 3 +- backend/Dockerfile => Dockerfile | 4 +- ...2e0981687c4e0ff48ccc538cf06b3bd616c60.json | 36 -- ...5ded2148c9e46409975b7bf0b76f7ba0552e8.json | 32 -- backend/docker-compose.yml | 9 - backend/src/config.rs | 45 --- backend/src/data/calendar.rs | 41 --- backend/src/data/live.rs | 163 --------- backend/src/data/mod.rs | 2 - backend/src/database/assets.rs | 87 ----- backend/src/database/bars.rs | 58 ---- backend/src/database/calendar.rs | 54 --- backend/src/main.rs | 46 --- backend/src/routes/assets.rs | 141 -------- backend/src/routes/mod.rs | 30 -- backend/src/types/api/incoming/mod.rs | 5 - backend/src/types/api/outgoing/mod.rs | 0 backend/src/types/asset.rs | 33 -- backend/src/types/bar.rs | 33 -- backend/src/types/calendar.rs | 18 - backend/src/types/mod.rs | 15 - .../src/types/websocket/incoming/success.rs | 14 - docker-compose.yml | 12 +- backend/log4rs.yaml => log4rs.yaml | 0 src/config.rs | 73 ++++ src/data/historical.rs | 143 ++++++++ src/data/live.rs | 313 ++++++++++++++++++ src/data/mod.rs | 2 + src/database/assets.rs | 92 +++++ src/database/bars.rs | 89 +++++ src/database/bars_filled.rs | 133 ++++++++ {backend/src => src}/database/mod.rs | 2 +- src/main.rs | 49 +++ src/routes/assets.rs | 171 ++++++++++ src/routes/mod.rs | 30 ++ src/time.rs | 34 ++ .../src => src}/types/api/incoming/asset.rs | 2 + src/types/api/incoming/bar.rs | 30 ++ .../types/api/incoming/calendar_date.rs | 2 +- src/types/api/incoming/mod.rs | 7 + .../types/websocket => src/types/api}/mod.rs | 1 - src/types/asset.rs | 37 +++ src/types/bar.rs | 72 ++++ {backend/src => src}/types/class.rs | 2 +- {backend/src => src}/types/exchange.rs | 2 +- src/types/mod.rs | 22 ++ src/types/source.rs | 31 ++ {backend/src => src}/types/status.rs | 2 +- .../types/websocket/incoming/bar.rs | 2 +- .../types/websocket/incoming/mod.rs | 14 +- .../types/websocket/incoming/subscription.rs | 4 +- src/types/websocket/incoming/success.rs | 14 + .../types/api => src/types/websocket}/mod.rs | 0 .../types/websocket/outgoing/auth.rs | 6 +- .../types/websocket/outgoing/mod.rs | 11 +- .../types/websocket/outgoing/subscribe.rs | 11 +- support/timescaledb/999_init.sh | 47 ++- 72 files changed, 1847 insertions(+), 1044 deletions(-) rename backend/.dockerignore => .dockerignore (96%) create mode 100644 .sqlx/query-073ee42ebcc5a5dffd34abaf3e1f4ce3a9318721bed46666cd4cd74542bc24ba.json create mode 100644 .sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json create mode 100644 
.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json rename backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json => .sqlx/query-503ed46c4f7f8bb7d418a101ed80712731ae3449ac49d1f278ca4a4b8a2c9497.json (74%) rename backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json => .sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json (63%) rename backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json => .sqlx/query-742fef7dab68fe792675866c57394b9515cedf85d4b7432142a859638772aaf7.json (76%) rename backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json => .sqlx/query-7adf5172b6a3c8641f2a9fee848715642e95b6d0a2982792c6ee6853d33c6c5a.json (73%) rename backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json => .sqlx/query-8e8173b6e769fe9299a33a6e62a1554ca40f35d2c87f3acf67c0d284c693cc8b.json (75%) rename backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json => .sqlx/query-9d1121766d12528f51b3352d2fe857e73ae69df9152ece55a8569698a5b13f8b.json (75%) rename backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json => .sqlx/query-cb8a317dff39b7624abc7e78d6a8a16634fe25ac2d8ef86084fe5b554e7ce832.json (73%) create mode 100644 .sqlx/query-e594f833a1e3435039c6e28e2c6b163c3beb6709fbefd8039b9b82f343d5a6c2.json create mode 100644 .sqlx/query-e7d8b69f3f4eede80c1ce1451e301ac5ca7ccfc1414fc39b28f66beff87e328e.json rename {backend/.sqlx => .sqlx}/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json (97%) rename backend/Cargo.lock => Cargo.lock (96%) rename backend/Cargo.toml => Cargo.toml (95%) rename backend/Dockerfile => Dockerfile (81%) delete mode 100644 backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json delete mode 100644 backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json delete mode 100644 backend/docker-compose.yml delete mode 100644 backend/src/config.rs delete mode 100644 backend/src/data/calendar.rs delete mode 100644 backend/src/data/live.rs delete mode 100644 backend/src/data/mod.rs delete mode 100644 backend/src/database/assets.rs delete mode 100644 backend/src/database/bars.rs delete mode 100644 backend/src/database/calendar.rs delete mode 100644 backend/src/main.rs delete mode 100644 backend/src/routes/assets.rs delete mode 100644 backend/src/routes/mod.rs delete mode 100644 backend/src/types/api/incoming/mod.rs delete mode 100644 backend/src/types/api/outgoing/mod.rs delete mode 100644 backend/src/types/asset.rs delete mode 100644 backend/src/types/bar.rs delete mode 100644 backend/src/types/calendar.rs delete mode 100644 backend/src/types/mod.rs delete mode 100644 backend/src/types/websocket/incoming/success.rs rename backend/log4rs.yaml => log4rs.yaml (100%) create mode 100644 src/config.rs create mode 100644 src/data/historical.rs create mode 100644 src/data/live.rs create mode 100644 src/data/mod.rs create mode 100644 src/database/assets.rs create mode 100644 src/database/bars.rs create mode 100644 src/database/bars_filled.rs rename {backend/src => src}/database/mod.rs (58%) create mode 100644 src/main.rs create mode 100644 src/routes/assets.rs create mode 100644 src/routes/mod.rs create mode 100644 src/time.rs rename {backend/src => src}/types/api/incoming/asset.rs (91%) create mode 100644 
src/types/api/incoming/bar.rs rename backend/src/types/api/incoming/calendar.rs => src/types/api/incoming/calendar_date.rs (95%) create mode 100644 src/types/api/incoming/mod.rs rename {backend/src/types/websocket => src/types/api}/mod.rs (50%) create mode 100644 src/types/asset.rs create mode 100644 src/types/bar.rs rename {backend/src => src}/types/class.rs (74%) rename {backend/src => src}/types/exchange.rs (88%) create mode 100644 src/types/mod.rs create mode 100644 src/types/source.rs rename {backend/src => src}/types/status.rs (75%) rename {backend/src => src}/types/websocket/incoming/bar.rs (96%) rename {backend/src => src}/types/websocket/incoming/mod.rs (56%) rename {backend/src => src}/types/websocket/incoming/subscription.rs (85%) create mode 100644 src/types/websocket/incoming/success.rs rename {backend/src/types/api => src/types/websocket}/mod.rs (100%) rename {backend/src => src}/types/websocket/outgoing/auth.rs (56%) rename {backend/src => src}/types/websocket/outgoing/mod.rs (57%) rename {backend/src => src}/types/websocket/outgoing/subscribe.rs (58%) diff --git a/backend/.dockerignore b/.dockerignore similarity index 96% rename from backend/.dockerignore rename to .dockerignore index 6ed4352..6b926a0 100644 --- a/backend/.dockerignore +++ b/.dockerignore @@ -12,3 +12,4 @@ target/ .env* Dockerfile .dockerignore +support/ diff --git a/.sqlx/query-073ee42ebcc5a5dffd34abaf3e1f4ce3a9318721bed46666cd4cd74542bc24ba.json b/.sqlx/query-073ee42ebcc5a5dffd34abaf3e1f4ce3a9318721bed46666cd4cd74542bc24ba.json new file mode 100644 index 0000000..7d4f6d4 --- /dev/null +++ b/.sqlx/query-073ee42ebcc5a5dffd34abaf3e1f4ce3a9318721bed46666cd4cd74542bc24ba.json @@ -0,0 +1,71 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "timestamp", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "asset_symbol", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "open", + "type_info": "Float8" + }, + { + "ordinal": 3, + "name": "high", + "type_info": "Float8" + }, + { + "ordinal": 4, + "name": "low", + "type_info": "Float8" + }, + { + "ordinal": 5, + "name": "close", + "type_info": "Float8" + }, + { + "ordinal": 6, + "name": "volume", + "type_info": "Float8" + }, + { + "ordinal": 7, + "name": "num_trades", + "type_info": "Int8" + }, + { + "ordinal": 8, + "name": "volume_weighted", + "type_info": "Float8" + } + ], + "parameters": { + "Left": [ + "Text", + "Timestamptz" + ] + }, + "nullable": [ + false, + false, + true, + true, + true, + true, + false, + false, + false + ] + }, + "hash": "073ee42ebcc5a5dffd34abaf3e1f4ce3a9318721bed46666cd4cd74542bc24ba" +} diff --git a/.sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json b/.sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json new file mode 100644 index 0000000..e3708d1 --- /dev/null +++ b/.sqlx/query-08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Text", + "Float8", + "Float8", + "Float8", + "Float8", + "Float8", + "Int8", + "Float8" + ] + }, + "nullable": [] + }, + "hash": 
"08656bb2e5424ab67014cc40c6cdcbf852e3e30364c3ea23269f29a331f807be" +} diff --git a/.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json b/.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json new file mode 100644 index 0000000..41a238b --- /dev/null +++ b/.sqlx/query-26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8.json @@ -0,0 +1,78 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "timestamp", + "type_info": "Timestamptz" + }, + { + "ordinal": 1, + "name": "asset_symbol", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "open", + "type_info": "Float8" + }, + { + "ordinal": 3, + "name": "high", + "type_info": "Float8" + }, + { + "ordinal": 4, + "name": "low", + "type_info": "Float8" + }, + { + "ordinal": 5, + "name": "close", + "type_info": "Float8" + }, + { + "ordinal": 6, + "name": "volume", + "type_info": "Float8" + }, + { + "ordinal": 7, + "name": "num_trades", + "type_info": "Int8" + }, + { + "ordinal": 8, + "name": "volume_weighted", + "type_info": "Float8" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Text", + "Float8", + "Float8", + "Float8", + "Float8", + "Float8", + "Int8", + "Float8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "26f7ea563429e395d270cfae4993059aa8ddab93e52c95761de2de022af91ad8" +} diff --git a/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json b/.sqlx/query-503ed46c4f7f8bb7d418a101ed80712731ae3449ac49d1f278ca4a4b8a2c9497.json similarity index 74% rename from backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json rename to .sqlx/query-503ed46c4f7f8bb7d418a101ed80712731ae3449ac49d1f278ca4a4b8a2c9497.json index 985456a..13beaf7 100644 --- a/backend/.sqlx/query-987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469.json +++ b/.sqlx/query-503ed46c4f7f8bb7d418a101ed80712731ae3449ac49d1f278ca4a4b8a2c9497.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5)\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "INSERT INTO assets (symbol, class, exchange, trading, timestamp_added, timestamp_first, timestamp_last) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5, $6, $7)\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last", "describe": { "columns": [ { @@ -51,7 +51,17 @@ }, { "ordinal": 4, - "name": "date_added", + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", "type_info": "Timestamptz" } ], @@ -87,6 +97,8 @@ } }, "Bool", + "Timestamptz", + "Timestamptz", "Timestamptz" ] }, @@ -95,8 +107,10 @@ false, false, false, + false, + false, false ] }, - "hash": 
"987795db0b392cb0a44effbd2307eae7f3eaa3147ac5b5e616471ea293cb6469" + "hash": "503ed46c4f7f8bb7d418a101ed80712731ae3449ac49d1f278ca4a4b8a2c9497" } diff --git a/backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json b/.sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json similarity index 63% rename from backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json rename to .sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json index e8f260a..1222540 100644 --- a/backend/.sqlx/query-b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f.json +++ b/.sqlx/query-615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted", + "query": "SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1", "describe": { "columns": [ { @@ -51,15 +51,8 @@ ], "parameters": { "Left": [ - "TimestamptzArray", - "TextArray", - "Float8Array", - "Float8Array", - "Float8Array", - "Float8Array", - "Float8Array", - "Int8Array", - "Float8Array" + "Timestamptz", + "Text" ] }, "nullable": [ @@ -74,5 +67,5 @@ false ] }, - "hash": "b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f" + "hash": "615dcbdc8f624ee990566b21f61b495fb273194a8278435cf4ed84028a55dbc0" } diff --git a/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json b/.sqlx/query-742fef7dab68fe792675866c57394b9515cedf85d4b7432142a859638772aaf7.json similarity index 76% rename from backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json rename to .sqlx/query-742fef7dab68fe792675866c57394b9515cedf85d4b7432142a859638772aaf7.json index a6d4765..d299ddc 100644 --- a/backend/.sqlx/query-515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440.json +++ b/.sqlx/query-742fef7dab68fe792675866c57394b9515cedf85d4b7432142a859638772aaf7.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "DELETE FROM assets WHERE symbol = $1\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "DELETE FROM assets WHERE symbol = $1\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last", "describe": { "columns": [ { @@ -51,7 +51,17 @@ }, { "ordinal": 4, - "name": "date_added", + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", "type_info": "Timestamptz" } ], @@ -65,8 +75,10 @@ false, false, false, + false, + false, false ] }, - "hash": "515943b639b1a5cf24a9bbc1274aa36045ebe6a2d19d925bc490f606ff01b440" + "hash": "742fef7dab68fe792675866c57394b9515cedf85d4b7432142a859638772aaf7" } diff --git a/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json b/.sqlx/query-7adf5172b6a3c8641f2a9fee848715642e95b6d0a2982792c6ee6853d33c6c5a.json similarity index 73% rename from 
backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json rename to .sqlx/query-7adf5172b6a3c8641f2a9fee848715642e95b6d0a2982792c6ee6853d33c6c5a.json index f24ff16..3475bad 100644 --- a/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json +++ b/.sqlx/query-7adf5172b6a3c8641f2a9fee848715642e95b6d0a2982792c6ee6853d33c6c5a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE symbol = $1", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last FROM assets WHERE symbol = $1", "describe": { "columns": [ { @@ -51,7 +51,17 @@ }, { "ordinal": 4, - "name": "date_added", + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", "type_info": "Timestamptz" } ], @@ -65,8 +75,10 @@ false, false, false, + false, + false, false ] }, - "hash": "2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa" + "hash": "7adf5172b6a3c8641f2a9fee848715642e95b6d0a2982792c6ee6853d33c6c5a" } diff --git a/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json b/.sqlx/query-8e8173b6e769fe9299a33a6e62a1554ca40f35d2c87f3acf67c0d284c693cc8b.json similarity index 75% rename from backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json rename to .sqlx/query-8e8173b6e769fe9299a33a6e62a1554ca40f35d2c87f3acf67c0d284c693cc8b.json index 8a64b8b..4304086 100644 --- a/backend/.sqlx/query-cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2.json +++ b/.sqlx/query-8e8173b6e769fe9299a33a6e62a1554ca40f35d2c87f3acf67c0d284c693cc8b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE assets SET trading = $1 WHERE symbol = $2\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "UPDATE assets SET trading = $1 WHERE symbol = $2\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last", "describe": { "columns": [ { @@ -51,7 +51,17 @@ }, { "ordinal": 4, - "name": "date_added", + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", "type_info": "Timestamptz" } ], @@ -66,8 +76,10 @@ false, false, false, + false, + false, false ] }, - "hash": "cc23c11a827e26e7c68a35c7ae5044071e3750f6d9ddee8cdc2e29f3f207e2f2" + "hash": "8e8173b6e769fe9299a33a6e62a1554ca40f35d2c87f3acf67c0d284c693cc8b" } diff --git a/backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json b/.sqlx/query-9d1121766d12528f51b3352d2fe857e73ae69df9152ece55a8569698a5b13f8b.json similarity index 75% rename from backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json rename to .sqlx/query-9d1121766d12528f51b3352d2fe857e73ae69df9152ece55a8569698a5b13f8b.json index 85365cd..97438d3 100644 --- a/backend/.sqlx/query-d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214.json +++ b/.sqlx/query-9d1121766d12528f51b3352d2fe857e73ae69df9152ece55a8569698a5b13f8b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT 
symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = $1::CLASS", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last FROM assets WHERE class = $1::CLASS", "describe": { "columns": [ { @@ -51,7 +51,17 @@ }, { "ordinal": 4, - "name": "date_added", + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", "type_info": "Timestamptz" } ], @@ -75,8 +85,10 @@ false, false, false, + false, + false, false ] }, - "hash": "d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214" + "hash": "9d1121766d12528f51b3352d2fe857e73ae69df9152ece55a8569698a5b13f8b" } diff --git a/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json b/.sqlx/query-cb8a317dff39b7624abc7e78d6a8a16634fe25ac2d8ef86084fe5b554e7ce832.json similarity index 73% rename from backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json rename to .sqlx/query-cb8a317dff39b7624abc7e78d6a8a16634fe25ac2d8ef86084fe5b554e7ce832.json index d234a02..6cce28b 100644 --- a/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json +++ b/.sqlx/query-cb8a317dff39b7624abc7e78d6a8a16634fe25ac2d8ef86084fe5b554e7ce832.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last FROM assets", "describe": { "columns": [ { @@ -51,7 +51,17 @@ }, { "ordinal": 4, - "name": "date_added", + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", "type_info": "Timestamptz" } ], @@ -63,8 +73,10 @@ false, false, false, + false, + false, false ] }, - "hash": "48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f" + "hash": "cb8a317dff39b7624abc7e78d6a8a16634fe25ac2d8ef86084fe5b554e7ce832" } diff --git a/.sqlx/query-e594f833a1e3435039c6e28e2c6b163c3beb6709fbefd8039b9b82f343d5a6c2.json b/.sqlx/query-e594f833a1e3435039c6e28e2c6b163c3beb6709fbefd8039b9b82f343d5a6c2.json new file mode 100644 index 0000000..3ce0d60 --- /dev/null +++ b/.sqlx/query-e594f833a1e3435039c6e28e2c6b163c3beb6709fbefd8039b9b82f343d5a6c2.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO NOTHING", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Text", + "Float8", + "Float8", + "Float8", + "Float8", + "Float8", + "Int8", + "Float8" + ] + }, + "nullable": [] + }, + "hash": "e594f833a1e3435039c6e28e2c6b163c3beb6709fbefd8039b9b82f343d5a6c2" +} diff --git a/.sqlx/query-e7d8b69f3f4eede80c1ce1451e301ac5ca7ccfc1414fc39b28f66beff87e328e.json b/.sqlx/query-e7d8b69f3f4eede80c1ce1451e301ac5ca7ccfc1414fc39b28f66beff87e328e.json new file mode 100644 index 0000000..1e6544c --- /dev/null +++ b/.sqlx/query-e7d8b69f3f4eede80c1ce1451e301ac5ca7ccfc1414fc39b28f66beff87e328e.json @@ -0,0 +1,85 @@ 
+{ + "db_name": "PostgreSQL", + "query": "UPDATE assets SET timestamp_last = $1 WHERE symbol = $2\n RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, timestamp_added, timestamp_first, timestamp_last", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "symbol", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "class: Class", + "type_info": { + "Custom": { + "name": "class", + "kind": { + "Enum": [ + "us_equity", + "crypto" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "exchange: Exchange", + "type_info": { + "Custom": { + "name": "exchange", + "kind": { + "Enum": [ + "AMEX", + "ARCA", + "BATS", + "NASDAQ", + "NYSE", + "NYSEARCA", + "OTC", + "CRYPTO" + ] + } + } + } + }, + { + "ordinal": 3, + "name": "trading", + "type_info": "Bool" + }, + { + "ordinal": 4, + "name": "timestamp_added", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "timestamp_first", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "timestamp_last", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Timestamptz", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "e7d8b69f3f4eede80c1ce1451e301ac5ca7ccfc1414fc39b28f66beff87e328e" +} diff --git a/backend/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json b/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json similarity index 97% rename from backend/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json rename to .sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json index 9c45d27..7be0803 100644 --- a/backend/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json +++ b/.sqlx/query-ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c.json @@ -65,10 +65,10 @@ "nullable": [ false, false, - false, - false, - false, - false, + true, + true, + true, + true, false, false, false diff --git a/backend/Cargo.lock b/Cargo.lock similarity index 96% rename from backend/Cargo.lock rename to Cargo.lock index 7462155..39629c7 100644 --- a/backend/Cargo.lock +++ b/Cargo.lock @@ -70,7 +70,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -137,26 +137,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "backend" -version = "0.1.0" -dependencies = [ - "axum", - "dotenv", - "futures-util", - "governor", - "http", - "log", - "log4rs", - "reqwest", - "serde", - "serde_json", - "sqlx", - "time 0.3.28", - "tokio", - "tokio-tungstenite", -] - [[package]] name = "backtrace" version = "0.3.69" @@ -222,9 +202,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" @@ -243,15 +223,14 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.28" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f" +checksum = 
"defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", - "time 0.1.45", "wasm-bindgen", "windows-targets", ] @@ -479,6 +458,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "finl_unicode" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" + [[package]] name = "flume" version = "0.10.14" @@ -588,7 +573,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -645,7 +630,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -902,9 +887,9 @@ checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" [[package]] name = "itertools" -version = "0.10.5" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" dependencies = [ "either", ] @@ -1045,9 +1030,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.2" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486aed0026218e61b8a01d5fbd5a0a134649abb71a0e53b7bc088529dced86e" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "mime" @@ -1077,7 +1062,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys", ] @@ -1181,9 +1166,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ac5bbd07aea88c60a577a1ce218075ffd59208b2d7ca97adf9bfc5aeb21ebe" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "memchr", ] @@ -1217,7 +1202,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1228,9 +1213,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.92" +version = "0.9.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db7e971c2c2bba161b2d2fdf37080177eff520b3bc044787c7f1f5f9e78d869b" +checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" dependencies = [ "cc", "libc", @@ -1308,7 +1293,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1365,6 +1350,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "qrust" +version = "0.1.0" +dependencies = [ + "axum", + "dotenv", + "futures-util", + "governor", + "http", + "indexmap 2.0.0", + "log", + "log4rs", + "reqwest", + "serde", + "serde_json", + "sqlx", + "time", + "tokio", + "tokio-tungstenite", +] + [[package]] name = 
"quanta" version = "0.11.1" @@ -1376,7 +1382,7 @@ dependencies = [ "mach2", "once_cell", "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "web-sys", "winapi", ] @@ -1514,9 +1520,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.10" +version = "0.38.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed6248e1caa625eb708e266e06159f135e8c26f2bb7ceb72dc4b2766d0340964" +checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453" dependencies = [ "bitflags 2.4.0", "errno", @@ -1602,7 +1608,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -1744,9 +1750,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" +checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" dependencies = [ "itertools", "nom", @@ -1800,7 +1806,7 @@ dependencies = [ "smallvec", "sqlformat", "thiserror", - "time 0.3.28", + "time", "tokio", "tokio-stream", "tracing", @@ -1885,7 +1891,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", - "time 0.3.28", + "time", "tracing", "uuid", "whoami", @@ -1926,7 +1932,7 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror", - "time 0.3.28", + "time", "tracing", "uuid", "whoami", @@ -1950,7 +1956,7 @@ dependencies = [ "percent-encoding", "serde", "sqlx-core", - "time 0.3.28", + "time", "tracing", "url", "uuid", @@ -1958,10 +1964,11 @@ dependencies = [ [[package]] name = "stringprep" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da" +checksum = "bb41d74e231a107a1b4ee36bd1214b11285b77768d2e3824aedafa988fd36ee6" dependencies = [ + "finl_unicode", "unicode-bidi", "unicode-normalization", ] @@ -1985,9 +1992,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.29" +version = "2.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" dependencies = [ "proc-macro2", "quote", @@ -2015,22 +2022,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.47" +version = "1.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2044,17 +2051,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "time" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc", - "wasi 
0.10.0+wasi-snapshot-preview1", - "winapi", -] - [[package]] name = "time" version = "0.3.28" @@ -2123,7 +2119,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2224,7 +2220,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", ] [[package]] @@ -2363,12 +2359,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2396,7 +2386,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-shared", ] @@ -2430,7 +2420,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.31", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/backend/Cargo.toml b/Cargo.toml similarity index 95% rename from backend/Cargo.toml rename to Cargo.toml index f63deda..f6bdc58 100644 --- a/backend/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "backend" +name = "qrust" version = "0.1.0" edition = "2021" @@ -42,3 +42,4 @@ tokio-tungstenite = { version = "0.20.0", features = [ ] } http = "0.2.9" governor = "0.6.0" +indexmap = "2.0.0" diff --git a/backend/Dockerfile b/Dockerfile similarity index 81% rename from backend/Dockerfile rename to Dockerfile index 106f778..e516912 100644 --- a/backend/Dockerfile +++ b/Dockerfile @@ -13,11 +13,11 @@ RUN rm -rf src COPY . . RUN cargo build --release -FROM alpine AS backend +FROM alpine AS qrust WORKDIR /usr/src/qrust -COPY --from=builder /usr/src/qrust/target/release/backend . +COPY --from=builder /usr/src/qrust/target/release/qrust . COPY log4rs.yaml . 
 EXPOSE 7878
diff --git a/backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json b/backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json
deleted file mode 100644
index 7a89b3f..0000000
--- a/backend/.sqlx/query-8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60.json
+++ /dev/null
@@ -1,36 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "INSERT INTO calendar (date, open, close)\n SELECT * FROM UNNEST($1::date[], $2::time[], $3::time[])\n RETURNING date, open, close",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "date",
-        "type_info": "Date"
-      },
-      {
-        "ordinal": 1,
-        "name": "open",
-        "type_info": "Time"
-      },
-      {
-        "ordinal": 2,
-        "name": "close",
-        "type_info": "Time"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "DateArray",
-        "TimeArray",
-        "TimeArray"
-      ]
-    },
-    "nullable": [
-      false,
-      false,
-      false
-    ]
-  },
-  "hash": "8d268f6532ab7fbad0b31286d3c2e0981687c4e0ff48ccc538cf06b3bd616c60"
-}
diff --git a/backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json b/backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json
deleted file mode 100644
index c206585..0000000
--- a/backend/.sqlx/query-b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8.json
+++ /dev/null
@@ -1,32 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "DELETE FROM calendar RETURNING date, open, close",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "date",
-        "type_info": "Date"
-      },
-      {
-        "ordinal": 1,
-        "name": "open",
-        "type_info": "Time"
-      },
-      {
-        "ordinal": 2,
-        "name": "close",
-        "type_info": "Time"
-      }
-    ],
-    "parameters": {
-      "Left": []
-    },
-    "nullable": [
-      false,
-      false,
-      false
-    ]
-  },
-  "hash": "b3fbaff539723326ac5599b9ef25ded2148c9e46409975b7bf0b76f7ba0552e8"
-}
diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml
deleted file mode 100644
index dbae8c7..0000000
--- a/backend/docker-compose.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-services:
-  backend:
-    build:
-      context: .
-      dockerfile: Dockerfile
-    hostname: backend
-    restart: unless-stopped
-    ports:
-      - 7878:7878
diff --git a/backend/src/config.rs b/backend/src/config.rs
deleted file mode 100644
index 5b44f9d..0000000
--- a/backend/src/config.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
-use http::HeaderMap;
-use reqwest::Client;
-use sqlx::{postgres::PgPoolOptions, PgPool};
-use std::{env, sync::Arc};
-
-pub struct AppConfig {
-    pub alpaca_api_key: String,
-    pub alpaca_api_secret: String,
-    pub alpaca_client: Client,
-    pub alpaca_rate_limit: DefaultDirectRateLimiter,
-    pub postgres_pool: PgPool,
-}
-
-const NUM_CLIENTS: usize = 10;
-
-impl AppConfig {
-    pub async fn from_env() -> Result<Self, Box<dyn std::error::Error>> {
-        let alpaca_api_key = env::var("ALPACA_API_KEY")?;
-        let alpaca_api_secret = env::var("ALPACA_API_SECRET")?;
-        let alpaca_rate_limit = env::var("ALPACA_RATE_LIMIT")?;
-
-        Ok(AppConfig {
-            alpaca_api_key: alpaca_api_key.clone(),
-            alpaca_api_secret: alpaca_api_secret.clone(),
-            alpaca_client: Client::builder()
-                .default_headers({
-                    let mut headers = HeaderMap::new();
-                    headers.insert("APCA-API-KEY-ID", alpaca_api_key.parse()?);
-                    headers.insert("APCA-API-SECRET-KEY", alpaca_api_secret.parse()?);
-                    headers
-                })
-                .build()?,
-            alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(alpaca_rate_limit.parse()?)),
-            postgres_pool: PgPoolOptions::new()
-                .max_connections(NUM_CLIENTS as u32)
-                .connect(&env::var("DATABASE_URL")?)
-                .await?,
-        })
-    }
-
-    pub async fn arc_from_env() -> Result<Arc<AppConfig>, Box<dyn std::error::Error>> {
-        Ok(Arc::new(AppConfig::from_env().await?))
-    }
-}
diff --git a/backend/src/data/calendar.rs b/backend/src/data/calendar.rs
deleted file mode 100644
index c856dff..0000000
--- a/backend/src/data/calendar.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use crate::{
-    config::AppConfig,
-    database,
-    types::{api, CalendarDate},
-};
-use log::info;
-use std::{error::Error, sync::Arc, time::Duration};
-use tokio::time::interval;
-
-const ALPACA_CALENDAR_API_URL: &str = "https://api.alpaca.markets/v2/calendar";
-const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 3);
-const EARLIEST_DATE: &str = "1970-01-01";
-const LATEST_DATE: &str = "2029-12-31";
-
-pub async fn run(app_config: Arc<AppConfig>) -> Result<(), Box<dyn Error>> {
-    let mut interval = interval(REFRESH_INTERVAL);
-
-    loop {
-        interval.tick().await;
-
-        info!("Refreshing calendar...");
-
-        app_config.alpaca_rate_limit.until_ready().await;
-        let calendar_dates = app_config
-            .alpaca_client
-            .get(ALPACA_CALENDAR_API_URL)
-            .query(&[("start", EARLIEST_DATE), ("end", LATEST_DATE)])
-            .send()
-            .await?
-            .json::<Vec<api::incoming::CalendarDate>>()
-            .await?
-            .iter()
-            .map(CalendarDate::from)
-            .collect::<Vec<_>>();
-
-        database::calendar::reset_calendar_dates(&app_config.postgres_pool, &calendar_dates)
-            .await?;
-
-        info!("Refreshed calendar.");
-    }
-}
diff --git a/backend/src/data/live.rs b/backend/src/data/live.rs
deleted file mode 100644
index ea5088c..0000000
--- a/backend/src/data/live.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-use crate::{
-    config::AppConfig,
-    database::{assets::get_assets_with_class, bars::add_bar},
-    types::{
-        websocket::{
-            incoming::{IncomingMessage, SuccessMessage, SuccessMessageType},
-            outgoing::{AuthMessage, OutgoingMessage, SubscribeMessage},
-        },
-        AssetBroadcastMessage, Bar, Class,
-    },
-};
-use core::panic;
-use futures_util::{
-    stream::{SplitSink, SplitStream},
-    SinkExt, StreamExt,
-};
-use log::{debug, error, info, warn};
-use serde_json::{from_str, to_string};
-use std::{error::Error, sync::Arc};
-use tokio::{
-    net::TcpStream,
-    spawn,
-    sync::{broadcast::Receiver, RwLock},
-};
-use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
-
-const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2/iex";
-const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
-
-pub async fn run(
-    class: Class,
-    app_config: Arc<AppConfig>,
-    asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
-) -> Result<(), Box<dyn Error>> {
-    let websocket_url = match class {
-        Class::UsEquity => ALPACA_STOCK_WEBSOCKET_URL,
-        Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL,
-    };
-
-    let (stream, _) = connect_async(websocket_url).await?;
-    let (mut sink, mut stream) = stream.split();
-
-    match stream.next().await {
-        Some(Ok(Message::Text(data)))
-            if from_str::<Vec<IncomingMessage>>(&data)?.get(0)
-                == Some(&IncomingMessage::Success(SuccessMessage {
-                    msg: SuccessMessageType::Connected,
-                })) => {}
-        _ => panic!(),
-    }
-
-    sink.send(Message::Text(to_string(&OutgoingMessage::Auth(
-        AuthMessage::new(
-            app_config.alpaca_api_key.clone(),
-            app_config.alpaca_api_secret.clone(),
-        ),
-    ))?))
-    .await?;
-
-    match stream.next().await {
-        Some(Ok(Message::Text(data)))
-            if from_str::<Vec<IncomingMessage>>(&data)?.get(0)
-                == Some(&IncomingMessage::Success(SuccessMessage {
-                    msg: SuccessMessageType::Authenticated,
-                })) => {}
-        _ => panic!(),
-    }
-
-    let symbols = get_assets_with_class(&app_config.postgres_pool, &class)
-        .await?
-        .into_iter()
-        .map(|asset| asset.symbol)
-        .collect::<Vec<_>>();
-
-    if !symbols.is_empty() {
-        sink.send(Message::Text(to_string(&OutgoingMessage::Subscribe(
-            SubscribeMessage::from_vec(symbols),
-        ))?))
-        .await?;
-    }
-
-    let sink = Arc::new(RwLock::new(sink));
-
-    info!("Running live data thread for {:?}.", class);
-
-    spawn(broadcast_handler(
-        class,
-        sink.clone(),
-        asset_broadcast_receiver,
-    ));
-
-    websocket_handler(app_config, class, sink, stream).await?;
-
-    unreachable!()
-}
-
-pub async fn websocket_handler(
-    app_config: Arc<AppConfig>,
-    class: Class,
-    sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
-    mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
-) -> Result<(), Box<dyn Error>> {
-    loop {
-        match stream.next().await {
-            Some(Ok(Message::Text(data))) => match from_str::<Vec<IncomingMessage>>(&data) {
-                Ok(parsed_data) => {
-                    for message in parsed_data {
-                        match message {
-                            IncomingMessage::Subscription(subscription_message) => {
-                                info!(
-                                    "Current {:?} subscriptions: {:?}",
-                                    class, subscription_message.bars
-                                );
-                            }
-                            IncomingMessage::Bars(bar_message)
-                            | IncomingMessage::UpdatedBars(bar_message) => {
-                                debug!("Incoming bar: {:?}", bar_message);
-                                add_bar(&app_config.postgres_pool, &Bar::from(bar_message)).await?;
-                            }
-                            message => {
-                                warn!("Unhandled incoming message: {:?}", message);
-                            }
-                        }
-                    }
-                }
-                Err(e) => {
-                    warn!("Unparsed incoming message: {:?}: {}", data, e);
-                }
-            },
-            Some(Ok(Message::Ping(_))) => sink.write().await.send(Message::Pong(vec![])).await?,
-            Some(unknown) => error!("Unknown incoming message: {:?}", unknown),
-            None => panic!(),
-        }
-    }
-}
-
-pub async fn broadcast_handler(
-    class: Class,
-    sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
-    mut asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
-) -> Result<(), Box<dyn Error>> {
-    loop {
-        match asset_broadcast_receiver.recv().await? {
-            AssetBroadcastMessage::Added(asset) if asset.class == class => {
-                sink.write()
-                    .await
-                    .send(Message::Text(serde_json::to_string(
-                        &OutgoingMessage::Subscribe(SubscribeMessage::new(asset.symbol)),
-                    )?))
-                    .await?;
-            }
-            AssetBroadcastMessage::Deleted(asset) if asset.class == class => {
-                sink.write()
-                    .await
-                    .send(Message::Text(serde_json::to_string(
-                        &OutgoingMessage::Unsubscribe(SubscribeMessage::new(asset.symbol)),
-                    )?))
-                    .await?;
-            }
-            _ => {}
-        }
-    }
-}
diff --git a/backend/src/data/mod.rs b/backend/src/data/mod.rs
deleted file mode 100644
index b9b5fb9..0000000
--- a/backend/src/data/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod calendar;
-pub mod live;
diff --git a/backend/src/database/assets.rs b/backend/src/database/assets.rs
deleted file mode 100644
index b93f5c7..0000000
--- a/backend/src/database/assets.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-use crate::types::{Asset, Class, Exchange};
-use sqlx::{query_as, PgPool};
-use std::error::Error;
-
-pub async fn get_assets(
-    postgres_pool: &PgPool,
-) -> Result<Vec<Asset>, Box<dyn Error>> {
-    query_as!(
-        Asset,
-        r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#
-    )
-    .fetch_all(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn get_assets_with_class(
-    postgres_pool: &PgPool,
-    class: &Class,
-) -> Result<Vec<Asset>, Box<dyn Error>> {
-    query_as!(
-        Asset,
-        r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, &class as &Class
-    )
-    .fetch_all(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn get_asset(
-    postgres_pool: &PgPool,
-    symbol: &str,
-) -> Result<Option<Asset>, Box<dyn Error>> {
-    query_as!(
-        Asset,
-        r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol
-    )
-    .fetch_optional(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn add_asset(
-    postgres_pool: &PgPool,
-    asset: &Asset,
-) -> Result<Asset, Box<dyn Error>> {
-    query_as!(
-        Asset,
-        r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5)
-        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
-        asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added
-    )
-    .fetch_one(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn update_asset_trading(
-    postgres_pool: &PgPool,
-    symbol: &str,
-    trading: &bool,
-) -> Result<Option<Asset>, Box<dyn Error>> {
-    query_as!(
-        Asset,
-        r#"UPDATE assets SET trading = $1 WHERE symbol = $2
-        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
-        trading, symbol
-    )
-    .fetch_optional(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn delete_asset(
-    postgres_pool: &PgPool,
-    symbol: &str,
-) -> Result<Option<Asset>, Box<dyn Error>> {
-    Ok(query_as!(
-        Asset,
-        r#"DELETE FROM assets WHERE symbol = $1
-        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
-        symbol
-    )
-    .fetch_optional(postgres_pool)
-    .await
-    .unwrap())
-}
diff --git a/backend/src/database/bars.rs b/backend/src/database/bars.rs
deleted file mode 100644
index 8cdcbab..0000000
--- a/backend/src/database/bars.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use crate::types::Bar;
-use sqlx::{query_as, PgPool};
-use std::error::Error;
-
-pub async fn add_bar(
-    postgres_pool: &PgPool,
-    bar: &Bar,
-) -> Result<Bar, Box<dyn Error>> {
-    query_as!(
-        Bar,
-        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
-        ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
-        RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
-        bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
-    )
-    .fetch_one(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-#[allow(dead_code)]
-pub async fn add_bars(
-    postgres_pool: &PgPool,
-    bars: &Vec<Bar>,
-) -> Result<Vec<Bar>, Box<dyn Error>> {
-    let mut timestamps = Vec::with_capacity(bars.len());
-    let mut asset_symbols = Vec::with_capacity(bars.len());
-    let mut opens = Vec::with_capacity(bars.len());
-    let mut highs = Vec::with_capacity(bars.len());
-    let mut lows = Vec::with_capacity(bars.len());
-    let mut closes = Vec::with_capacity(bars.len());
-    let mut volumes = Vec::with_capacity(bars.len());
-    let mut num_trades = Vec::with_capacity(bars.len());
-    let mut volumes_weighted = Vec::with_capacity(bars.len());
-
-    for bar in bars {
-        timestamps.push(bar.timestamp);
-        asset_symbols.push(bar.asset_symbol.clone());
-        opens.push(bar.open);
-        highs.push(bar.high);
-        lows.push(bar.low);
-        closes.push(bar.close);
-        volumes.push(bar.volume);
-        num_trades.push(bar.num_trades);
-        volumes_weighted.push(bar.volume_weighted);
-    }
-
-    query_as!(
-        Bar,
-        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
-        SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
-        RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
-        &timestamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes, &num_trades, &volumes_weighted
-    )
-    .fetch_all(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
diff --git a/backend/src/database/calendar.rs b/backend/src/database/calendar.rs
deleted file mode 100644
index b066438..0000000
--- a/backend/src/database/calendar.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-use crate::types::CalendarDate;
-use sqlx::{query_as, PgPool};
-use std::error::Error;
-
-pub async fn add_calendar_dates(
-    postgres_pool: &PgPool,
-    calendar_dates: &Vec<CalendarDate>,
-) -> Result<Vec<CalendarDate>, Box<dyn Error>> {
-    let mut dates = Vec::with_capacity(calendar_dates.len());
-    let mut opens = Vec::with_capacity(calendar_dates.len());
-    let mut closes = Vec::with_capacity(calendar_dates.len());
-
-    for calendar_date in calendar_dates {
-        dates.push(calendar_date.date);
-        opens.push(calendar_date.open);
-        closes.push(calendar_date.close);
-    }
-
-    query_as!(
-        CalendarDate,
-        r#"INSERT INTO calendar (date, open, close)
-        SELECT * FROM UNNEST($1::date[], $2::time[], $3::time[])
-        RETURNING date, open, close"#,
-        &dates,
-        &opens,
-        &closes
-    )
-    .fetch_all(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn delete_all_calendar_dates(
-    postgres_pool: &PgPool,
-) -> Result<Vec<CalendarDate>, Box<dyn Error>> {
-    query_as!(
-        CalendarDate,
-        "DELETE FROM calendar RETURNING date, open, close"
-    )
-    .fetch_all(postgres_pool)
-    .await
-    .map_err(|e| e.into())
-}
-
-pub async fn reset_calendar_dates(
-    postgres_pool: &PgPool,
-    calendar_dates: &Vec<CalendarDate>,
-) -> Result<Vec<CalendarDate>, Box<dyn Error>> {
-    let transaction = postgres_pool.begin().await?;
-    delete_all_calendar_dates(postgres_pool).await?;
-    let calendar_dates = add_calendar_dates(postgres_pool, calendar_dates).await;
-    transaction.commit().await?;
-    calendar_dates
-}
diff --git a/backend/src/main.rs b/backend/src/main.rs
deleted file mode 100644
index 0bef77b..0000000
--- a/backend/src/main.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-mod config;
-mod data;
-mod database;
-mod routes;
-mod types;
-
-use config::AppConfig;
-use dotenv::dotenv;
-use std::error::Error;
-use tokio::{spawn, sync::broadcast};
-use types::{AssetBroadcastMessage, Class};
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    dotenv().ok();
-    log4rs::init_file("log4rs.yaml", Default::default()).unwrap();
-    let app_config = AppConfig::arc_from_env().await.unwrap();
-    let mut threads = Vec::new();
-
-    threads.push(spawn(data::calendar::run(app_config.clone())));
-
-    let (asset_broadcast_sender, _) = broadcast::channel::<AssetBroadcastMessage>(100);
-
-    threads.push(spawn(data::live::run(
-        Class::UsEquity,
-        app_config.clone(),
-        asset_broadcast_sender.subscribe(),
-    )));
-
-    threads.push(spawn(data::live::run(
-        Class::Crypto,
-        app_config.clone(),
-        asset_broadcast_sender.subscribe(),
-    )));
-
-    threads.push(spawn(routes::run(
-        app_config.clone(),
-        asset_broadcast_sender,
-    )));
-
-    for thread in threads {
-        thread.await??;
-    }
-
-    unreachable!()
-}
diff --git a/backend/src/routes/assets.rs b/backend/src/routes/assets.rs
deleted file mode 100644
index af258c6..0000000
--- a/backend/src/routes/assets.rs
+++ /dev/null
@@ -1,141 +0,0 @@
-use crate::config::AppConfig;
-use crate::database;
-use crate::database::assets::update_asset_trading;
-use crate::types::api;
-use crate::types::{Asset, AssetBroadcastMessage, Status};
-use axum::{extract::Path, http::StatusCode, Extension, Json};
-use log::info;
-use serde::Deserialize;
-use std::sync::Arc;
-use tokio::sync::broadcast::Sender;
-
-const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
-
-pub async fn get_assets(
Extension(app_config): Extension>, -) -> Result<(StatusCode, Json>), StatusCode> { - let assets = database::assets::get_assets(&app_config.postgres_pool) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - Ok((StatusCode::OK, Json(assets))) -} - -pub async fn get_asset( - Extension(app_config): Extension>, - Path(symbol): Path, -) -> Result<(StatusCode, Json), StatusCode> { - let asset = database::assets::get_asset(&app_config.postgres_pool, &symbol) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - match asset { - Some(asset) => Ok((StatusCode::OK, Json(asset))), - None => Err(StatusCode::NOT_FOUND), - } -} - -#[derive(Deserialize)] -pub struct AddAssetRequest { - symbol: String, - trading: Option, -} - -pub async fn add_asset( - Extension(app_config): Extension>, - Extension(asset_broadcast_sender): Extension>, - Json(request): Json, -) -> Result<(StatusCode, Json), StatusCode> { - if database::assets::get_asset(&app_config.postgres_pool, &request.symbol) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? - .is_some() - { - return Err(StatusCode::CONFLICT); - } - - app_config.alpaca_rate_limit.until_ready().await; - let asset = app_config - .alpaca_client - .get(&format!("{}/{}", ALPACA_ASSET_API_URL, request.symbol)) - .send() - .await - .map_err(|e| match e.status() { - Some(StatusCode::NOT_FOUND) => StatusCode::NOT_FOUND, - Some(StatusCode::FORBIDDEN) => panic!(), - _ => StatusCode::INTERNAL_SERVER_ERROR, - })?; - - let asset = asset.json::().await.unwrap(); - - if asset.status != Status::Active || !asset.tradable || !asset.fractionable { - return Err(StatusCode::FORBIDDEN); - } - - let mut asset = Asset::from(asset); - if let Some(trading) = request.trading { - asset.trading = trading; - } - - database::assets::add_asset(&app_config.postgres_pool, &asset) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - asset_broadcast_sender - .send(AssetBroadcastMessage::Added(asset.clone())) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - info!("Added asset {}.", asset.symbol); - Ok((StatusCode::CREATED, Json(asset))) -} - -#[allow(dead_code)] -#[derive(Deserialize)] -pub struct UpdateAssetRequest { - trading: bool, -} - -pub async fn update_asset( - Extension(app_config): Extension>, - Extension(asset_broadcast_sender): Extension>, - Path(symbol): Path, - Json(request): Json, -) -> Result<(StatusCode, Json), StatusCode> { - let asset = update_asset_trading(&app_config.postgres_pool, &symbol, &request.trading) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - match asset { - Some(asset) => { - asset_broadcast_sender - .send(AssetBroadcastMessage::Updated(asset.clone())) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - info!("Updated asset {}.", symbol); - Ok((StatusCode::OK, Json(asset))) - } - None => Err(StatusCode::NOT_FOUND), - } -} - -pub async fn delete_asset( - Extension(app_config): Extension>, - Extension(asset_broadcast_sender): Extension>, - Path(symbol): Path, -) -> Result { - let asset = database::assets::delete_asset(&app_config.postgres_pool, &symbol) - .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - match asset { - Some(asset) => { - asset_broadcast_sender - .send(AssetBroadcastMessage::Deleted(asset.clone())) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - - info!("Deleted asset {}.", symbol); - Ok(StatusCode::NO_CONTENT) - } - None => Err(StatusCode::NOT_FOUND), - } -} diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs deleted file mode 100644 index 
diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs
deleted file mode 100644
index b68b63a..0000000
--- a/backend/src/routes/mod.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-use crate::{config::AppConfig, types::AssetBroadcastMessage};
-use axum::{
-    routing::{delete, get, post},
-    Extension, Router, Server,
-};
-use log::info;
-use std::{net::SocketAddr, sync::Arc};
-use tokio::sync::broadcast::Sender;
-
-pub mod assets;
-
-pub async fn run(
-    app_config: Arc<AppConfig>,
-    asset_broadcast_sender: Sender<AssetBroadcastMessage>,
-) -> Result<(), Box<dyn std::error::Error>> {
-    let app = Router::new()
-        .route("/assets", get(assets::get_assets))
-        .route("/assets/:symbol", get(assets::get_asset))
-        .route("/assets", post(assets::add_asset))
-        .route("/assets/:symbol", post(assets::update_asset))
-        .route("/assets/:symbol", delete(assets::delete_asset))
-        .layer(Extension(app_config))
-        .layer(Extension(asset_broadcast_sender));
-
-    let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
-    info!("Listening on {}...", addr);
-    Server::bind(&addr).serve(app.into_make_service()).await?;
-
-    unreachable!()
-}
diff --git a/backend/src/types/api/incoming/mod.rs b/backend/src/types/api/incoming/mod.rs
deleted file mode 100644
index 068f05a..0000000
--- a/backend/src/types/api/incoming/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-pub mod asset;
-pub mod calendar;
-
-pub use asset::*;
-pub use calendar::*;
diff --git a/backend/src/types/api/outgoing/mod.rs b/backend/src/types/api/outgoing/mod.rs
deleted file mode 100644
index e69de29..0000000
diff --git a/backend/src/types/asset.rs b/backend/src/types/asset.rs
deleted file mode 100644
index cd77b13..0000000
--- a/backend/src/types/asset.rs
+++ /dev/null
@@ -1,33 +0,0 @@
-use super::{api, class::Class, exchange::Exchange};
-use serde::{Deserialize, Serialize};
-use sqlx::FromRow;
-use time::OffsetDateTime;
-
-#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
-pub struct Asset {
-    pub symbol: String,
-    pub class: Class,
-    pub exchange: Exchange,
-    pub trading: bool,
-    pub date_added: OffsetDateTime,
-}
-
-impl From<api::incoming::Asset> for Asset {
-    fn from(asset: api::incoming::Asset) -> Self {
-        Self {
-            symbol: asset.symbol,
-            class: asset.class,
-            exchange: asset.exchange,
-            trading: asset.tradable,
-            date_added: OffsetDateTime::now_utc(),
-        }
-    }
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub enum AssetBroadcastMessage {
-    Added(Asset),
-    Updated(Asset),
-    Deleted(Asset),
-    Reset(Asset),
-}
diff --git a/backend/src/types/bar.rs b/backend/src/types/bar.rs
deleted file mode 100644
index 452d0c2..0000000
--- a/backend/src/types/bar.rs
+++ /dev/null
@@ -1,33 +0,0 @@
-use super::websocket;
-use serde::{Deserialize, Serialize};
-use sqlx::FromRow;
-use time::OffsetDateTime;
-
-#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
-pub struct Bar {
-    pub timestamp: OffsetDateTime,
-    pub asset_symbol: String,
-    pub open: f64,
-    pub high: f64,
-    pub low: f64,
-    pub close: f64,
-    pub volume: f64,
-    pub num_trades: i64,
-    pub volume_weighted: f64,
-}
-
-impl From<websocket::incoming::BarMessage> for Bar {
-    fn from(bar_message: websocket::incoming::BarMessage) -> Self {
-        Self {
-            timestamp: bar_message.timestamp,
-            asset_symbol: bar_message.symbol,
-            open: bar_message.open,
-            high: bar_message.high,
-            low: bar_message.low,
-            close: bar_message.close,
-            volume: bar_message.volume,
-            num_trades: bar_message.num_trades,
-            volume_weighted: bar_message.volume_weighted,
-        }
-    }
-}
diff --git a/backend/src/types/calendar.rs b/backend/src/types/calendar.rs
deleted file mode 100644
index 0e6dc73..0000000
--- a/backend/src/types/calendar.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-use super::api;
-use time::{Date, Time};
-
-pub struct CalendarDate {
-    pub date: Date,
-    pub open: Time,
-    pub close: Time,
-}
-
-impl From<&api::incoming::CalendarDate> for CalendarDate {
-    fn from(calendar: &api::incoming::CalendarDate) -> Self {
-        Self {
-            date: calendar.date,
-            open: calendar.open,
-            close: calendar.close,
-        }
-    }
-}
diff --git a/backend/src/types/mod.rs b/backend/src/types/mod.rs
deleted file mode 100644
index b9896d6..0000000
--- a/backend/src/types/mod.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-pub mod api;
-pub mod asset;
-pub mod bar;
-pub mod calendar;
-pub mod class;
-pub mod exchange;
-pub mod status;
-pub mod websocket;
-
-pub use asset::*;
-pub use bar::*;
-pub use calendar::*;
-pub use class::*;
-pub use exchange::*;
-pub use status::*;
diff --git a/backend/src/types/websocket/incoming/success.rs b/backend/src/types/websocket/incoming/success.rs
deleted file mode 100644
index 0415760..0000000
--- a/backend/src/types/websocket/incoming/success.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use serde::Deserialize;
-
-#[derive(Debug, PartialEq, Deserialize)]
-pub enum SuccessMessageType {
-    #[serde(rename = "connected")]
-    Connected,
-    #[serde(rename = "authenticated")]
-    Authenticated,
-}
-
-#[derive(Debug, PartialEq, Deserialize)]
-pub struct SuccessMessage {
-    pub msg: SuccessMessageType,
-}
diff --git a/docker-compose.yml b/docker-compose.yml
index 3fad3ba..eba2485 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -4,10 +4,14 @@ services:
       file: support/timescaledb/docker-compose.yml
       service: timescaledb

-  backend:
-    extends:
-      file: backend/docker-compose.yml
-      service: backend
+  qrust:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    hostname: qrust
+    restart: unless-stopped
+    ports:
+      - 7878:7878
     depends_on:
       - timescaledb
     env_file:
diff --git a/backend/log4rs.yaml b/log4rs.yaml
similarity index 100%
rename from backend/log4rs.yaml
rename to log4rs.yaml
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..9fb1f7d
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,73 @@
+use crate::types::Source;
+use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
+use http::HeaderMap;
+use reqwest::Client;
+use sqlx::{postgres::PgPoolOptions, PgPool};
+use std::{env, num::NonZeroU32, sync::Arc};
+use time::{format_description::FormatItem, macros::format_description};
+use tokio::time::Duration;
+
+pub const ALPACA_ASSET_API_URL: &str = "https://api.alpaca.markets/v2/assets";
+pub const ALPACA_STOCK_DATA_URL: &str = "https://data.alpaca.markets/v2/stocks/bars";
+pub const ALPACA_CRYPTO_DATA_URL: &str = "https://data.alpaca.markets/v1beta3/crypto/us/bars";
+pub const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2";
+pub const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
+pub const ALPACA_TIMESTAMP_FORMAT: &[FormatItem] =
+    format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
+
+const NUM_CLIENTS: u32 = 10;
+
+pub struct Config {
+    pub alpaca_api_key: String,
+    pub alpaca_api_secret: String,
+    pub alpaca_client: Client,
+    pub alpaca_rate_limit: DefaultDirectRateLimiter,
+    pub alpaca_historical_offset: Duration,
+    pub alpaca_source: Source,
+    pub postgres_pool: PgPool,
+}
+
+impl Config {
+    pub async fn from_env() -> Self {
+        let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set.");
+        let alpaca_api_key = env::var("ALPACA_API_KEY").expect("ALPACA_API_KEY must be set.");
+        let alpaca_api_secret =
+            env::var("ALPACA_API_SECRET").expect("ALPACA_API_SECRET must be set.");
+        let alpaca_source: Source = env::var("ALPACA_SOURCE")
+            .expect("ALPACA_SOURCE must be set.")
+            .parse()
+            .expect("ALPACA_SOURCE must be either 'iex' or 'sip'.");
+
+        Self {
+            alpaca_api_key: alpaca_api_key.clone(),
+            alpaca_api_secret: alpaca_api_secret.clone(),
+            alpaca_client: Client::builder()
+                .default_headers({
+                    let mut headers = HeaderMap::new();
+                    headers.insert("APCA-API-KEY-ID", alpaca_api_key.parse().unwrap());
+                    headers.insert("APCA-API-SECRET-KEY", alpaca_api_secret.parse().unwrap());
+                    headers
+                })
+                .build()
+                .unwrap(),
+            alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(match alpaca_source {
+                Source::Iex => NonZeroU32::new(200).unwrap(),
+                Source::Sip => NonZeroU32::new(1000).unwrap(),
+            })),
+            alpaca_historical_offset: Duration::from_secs(match alpaca_source {
+                Source::Iex => 900,
+                Source::Sip => 0,
+            }),
+            alpaca_source,
+            postgres_pool: PgPoolOptions::new()
+                .max_connections(NUM_CLIENTS)
+                .connect(&database_url)
+                .await
+                .unwrap(),
+        }
+    }
+
+    pub async fn arc_from_env() -> Arc<Self> {
+        Arc::new(Self::from_env().await)
+    }
+}
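
The `alpaca_rate_limit` field above is a single `governor` limiter shared by every task that calls Alpaca; callers await `until_ready()` before each request. A standalone sketch of that pattern, not part of this patch (the 200-per-minute quota mirrors the iex tier configured above):

    use governor::{Quota, RateLimiter};
    use std::num::NonZeroU32;

    #[tokio::main]
    async fn main() {
        // One process-wide limiter: at most 200 requests per minute.
        let limiter = RateLimiter::direct(Quota::per_minute(NonZeroU32::new(200).unwrap()));

        for i in 0..3 {
            // Resolves immediately while quota remains, otherwise waits.
            limiter.until_ready().await;
            println!("request {i} may be sent");
        }
    }
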
env::var("ALPACA_SOURCE") + .expect("ALPACA_SOURCE must be set.") + .parse() + .expect("ALPACA_SOURCE must be a either 'iex' or 'sip'."); + + Self { + alpaca_api_key: alpaca_api_key.clone(), + alpaca_api_secret: alpaca_api_secret.clone(), + alpaca_client: Client::builder() + .default_headers({ + let mut headers = HeaderMap::new(); + headers.insert("APCA-API-KEY-ID", alpaca_api_key.parse().unwrap()); + headers.insert("APCA-API-SECRET-KEY", alpaca_api_secret.parse().unwrap()); + headers + }) + .build() + .unwrap(), + alpaca_rate_limit: RateLimiter::direct(Quota::per_minute(match alpaca_source { + Source::Iex => NonZeroU32::new(200).unwrap(), + Source::Sip => NonZeroU32::new(1000).unwrap(), + })), + alpaca_historical_offset: Duration::from_secs(match alpaca_source { + Source::Iex => 900, + Source::Sip => 0, + }), + alpaca_source, + postgres_pool: PgPoolOptions::new() + .max_connections(NUM_CLIENTS) + .connect(&database_url) + .await + .unwrap(), + } + } + + pub async fn arc_from_env() -> Arc { + Arc::new(Self::from_env().await) + } +} diff --git a/src/data/historical.rs b/src/data/historical.rs new file mode 100644 index 0000000..02eeb7e --- /dev/null +++ b/src/data/historical.rs @@ -0,0 +1,143 @@ +use crate::{ + config::{Config, ALPACA_CRYPTO_DATA_URL, ALPACA_STOCK_DATA_URL, ALPACA_TIMESTAMP_FORMAT}, + database, + time::{next_minute, ONE_MINUTE}, + types::{api::incoming, Asset, Bar, Class}, +}; +use http::StatusCode; +use indexmap::IndexMap; +use log::{error, info}; +use std::{collections::HashMap, sync::Arc}; +use time::OffsetDateTime; +use tokio::{sync::RwLock, task::spawn_blocking, time::sleep}; + +pub async fn backfill( + app_config: Arc, + asset: Asset, + backfilled: Arc>>, +) { + info!("Backfilling historical data for {}...", asset.symbol); + + let task_run_offsetdatetime = next_minute() + app_config.alpaca_historical_offset; + let fetch_from = asset.timestamp_last + ONE_MINUTE; + let fetch_until = task_run_offsetdatetime - app_config.alpaca_historical_offset - ONE_MINUTE; + if fetch_from > fetch_until { + return; + } + + let mut current_time = fetch_from; + let asset_clone = asset.clone(); + + let mut bars = spawn_blocking(move || { + let mut bars = IndexMap::new(); + while current_time <= fetch_until { + bars.insert( + current_time, + Bar::empty(current_time, asset_clone.symbol.clone()), + ); + current_time += ONE_MINUTE; + } + bars + }) + .await + .unwrap(); + + let wait_duration = task_run_offsetdatetime - OffsetDateTime::now_utc(); + if wait_duration.is_positive() { + sleep(wait_duration.unsigned_abs()).await; + } + + let mut next_page_token = None; + loop { + let request = app_config + .alpaca_client + .get(match asset.class { + Class::UsEquity => ALPACA_STOCK_DATA_URL, + Class::Crypto => ALPACA_CRYPTO_DATA_URL, + }) + .query(&[ + ("symbols", &asset.symbol), + ("timeframe", &String::from("1Min")), + ( + "start", + &fetch_from + .format(ALPACA_TIMESTAMP_FORMAT) + .unwrap() + .to_string(), + ), + ( + "end", + &fetch_until + .format(ALPACA_TIMESTAMP_FORMAT) + .unwrap() + .to_string(), + ), + ("limit", &String::from("10000")), + ("page_token", &next_page_token.clone().unwrap_or_default()), + ]); + + app_config.alpaca_rate_limit.until_ready().await; + let response = request.send().await.unwrap(); + let mut response = if response.status() == StatusCode::OK { + response.json::().await.unwrap() + } else { + error!( + "Failed to backfill historical data for {} from {} to {}: {}", + asset.symbol, + fetch_from, + fetch_until, + response.text().await.unwrap() + ); + break; + }; + + for bar 
diff --git a/src/data/live.rs b/src/data/live.rs
new file mode 100644
index 0000000..512f27f
--- /dev/null
+++ b/src/data/live.rs
@@ -0,0 +1,313 @@
+use crate::{
+    config::{Config, ALPACA_CRYPTO_WEBSOCKET_URL, ALPACA_STOCK_WEBSOCKET_URL},
+    data::historical::backfill,
+    database,
+    time::{duration_until, last_minute, next_30s, ONE_MINUTE, THIRTY_SECONDS},
+    types::{
+        asset,
+        websocket::{incoming, outgoing},
+        Bar, BroadcastMessage, Class,
+    },
+};
+use core::panic;
+use futures_util::{
+    stream::{SplitSink, SplitStream},
+    SinkExt, StreamExt,
+};
+use log::{error, info, warn};
+use serde_json::{from_str, to_string};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+    time::Instant,
+};
+use tokio::{
+    net::TcpStream,
+    spawn,
+    sync::{
+        broadcast::{Receiver, Sender},
+        RwLock,
+    },
+    time::interval_at,
+};
+use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
+
+pub async fn run(
+    app_config: Arc<Config>,
+    class: Class,
+    asset_broadcast_sender: Sender<BroadcastMessage>,
+) {
+    info!("Running live data threads for {:?}.", class);
+
+    let websocket_url = match class {
+        Class::UsEquity => format!(
+            "{}/{}",
+            ALPACA_STOCK_WEBSOCKET_URL, app_config.alpaca_source
+        ),
+        Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL.to_string(),
+    };
+
+    let (stream, _) = connect_async(websocket_url).await.unwrap();
+    let (mut sink, mut stream) = stream.split();
+    authenticate_websocket(&app_config, &mut stream, &mut sink).await;
+    let sink = Arc::new(RwLock::new(sink));
+
+    let backfilled = Arc::new(RwLock::new(HashMap::new()));
+
+    spawn(websocket_broadcast_handler(
+        class,
+        sink.clone(),
+        asset_broadcast_sender.subscribe(),
+    ));
+
+    database::assets::select_where_class(&app_config.postgres_pool, class)
+        .await
+        .into_iter()
+        .for_each(|asset| {
+            asset_broadcast_sender
+                .send(BroadcastMessage::Asset(asset::BroadcastMessage::Added(
+                    asset,
+                )))
+                .unwrap();
+        });
+
+    spawn(null_handler(app_config.clone(), backfilled.clone()));
+    websocket_message_handler(app_config, class, stream, sink, backfilled).await;
+
+    unreachable!()
+}
+
+async fn authenticate_websocket(
+    app_config: &Arc<Config>,
+    stream: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+    sink: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
+) {
+    match stream.next().await {
+        Some(Ok(Message::Text(data)))
+            if from_str::<Vec<incoming::Message>>(&data).unwrap().get(0)
+                == Some(&incoming::Message::Success(incoming::success::Message {
+                    msg: incoming::success::MessageType::Connected,
+                })) => {}
+        _ => panic!(),
+    }
+
+    sink.send(Message::Text(
+        to_string(&outgoing::Message::Auth(outgoing::auth::Message::new(
+            app_config.alpaca_api_key.clone(),
+            app_config.alpaca_api_secret.clone(),
+        )))
+        .unwrap(),
+    ))
+    .await
+    .unwrap();
+
+    match stream.next().await {
+        Some(Ok(Message::Text(data)))
+            if from_str::<Vec<incoming::Message>>(&data).unwrap().get(0)
+                == Some(&incoming::Message::Success(incoming::success::Message {
+                    msg: incoming::success::MessageType::Authenticated,
+                })) => {}
+        _ => panic!(),
+    };
+}
+
+async fn websocket_broadcast_handler(
+    class: Class,
+    sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
+    mut asset_broadcast_receiver: Receiver<BroadcastMessage>,
+) {
+    loop {
+        match asset_broadcast_receiver.recv().await.unwrap() {
+            BroadcastMessage::Asset(asset::BroadcastMessage::Added(asset))
+                if asset.class == class =>
+            {
+                sink.write()
+                    .await
+                    .send(Message::Text(
+                        serde_json::to_string(&outgoing::Message::Subscribe(
+                            outgoing::subscribe::Message::new(asset.clone().symbol),
+                        ))
+                        .unwrap(),
+                    ))
+                    .await
+                    .unwrap();
+            }
+            BroadcastMessage::Asset(asset::BroadcastMessage::Deleted(asset))
+                if asset.class == class =>
+            {
+                sink.write()
+                    .await
+                    .send(Message::Text(
+                        serde_json::to_string(&outgoing::Message::Unsubscribe(
+                            outgoing::subscribe::Message::new(asset.clone().symbol),
+                        ))
+                        .unwrap(),
+                    ))
+                    .await
+                    .unwrap();
+            }
+            BroadcastMessage::Asset(_) => {}
+        }
+    }
+}
+
+async fn websocket_message_handler(
+    app_config: Arc<Config>,
+    class: Class,
+    mut stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+    sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
+    backfilled: Arc<RwLock<HashMap<String, bool>>>,
+) {
+    loop {
+        match stream.next().await {
+            Some(Ok(Message::Text(data))) => {
+                let parsed_data = from_str::<Vec<incoming::Message>>(&data);
+                if let Err(e) = &parsed_data {
+                    warn!("Unparsed incoming message: {:?}: {}", data, e);
+                }
+
+                for message in parsed_data.unwrap_or_default() {
+                    handle_message(&app_config, class, message, &backfilled).await;
+                }
+            }
+            Some(Ok(Message::Ping(_))) => sink
+                .write()
+                .await
+                .send(Message::Pong(vec![]))
+                .await
+                .unwrap(),
+            Some(unknown) => error!("Unknown incoming message: {:?}", unknown),
+            None => panic!(),
+        }
+    }
+}
+
+async fn handle_message(
+    app_config: &Arc<Config>,
+    class: Class,
+    message: incoming::Message,
+    backfilled: &Arc<RwLock<HashMap<String, bool>>>,
+) {
+    match message {
+        incoming::Message::Subscription(subscription_message) => {
+            let old_assets = backfilled
+                .read()
+                .await
+                .keys()
+                .cloned()
+                .collect::<HashSet<String>>();
+            let new_assets = subscription_message
+                .bars
+                .into_iter()
+                .collect::<HashSet<String>>();
+
+            let added_assets = new_assets.difference(&old_assets).collect::<Vec<_>>();
+            let deleted_assets = old_assets.difference(&new_assets).collect::<Vec<_>>();
+
+            for asset_symbol in &added_assets {
+                let asset =
+                    database::assets::select_where_symbol(&app_config.postgres_pool, asset_symbol)
+                        .await
+                        .unwrap();
+
+                backfilled.write().await.insert(asset.symbol.clone(), false);
+                spawn(backfill(
+                    app_config.clone(),
+                    asset.clone(),
+                    backfilled.clone(),
+                ));
+            }
+
+            for asset_symbol in &deleted_assets {
+                backfilled.write().await.remove(*asset_symbol);
+            }
+
+            info!(
+                "Subscription update for {:?}: {:?} added, {:?} deleted.",
+                class, added_assets, deleted_assets
+            );
+        }
+        incoming::Message::Bars(bar_message) => {
+            let bar = Bar::from(bar_message);
+            info!("Incoming bar for {}: {}", bar.asset_symbol, bar.timestamp);
+
+            let transaction = app_config.postgres_pool.begin().await.unwrap();
+            database::bars::upsert(&app_config.postgres_pool, &bar).await;
+            if *backfilled.read().await.get(&bar.asset_symbol).unwrap() {
+                database::bars_filled::upsert(&app_config.postgres_pool, &bar).await;
+            }
+            transaction.commit().await.unwrap();
+        }
+        incoming::Message::UpdatedBars(bar_message) => {
+            let bar = Bar::from(bar_message);
+            info!(
+                "Incoming updated bar for {}: {}",
+                bar.asset_symbol, bar.timestamp
+            );
+
+            let transaction = app_config.postgres_pool.begin().await.unwrap();
+            database::bars::upsert(&app_config.postgres_pool, &bar).await;
+            if *backfilled.read().await.get(&bar.asset_symbol).unwrap() {
+                database::bars_filled::upsert(&app_config.postgres_pool, &bar).await;
+                database::assets::update_timestamp_last_where_symbol(
+                    &app_config.postgres_pool,
+                    &bar.asset_symbol,
+                    &bar.timestamp,
+                )
+                .await;
+            }
+            transaction.commit().await.unwrap();
+        }
+        incoming::Message::Success(_) => {}
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum NullHandlerState {
+    Bars,
+    UpdatedBars,
+}
+
+#[allow(clippy::significant_drop_in_scrutinee)]
+async fn null_handler(app_config: Arc<Config>, backfilled: Arc<RwLock<HashMap<String, bool>>>) {
+    let next_30s = next_30s();
+    let mut state = if next_30s.unix_timestamp() % 60 == 0 {
+        NullHandlerState::Bars
+    } else {
+        NullHandlerState::UpdatedBars
+    };
+    let mut interval = interval_at(
+        (Instant::now() + duration_until(next_30s)).into(),
+        THIRTY_SECONDS,
+    );
+
+    loop {
+        interval.tick().await;
+        let timestamp = last_minute() - ONE_MINUTE;
+
+        let backfilled = backfilled.read().await;
+        for asset_symbol in backfilled.keys().cloned() {
+            let bar = Bar::empty(timestamp, asset_symbol);
+
+            let transaction = app_config.postgres_pool.begin().await.unwrap();
+
+            database::bars::insert_or_skip(&app_config.postgres_pool, &bar).await;
+            if *backfilled.get(&bar.asset_symbol).unwrap() {
+                database::bars_filled::insert_or_skip(&app_config.postgres_pool, &bar).await;
+
+                if state == NullHandlerState::UpdatedBars {
+                    database::assets::update_timestamp_last_where_symbol(
+                        &app_config.postgres_pool,
+                        &bar.asset_symbol,
+                        &bar.timestamp,
+                    )
+                    .await;
+                }
+            }
+
+            transaction.commit().await.unwrap();
+        }
+
+        state = match state {
+            NullHandlerState::Bars => NullHandlerState::UpdatedBars,
+            NullHandlerState::UpdatedBars => NullHandlerState::Bars,
+        };
+    }
+}
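
The handshake in `authenticate_websocket` above expects the first frame to be `[{"T":"success","msg":"connected"}]`, answers with an `auth` action, and then expects `[{"T":"success","msg":"authenticated"}]`. A standalone sketch of those frame shapes with serde types mirroring the ones in this patch (key and secret values are placeholders):

    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    #[serde(tag = "T")]
    enum Incoming {
        #[serde(rename = "success")]
        Success { msg: String },
    }

    fn main() {
        // First frame sent by the server after the socket opens.
        let frame = r#"[{"T":"success","msg":"connected"}]"#;
        let parsed: Vec<Incoming> = serde_json::from_str(frame).unwrap();
        assert_eq!(
            parsed[0],
            Incoming::Success { msg: "connected".into() }
        );

        // The client then replies with (built by outgoing::auth::Message):
        // {"action":"auth","key":"<KEY>","secret":"<SECRET>"}
    }
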
diff --git a/src/data/mod.rs b/src/data/mod.rs
new file mode 100644
index 0000000..5c9c723
--- /dev/null
+++ b/src/data/mod.rs
@@ -0,0 +1,2 @@
+pub mod historical;
+pub mod live;
diff --git a/src/database/assets.rs b/src/database/assets.rs
new file mode 100644
index 0000000..dc5e36a
--- /dev/null
+++ b/src/database/assets.rs
@@ -0,0 +1,92 @@
+use crate::types::{Asset, Class, Exchange};
+use sqlx::{query_as, PgPool};
+use std::convert::Into;
+use time::OffsetDateTime;
+
+pub async fn select(postgres_pool: &PgPool) -> Vec<Asset> {
+    query_as!(
+        Asset,
+        r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last FROM assets"#
+    )
+    .fetch_all(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn select_where_class(postgres_pool: &PgPool, class: Class) -> Vec<Asset> {
+    query_as!(
+        Asset,
+        r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last FROM assets WHERE class = $1::CLASS"#,
+        class as Class
+    )
+    .fetch_all(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn select_where_symbol(postgres_pool: &PgPool, symbol: &str) -> Option<Asset> {
+    query_as!(
+        Asset,
+        r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last FROM assets WHERE symbol = $1"#,
+        symbol
+    )
+    .fetch_optional(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn insert(postgres_pool: &PgPool, asset: &Asset) -> Asset {
+    query_as!(
+        Asset,
+        r#"INSERT INTO assets (symbol, class, exchange, trading, timestamp_added, timestamp_first, timestamp_last) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5, $6, $7)
+        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
+        asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.timestamp_added, asset.timestamp_first, asset.timestamp_last
+    )
+    .fetch_one(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn update_trading_where_symbol(
+    postgres_pool: &PgPool,
+    symbol: &str,
+    trading: &bool,
+) -> Option<Asset> {
+    query_as!(
+        Asset,
+        r#"UPDATE assets SET trading = $1 WHERE symbol = $2
+        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
+        trading, symbol
+    )
+    .fetch_optional(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn update_timestamp_last_where_symbol(
+    postgres_pool: &PgPool,
+    symbol: &str,
+    timestamp_last: &OffsetDateTime,
+) -> Option<Asset> {
+    query_as!(
+        Asset,
+        r#"UPDATE assets SET timestamp_last = $1 WHERE symbol = $2
+        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
+        timestamp_last, symbol
+    )
+    .fetch_optional(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn delete_where_symbol(postgres_pool: &PgPool, symbol: &str) -> Option<Asset> {
+    query_as!(
+        Asset,
+        r#"DELETE FROM assets WHERE symbol = $1
+        RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, timestamp_added, timestamp_first, timestamp_last"#,
+        symbol
+    )
+    .fetch_optional(postgres_pool)
+    .await
+    .unwrap()
+}
diff --git a/src/database/bars.rs b/src/database/bars.rs
new file mode 100644
index 0000000..d27f445
--- /dev/null
+++ b/src/database/bars.rs
@@ -0,0 +1,89 @@
+use crate::types::Bar;
+use sqlx::{query_as, PgPool, Postgres};
+use std::convert::Into;
+use time::OffsetDateTime;
+
+pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
+    query_as!(
+        Bar,
+        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+        ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
+        RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
+        bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
+    )
+    .fetch_one(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
+    query_as!(
+        Bar,
+        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+        ON CONFLICT (timestamp, asset_symbol) DO NOTHING"#,
+        bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
+    )
+    .execute(postgres_pool)
+    .await
+    .unwrap();
+}
+
+pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
+    let mut timestamp = Vec::with_capacity(bars.len());
+    let mut asset_symbol = Vec::with_capacity(bars.len());
+    let mut open = Vec::with_capacity(bars.len());
+    let mut high = Vec::with_capacity(bars.len());
+    let mut low = Vec::with_capacity(bars.len());
+    let mut close = Vec::with_capacity(bars.len());
+    let mut volume = Vec::with_capacity(bars.len());
+    let mut num_trades = Vec::with_capacity(bars.len());
+    let mut volume_weighted = Vec::with_capacity(bars.len());
+
+    for bar in bars {
+        timestamp.push(bar.timestamp);
+        asset_symbol.push(bar.asset_symbol.clone());
+        open.push(bar.open);
+        high.push(bar.high);
+        low.push(bar.low);
+        close.push(bar.close);
+        volume.push(bar.volume);
+        num_trades.push(bar.num_trades);
+        volume_weighted.push(bar.volume_weighted);
+    }
+
+    // No type-safety here because of NULLABLE bulk insert
+    query_as::<Postgres, Bar>(
+        r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
+        SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
+        ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, num_trades = EXCLUDED.num_trades, volume_weighted = EXCLUDED.volume_weighted
+        RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
+    )
+    .bind(timestamp)
+    .bind(asset_symbol)
+    .bind(open)
+    .bind(high)
+    .bind(low)
+    .bind(close)
+    .bind(volume)
+    .bind(num_trades)
+    .bind(volume_weighted)
+    .fetch_all(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn select_where_symbol_where_timestamp_larger_than(
+    postgres_pool: &PgPool,
+    symbol: &str,
+    timestamp: &OffsetDateTime,
+) -> Vec<Bar> {
+    query_as!(
+        Bar,
+        r#"SELECT * FROM bars WHERE asset_symbol = $1 AND timestamp > $2 ORDER BY timestamp ASC"#,
+        symbol,
+        timestamp
+    )
+    .fetch_all(postgres_pool)
+    .await
+    .unwrap()
+}
diff --git a/src/database/bars_filled.rs b/src/database/bars_filled.rs
new file mode 100644
index 0000000..4e9c2b8
--- /dev/null
+++ b/src/database/bars_filled.rs
@@ -0,0 +1,133 @@
+use crate::types::Bar;
+use sqlx::{query_as, PgPool, Postgres};
+use std::convert::Into;
+
+pub async fn upsert(postgres_pool: &PgPool, bar: &Bar) -> Bar {
+    let mut bar = bar.clone();
+
+    if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
+        let filled_bar = query_as!(
+            Bar,
+            r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
+            bar.timestamp,
+            bar.asset_symbol
+        )
+        .fetch_one(postgres_pool)
+        .await
+        .unwrap();
+        bar.merge_empty(&filled_bar);
+    }
+
+    query_as!(
+        Bar,
+        r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+        ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
+        RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
+        bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
+    )
+    .fetch_one(postgres_pool)
+    .await
+    .unwrap()
+}
+
+pub async fn insert_or_skip(postgres_pool: &PgPool, bar: &Bar) {
+    let mut bar = bar.clone();
+
+    if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
+        let filled_bar = query_as!(
+            Bar,
+            r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
+            bar.timestamp,
+            bar.asset_symbol
+        )
+        .fetch_one(postgres_pool)
+        .await
+        .unwrap();
+        bar.merge_empty(&filled_bar);
+    }
+
+    query_as!(
+        Bar,
+        r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+        ON CONFLICT (timestamp, asset_symbol) DO NOTHING"#,
+        bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
+    )
+    .execute(postgres_pool)
+    .await
+    .unwrap();
+}
+
+pub async fn upsert_batch(postgres_pool: &PgPool, bars: &[Bar]) -> Vec<Bar> {
+    let mut bars = bars.to_vec();
+
+    if bars.is_empty() {
+        return bars;
+    }
+
+    if bars[0].open.is_none()
+        || bars[0].high.is_none()
+        || bars[0].low.is_none()
+        || bars[0].close.is_none()
+    {
+        let filled_bar = &query_as!(
+            Bar,
+            r#"SELECT * FROM bars_filled WHERE timestamp < $1 AND asset_symbol = $2 ORDER BY timestamp DESC LIMIT 1"#,
+            bars[0].timestamp,
+            bars[0].asset_symbol
+        )
+        .fetch_one(postgres_pool)
+        .await
+        .unwrap();
+        bars[0].merge_empty(filled_bar);
+    }
+
+    let mut timestamp = Vec::with_capacity(bars.len());
+    let mut asset_symbol = Vec::with_capacity(bars.len());
+    let mut open = Vec::with_capacity(bars.len());
+    let mut high = Vec::with_capacity(bars.len());
+    let mut low = Vec::with_capacity(bars.len());
+    let mut close = Vec::with_capacity(bars.len());
+    let mut volume = Vec::with_capacity(bars.len());
+    let mut num_trades = Vec::with_capacity(bars.len());
+    let mut volume_weighted = Vec::with_capacity(bars.len());
+
+    let mut last_filled_bar = bars[0].clone();
+
+    for mut bar in bars {
+        if bar.open.is_none() || bar.high.is_none() || bar.low.is_none() || bar.close.is_none() {
+            bar.merge_empty(&last_filled_bar);
+        } else {
+            last_filled_bar = bar.clone();
+        }
+
+        timestamp.push(bar.timestamp);
+        asset_symbol.push(bar.asset_symbol.clone());
+        open.push(bar.open);
+        high.push(bar.high);
+        low.push(bar.low);
+        close.push(bar.close);
+        volume.push(bar.volume);
+        num_trades.push(bar.num_trades);
+        volume_weighted.push(bar.volume_weighted);
+    }
+
+    // No type-safety here because of NULLABLE bulk insert
+    query_as::<Postgres, Bar>(
+        r#"INSERT INTO bars_filled (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
+        SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
+        ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, num_trades = EXCLUDED.num_trades, volume_weighted = EXCLUDED.volume_weighted
+        RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
+    )
+    .bind(timestamp)
+    .bind(asset_symbol)
+    .bind(open)
+    .bind(high)
+    .bind(low)
+    .bind(close)
+    .bind(volume)
+    .bind(num_trades)
+    .bind(volume_weighted)
+    .fetch_all(postgres_pool)
+    .await
+    .unwrap()
+}
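
Both `upsert_batch` functions above avoid per-row round trips by binding each column as one Postgres array and expanding with UNNEST. A stripped-down sketch of the same pattern over a hypothetical two-column `closes` table, using the unchecked `sqlx::query` since the macros need a live database (standalone, not part of this patch):

    use sqlx::PgPool;

    async fn upsert_closes(pool: &PgPool, rows: &[(String, f64)]) -> sqlx::Result<()> {
        // One Vec per column, kept in row order.
        let (symbols, closes): (Vec<String>, Vec<f64>) = rows.iter().cloned().unzip();

        sqlx::query(
            "INSERT INTO closes (symbol, close)
             SELECT * FROM UNNEST($1::text[], $2::float8[])
             ON CONFLICT (symbol) DO UPDATE SET close = EXCLUDED.close",
        )
        .bind(&symbols)
        .bind(&closes)
        .execute(pool)
        .await?;

        Ok(())
    }
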
diff --git a/backend/src/database/mod.rs b/src/database/mod.rs
similarity index 58%
rename from backend/src/database/mod.rs
rename to src/database/mod.rs
index d760015..1c4a99e 100644
--- a/backend/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,3 +1,3 @@
 pub mod assets;
 pub mod bars;
-pub mod calendar;
+pub mod bars_filled;
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..c5c205b
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,49 @@
+#![warn(clippy::all, clippy::pedantic, clippy::nursery)]
+#![allow(clippy::missing_docs_in_private_items)]
+
+mod config;
+mod data;
+mod database;
+mod routes;
+mod time;
+mod types;
+
+use config::Config;
+use dotenv::dotenv;
+use log4rs::config::Deserializers;
+use sqlx::error::BoxDynError;
+use tokio::{spawn, sync::broadcast};
+use types::{BroadcastMessage, Class};
+
+#[tokio::main]
+async fn main() -> Result<(), BoxDynError> {
+    dotenv().ok();
+    log4rs::init_file("log4rs.yaml", Deserializers::default())?;
+    let app_config = Config::arc_from_env().await;
+    let mut threads = Vec::new();
+
+    let (asset_broadcast_sender, _) = broadcast::channel::<BroadcastMessage>(100);
+
+    threads.push(spawn(data::live::run(
+        app_config.clone(),
+        Class::UsEquity,
+        asset_broadcast_sender.clone(),
+    )));
+
+    threads.push(spawn(data::live::run(
+        app_config.clone(),
+        Class::Crypto,
+        asset_broadcast_sender.clone(),
+    )));
+
+    threads.push(spawn(routes::run(
+        app_config.clone(),
+        asset_broadcast_sender,
+    )));
+
+    for thread in threads {
+        thread.await?;
+    }
+
+    unreachable!()
+}
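
`main` above hands the same `broadcast` channel to both live-data tasks and the router: the router keeps the `Sender`, each class task subscribes for its own `Receiver` inside `data::live::run`, and every subscriber observes every `BroadcastMessage`. The mechanism in isolation (standalone, not part of this patch):

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // Capacity 100, as above; a receiver lagging further than that drops messages.
        let (tx, _) = broadcast::channel::<String>(100);

        let mut us_equity = tx.subscribe();
        let mut crypto = tx.subscribe();

        tx.send("AAPL added".to_string()).unwrap();

        // Both subscribers see the same message.
        assert_eq!(us_equity.recv().await.unwrap(), "AAPL added");
        assert_eq!(crypto.recv().await.unwrap(), "AAPL added");
    }
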
"start", + &OffsetDateTime::UNIX_EPOCH + .format(ALPACA_TIMESTAMP_FORMAT) + .unwrap(), + ), + ("limit", &String::from("1")), + ]); + + if asset.class == Class::UsEquity { + earliest_bar_request = + earliest_bar_request.query(&[("feed", &app_config.alpaca_source.to_string())]); + } + + let earliest_bar = earliest_bar_request + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + let earliest_bar = earliest_bar + .bars + .get(&asset.symbol) + .ok_or(StatusCode::NOT_FOUND)? + .as_ref() + .ok_or(StatusCode::NOT_FOUND)? + .first() + .ok_or(StatusCode::NOT_FOUND)?; + + let asset = Asset::from(( + asset, + request.trading.unwrap_or(false), + earliest_bar.timestamp, + )); + + database::assets::insert(&app_config.postgres_pool, &asset).await; + + asset_broadcast_sender + .send(BroadcastMessage::Asset(asset::BroadcastMessage::Added( + asset.clone(), + ))) + .unwrap(); + + info!("Added asset {}.", asset.symbol); + Ok((StatusCode::CREATED, Json(asset))) +} + +#[allow(dead_code)] +#[derive(Deserialize)] +pub struct UpdateAssetRequest { + trading: bool, +} + +pub async fn update( + Extension(app_config): Extension>, + Extension(asset_broadcast_sender): Extension>, + Path(symbol): Path, + Json(request): Json, +) -> Result<(StatusCode, Json), StatusCode> { + let asset = database::assets::update_trading_where_symbol( + &app_config.postgres_pool, + &symbol, + &request.trading, + ) + .await; + + asset.map_or(Err(StatusCode::NOT_FOUND), |asset| { + asset_broadcast_sender + .send(BroadcastMessage::Asset(asset::BroadcastMessage::Updated( + asset.clone(), + ))) + .unwrap(); + info!("Updated asset {}.", symbol); + Ok((StatusCode::OK, Json(asset))) + }) +} + +pub async fn delete( + Extension(app_config): Extension>, + Extension(asset_broadcast_sender): Extension>, + Path(symbol): Path, +) -> Result { + let asset = database::assets::delete_where_symbol(&app_config.postgres_pool, &symbol).await; + + asset.map_or(Err(StatusCode::NOT_FOUND), |asset| { + asset_broadcast_sender + .send(BroadcastMessage::Asset(asset::BroadcastMessage::Deleted( + asset, + ))) + .unwrap(); + info!("Deleted asset {}.", symbol); + Ok(StatusCode::NO_CONTENT) + }) +} diff --git a/src/routes/mod.rs b/src/routes/mod.rs new file mode 100644 index 0000000..0925256 --- /dev/null +++ b/src/routes/mod.rs @@ -0,0 +1,30 @@ +use crate::{config::Config, types::BroadcastMessage}; +use axum::{ + routing::{delete, get, post}, + Extension, Router, Server, +}; +use log::info; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::broadcast::Sender; + +pub mod assets; + +pub async fn run(app_config: Arc, asset_broadcast_sender: Sender) { + let app = Router::new() + .route("/assets", get(assets::get_all)) + .route("/assets/:symbol", get(assets::get)) + .route("/assets", post(assets::add)) + .route("/assets/:symbol", post(assets::update)) + .route("/assets/:symbol", delete(assets::delete)) + .layer(Extension(app_config)) + .layer(Extension(asset_broadcast_sender)); + + let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); + info!("Listening on {}.", addr); + Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); + + unreachable!() +} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..d2e4733 --- /dev/null +++ b/src/time.rs @@ -0,0 +1,34 @@ +use std::time::Duration; +use time::OffsetDateTime; + +pub const THIRTY_SECONDS: Duration = Duration::from_secs(30); +pub const ONE_MINUTE: Duration = Duration::from_secs(60); + +pub fn last_minute() -> OffsetDateTime { + let now_timestamp = 
diff --git a/backend/src/types/api/incoming/asset.rs b/src/types/api/incoming/asset.rs
similarity index 91%
rename from backend/src/types/api/incoming/asset.rs
rename to src/types/api/incoming/asset.rs
index 6717ad2..b137715 100644
--- a/backend/src/types/api/incoming/asset.rs
+++ b/src/types/api/incoming/asset.rs
@@ -1,3 +1,5 @@
+#![allow(clippy::struct_excessive_bools)]
+
 use crate::types::{Class, Exchange, Status};
 use serde::Deserialize;

diff --git a/src/types/api/incoming/bar.rs b/src/types/api/incoming/bar.rs
new file mode 100644
index 0000000..3d1508d
--- /dev/null
+++ b/src/types/api/incoming/bar.rs
@@ -0,0 +1,30 @@
+use serde::Deserialize;
+use std::collections::HashMap;
+use time::OffsetDateTime;
+
+#[derive(Debug, PartialEq, Deserialize)]
+pub struct Bar {
+    #[serde(rename = "t")]
+    #[serde(with = "time::serde::rfc3339")]
+    pub timestamp: OffsetDateTime,
+    #[serde(rename = "o")]
+    pub open: f64,
+    #[serde(rename = "h")]
+    pub high: f64,
+    #[serde(rename = "l")]
+    pub low: f64,
+    #[serde(rename = "c")]
+    pub close: f64,
+    #[serde(rename = "v")]
+    pub volume: f64,
+    #[serde(rename = "n")]
+    pub num_trades: i64,
+    #[serde(rename = "vw")]
+    pub volume_weighted: f64,
+}
+
+#[derive(Debug, PartialEq, Deserialize)]
+pub struct Message {
+    pub bars: HashMap<String, Option<Vec<Bar>>>,
+    pub next_page_token: Option<String>,
+}
diff --git a/backend/src/types/api/incoming/calendar.rs b/src/types/api/incoming/calendar_date.rs
similarity index 95%
rename from backend/src/types/api/incoming/calendar.rs
rename to src/types/api/incoming/calendar_date.rs
index 1f4e769..b92213b 100644
--- a/backend/src/types/api/incoming/calendar.rs
+++ b/src/types/api/incoming/calendar_date.rs
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Deserializer};
 use time::{macros::format_description, Date, Time};

-#[derive(Debug, PartialEq, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Deserialize)]
 pub struct CalendarDate {
     #[serde(deserialize_with = "deserialize_date")]
     pub date: Date,
diff --git a/src/types/api/incoming/mod.rs b/src/types/api/incoming/mod.rs
new file mode 100644
index 0000000..7609673
--- /dev/null
+++ b/src/types/api/incoming/mod.rs
@@ -0,0 +1,7 @@
+pub mod asset;
+pub mod bar;
+pub mod calendar_date;
+
+pub use asset::Asset;
+pub use bar::Bar;
+pub use calendar_date::CalendarDate;
diff --git a/backend/src/types/websocket/mod.rs b/src/types/api/mod.rs
similarity index 50%
rename from backend/src/types/websocket/mod.rs
rename to src/types/api/mod.rs
index 9aac270..28f2603 100644
--- a/backend/src/types/websocket/mod.rs
+++ b/src/types/api/mod.rs
@@ -1,2 +1 @@
 pub mod incoming;
-pub mod outgoing;
diff --git a/src/types/asset.rs b/src/types/asset.rs
new file mode 100644
index 0000000..cb0289b
--- /dev/null
+++ b/src/types/asset.rs
@@ -0,0 +1,37 @@
+use super::{api::incoming, class::Class, exchange::Exchange};
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+use time::OffsetDateTime;
+
+#[derive(Clone, Debug, PartialEq, Eq, FromRow, Serialize, Deserialize, Hash)]
+pub struct Asset {
+    pub symbol: String,
+    pub class: Class,
+    pub exchange: Exchange,
+    pub trading: bool,
+    pub timestamp_added: OffsetDateTime,
+    pub timestamp_first: OffsetDateTime,
+    pub timestamp_last: OffsetDateTime,
+}
+
+impl From<(incoming::Asset, bool, OffsetDateTime)> for Asset {
+    fn from((asset, trading, timestamp_first): (incoming::Asset, bool, OffsetDateTime)) -> Self {
+        Self {
+            symbol: asset.symbol,
+            class: asset.class,
+            exchange: asset.exchange,
+            trading,
+            timestamp_added: OffsetDateTime::now_utc(),
+            timestamp_first,
+            timestamp_last: timestamp_first,
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub enum BroadcastMessage {
+    Added(Asset),
+    Updated(Asset),
+    Deleted(Asset),
+    Reset(Asset),
+}
diff --git a/src/types/bar.rs b/src/types/bar.rs
new file mode 100644
index 0000000..8ec7223
--- /dev/null
+++ b/src/types/bar.rs
@@ -0,0 +1,72 @@
+use super::{api, websocket};
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+use time::OffsetDateTime;
+
+#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
+pub struct Bar {
+    pub timestamp: OffsetDateTime,
+    pub asset_symbol: String,
+    pub open: Option<f64>,
+    pub high: Option<f64>,
+    pub low: Option<f64>,
+    pub close: Option<f64>,
+    pub volume: f64,
+    pub num_trades: i64,
+    pub volume_weighted: f64,
+}
+
+impl Bar {
+    pub const fn empty(timestamp: OffsetDateTime, asset_symbol: String) -> Self {
+        Self {
+            timestamp,
+            asset_symbol,
+            open: None,
+            high: None,
+            low: None,
+            close: None,
+            volume: 0.0,
+            num_trades: 0,
+            volume_weighted: 0.0,
+        }
+    }
+
+    pub fn merge_empty(&mut self, other: &Self) {
+        self.open = other.open;
+        self.high = other.high;
+        self.low = other.low;
+        self.close = other.close;
+    }
+}
+
+impl From<websocket::incoming::bar::Message> for Bar {
+    fn from(bar_message: websocket::incoming::bar::Message) -> Self {
+        Self {
+            timestamp: bar_message.timestamp,
+            asset_symbol: bar_message.symbol,
+            open: Some(bar_message.open),
+            high: Some(bar_message.high),
+            low: Some(bar_message.low),
+            close: Some(bar_message.close),
+            volume: bar_message.volume,
+            num_trades: bar_message.num_trades,
+            volume_weighted: bar_message.volume_weighted,
+        }
+    }
+}
+
+impl From<(api::incoming::Bar, String)> for Bar {
+    fn from((bar, asset_symbol): (api::incoming::Bar, String)) -> Self {
+        Self {
+            timestamp: bar.timestamp,
+            asset_symbol,
+            open: Some(bar.open),
+            high: Some(bar.high),
+            low: Some(bar.low),
+            close: Some(bar.close),
+            volume: bar.volume,
+            num_trades: bar.num_trades,
+            volume_weighted: bar.volume_weighted,
+        }
+    }
+}
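
`Bar::empty` and `merge_empty` above are what turn the raw `bars` stream into the gapless `bars_filled` series: prices are carried forward from the last real bar while volume stays at zero. The effect in isolation, on a minimal stand-in for the `Bar` type (standalone, not part of this patch):

    // Two fields are enough to show the carry-forward.
    #[derive(Clone, Debug)]
    struct Bar {
        close: Option<f64>,
        volume: f64,
    }

    impl Bar {
        fn merge_empty(&mut self, other: &Self) {
            // Prices come from the previous filled bar; volume is left at zero.
            self.close = other.close;
        }
    }

    fn main() {
        let last_filled = Bar { close: Some(101.5), volume: 420.0 };
        let mut gap = Bar { close: None, volume: 0.0 };

        gap.merge_empty(&last_filled);
        assert_eq!(gap.close, Some(101.5));
        assert_eq!(gap.volume, 0.0); // no trades happened in the gap minute
    }
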
diff --git a/backend/src/types/class.rs b/src/types/class.rs
similarity index 74%
rename from backend/src/types/class.rs
rename to src/types/class.rs
index f835602..f6fbc0a 100644
--- a/backend/src/types/class.rs
+++ b/src/types/class.rs
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};
 use sqlx::Type;

-#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type, Hash)]
 pub enum Class {
     #[sqlx(rename = "us_equity")]
     #[serde(rename = "us_equity")]
diff --git a/backend/src/types/exchange.rs b/src/types/exchange.rs
similarity index 88%
rename from backend/src/types/exchange.rs
rename to src/types/exchange.rs
index 1ac9d1b..27933fe 100644
--- a/backend/src/types/exchange.rs
+++ b/src/types/exchange.rs
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};
 use sqlx::Type;

-#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type, Hash)]
 pub enum Exchange {
     #[sqlx(rename = "AMEX")]
     #[serde(rename = "AMEX")]
diff --git a/src/types/mod.rs b/src/types/mod.rs
new file mode 100644
index 0000000..cbdee21
--- /dev/null
+++ b/src/types/mod.rs
@@ -0,0 +1,22 @@
+pub mod api;
+pub mod asset;
+pub mod bar;
+pub mod class;
+pub mod exchange;
+pub mod source;
+pub mod status;
+pub mod websocket;
+
+pub use asset::Asset;
+pub use bar::Bar;
+pub use class::Class;
+pub use exchange::Exchange;
+pub use source::Source;
+pub use status::Status;
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub enum BroadcastMessage {
+    Asset(asset::BroadcastMessage),
+}
diff --git a/src/types/source.rs b/src/types/source.rs
new file mode 100644
index 0000000..f734d3e
--- /dev/null
+++ b/src/types/source.rs
@@ -0,0 +1,31 @@
+use std::{
+    fmt::{Display, Formatter},
+    str::FromStr,
+};
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Source {
+    Iex,
+    Sip,
+}
+
+impl FromStr for Source {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "iex" => Ok(Self::Iex),
+            "sip" => Ok(Self::Sip),
+            _ => Err(format!("Unknown source: {s}")),
+        }
+    }
+}
+
+impl Display for Source {
+    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
+        match self {
+            Self::Iex => write!(f, "iex"),
+            Self::Sip => write!(f, "sip"),
+        }
+    }
+}
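
`Source` above implements both `FromStr` and `Display`, so the `ALPACA_SOURCE` env var, the websocket URL suffix, and the `feed` query parameter all share one value. A usage sketch, assuming the type is in scope as written above:

    let source: Source = "iex".parse().unwrap();
    assert_eq!(source, Source::Iex);
    assert_eq!(source.to_string(), "iex");
    assert!("nasdaq".parse::<Source>().is_err());
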
diff --git a/backend/src/types/status.rs b/src/types/status.rs
similarity index 75%
rename from backend/src/types/status.rs
rename to src/types/status.rs
index a6c868b..7b1374f 100644
--- a/backend/src/types/status.rs
+++ b/src/types/status.rs
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};
 use sqlx::Type;

-#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Type)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Type)]
 pub enum Status {
     #[sqlx(rename = "active")]
     #[serde(rename = "active")]
diff --git a/backend/src/types/websocket/incoming/bar.rs b/src/types/websocket/incoming/bar.rs
similarity index 96%
rename from backend/src/types/websocket/incoming/bar.rs
rename to src/types/websocket/incoming/bar.rs
index c7c69c9..6ef71b8 100644
--- a/backend/src/types/websocket/incoming/bar.rs
+++ b/src/types/websocket/incoming/bar.rs
@@ -2,7 +2,7 @@ use serde::Deserialize;
 use time::OffsetDateTime;

 #[derive(Debug, PartialEq, Deserialize)]
-pub struct BarMessage {
+pub struct Message {
     #[serde(rename = "t")]
     #[serde(with = "time::serde::rfc3339")]
     pub timestamp: OffsetDateTime,
diff --git a/backend/src/types/websocket/incoming/mod.rs b/src/types/websocket/incoming/mod.rs
similarity index 56%
rename from backend/src/types/websocket/incoming/mod.rs
rename to src/types/websocket/incoming/mod.rs
index 82a92ef..49496d1 100644
--- a/backend/src/types/websocket/incoming/mod.rs
+++ b/src/types/websocket/incoming/mod.rs
@@ -2,21 +2,17 @@ pub mod bar;
 pub mod subscription;
 pub mod success;

-pub use bar::*;
-pub use subscription::*;
-pub use success::*;
-
 use serde::Deserialize;

 #[derive(Debug, Deserialize, PartialEq)]
 #[serde(tag = "T")]
-pub enum IncomingMessage {
+pub enum Message {
     #[serde(rename = "success")]
-    Success(SuccessMessage),
+    Success(success::Message),
     #[serde(rename = "subscription")]
-    Subscription(SubscriptionMessage),
+    Subscription(subscription::Message),
     #[serde(rename = "b")]
-    Bars(BarMessage),
+    Bars(bar::Message),
     #[serde(rename = "u")]
-    UpdatedBars(BarMessage),
+    UpdatedBars(bar::Message),
 }
diff --git a/backend/src/types/websocket/incoming/subscription.rs b/src/types/websocket/incoming/subscription.rs
similarity index 85%
rename from backend/src/types/websocket/incoming/subscription.rs
rename to src/types/websocket/incoming/subscription.rs
index af251dd..4644c3b 100644
--- a/backend/src/types/websocket/incoming/subscription.rs
+++ b/src/types/websocket/incoming/subscription.rs
@@ -1,7 +1,7 @@
 use serde::Deserialize;

-#[derive(Debug, PartialEq, Deserialize)]
-pub struct SubscriptionMessage {
+#[derive(Debug, PartialEq, Eq, Deserialize)]
+pub struct Message {
     pub trades: Vec<String>,
     pub quotes: Vec<String>,
     pub bars: Vec<String>,
diff --git a/src/types/websocket/incoming/success.rs b/src/types/websocket/incoming/success.rs
new file mode 100644
index 0000000..2d29509
--- /dev/null
+++ b/src/types/websocket/incoming/success.rs
@@ -0,0 +1,14 @@
+use serde::Deserialize;
+
+#[derive(Debug, PartialEq, Eq, Deserialize)]
+pub enum MessageType {
+    #[serde(rename = "connected")]
+    Connected,
+    #[serde(rename = "authenticated")]
+    Authenticated,
+}
+
+#[derive(Debug, PartialEq, Eq, Deserialize)]
+pub struct Message {
+    pub msg: MessageType,
+}
diff --git a/backend/src/types/api/mod.rs b/src/types/websocket/mod.rs
similarity index 100%
rename from backend/src/types/api/mod.rs
rename to src/types/websocket/mod.rs
diff --git a/backend/src/types/websocket/outgoing/auth.rs b/src/types/websocket/outgoing/auth.rs
similarity index 56%
rename from backend/src/types/websocket/outgoing/auth.rs
rename to src/types/websocket/outgoing/auth.rs
index b20375b..98a414b 100644
--- a/backend/src/types/websocket/outgoing/auth.rs
+++ b/src/types/websocket/outgoing/auth.rs
@@ -1,13 +1,13 @@
 use serde::Serialize;

 #[derive(Debug, Serialize)]
-pub struct AuthMessage {
+pub struct Message {
     key: String,
     secret: String,
 }

-impl AuthMessage {
-    pub fn new(key: String, secret: String) -> Self {
+impl Message {
+    pub const fn new(key: String, secret: String) -> Self {
         Self { key, secret }
     }
 }
diff --git a/backend/src/types/websocket/outgoing/mod.rs b/src/types/websocket/outgoing/mod.rs
similarity index 57%
rename from backend/src/types/websocket/outgoing/mod.rs
rename to src/types/websocket/outgoing/mod.rs
index 95a111c..049333d 100644
--- a/backend/src/types/websocket/outgoing/mod.rs
+++ b/src/types/websocket/outgoing/mod.rs
@@ -1,18 +1,15 @@
 pub mod auth;
 pub mod subscribe;

-pub use auth::*;
-pub use subscribe::*;
-
 use serde::Serialize;

 #[derive(Debug, Serialize)]
 #[serde(tag = "action")]
-pub enum OutgoingMessage {
+pub enum Message {
     #[serde(rename = "auth")]
-    Auth(AuthMessage),
+    Auth(auth::Message),
     #[serde(rename = "subscribe")]
-    Subscribe(SubscribeMessage),
+    Subscribe(subscribe::Message),
     #[serde(rename = "unsubscribe")]
-    Unsubscribe(SubscribeMessage),
+    Unsubscribe(subscribe::Message),
 }
diff --git a/backend/src/types/websocket/outgoing/subscribe.rs b/src/types/websocket/outgoing/subscribe.rs
similarity index 58%
rename from backend/src/types/websocket/outgoing/subscribe.rs
rename to src/types/websocket/outgoing/subscribe.rs
index dea1a4b..3e1e66b 100644
--- a/backend/src/types/websocket/outgoing/subscribe.rs
+++ b/src/types/websocket/outgoing/subscribe.rs
@@ -1,24 +1,17 @@
 use serde::Serialize;

 #[derive(Debug, Serialize)]
-pub struct SubscribeMessage {
+pub struct Message {
     bars: Vec<String>,
     #[serde(rename = "updatedBars")]
     updated_bars: Vec<String>,
 }

-impl SubscribeMessage {
+impl Message {
     pub fn new(symbol: String) -> Self {
         Self {
             bars: vec![symbol.clone()],
             updated_bars: vec![symbol],
         }
     }
-
-    pub fn from_vec(symbols: Vec<String>) -> Self {
-        Self {
-            bars: symbols.clone(),
-            updated_bars: symbols,
-        }
-    }
 }
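
With the `action` tag and `updatedBars` rename above, a subscribe message serializes to exactly the JSON Alpaca's stream expects. A standalone mirror of that shape (not part of this patch):

    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(tag = "action")]
    enum Outgoing {
        #[serde(rename = "subscribe")]
        Subscribe {
            bars: Vec<String>,
            #[serde(rename = "updatedBars")]
            updated_bars: Vec<String>,
        },
    }

    fn main() {
        let msg = Outgoing::Subscribe {
            bars: vec!["BTC/USD".into()],
            updated_bars: vec!["BTC/USD".into()],
        };
        // Prints: {"action":"subscribe","bars":["BTC/USD"],"updatedBars":["BTC/USD"]}
        println!("{}", serde_json::to_string(&msg).unwrap());
    }
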
diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh
index 985eac8..6253f98 100644
--- a/support/timescaledb/999_init.sh
+++ b/support/timescaledb/999_init.sh
@@ -19,12 +19,29 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
         class CLASS NOT NULL,
         exchange EXCHANGE NOT NULL,
         trading BOOLEAN NOT NULL DEFAULT FALSE,
-        date_added TIMESTAMPTZ NOT NULL DEFAULT NOW()
+        timestamp_added TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+        timestamp_first TIMESTAMPTZ NOT NULL,
+        timestamp_last TIMESTAMPTZ NOT NULL
     );

     CREATE TABLE bars (
-        timestamp TIMESTAMPTZ NOT NULL,
-        asset_symbol TEXT NOT NULL REFERENCES assets(symbol) ON DELETE CASCADE ON UPDATE CASCADE,
+        timestamp TIMESTAMPTZ,
+        asset_symbol TEXT REFERENCES assets(symbol) ON DELETE CASCADE ON UPDATE CASCADE,
+        open DOUBLE PRECISION,
+        high DOUBLE PRECISION,
+        low DOUBLE PRECISION,
+        close DOUBLE PRECISION,
+        volume DOUBLE PRECISION NOT NULL,
+        num_trades BIGINT NOT NULL,
+        volume_weighted DOUBLE PRECISION NOT NULL,
+        PRIMARY KEY (asset_symbol, timestamp)
+    );
+
+    SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 15);
+
+    CREATE TABLE bars_filled (
+        timestamp TIMESTAMPTZ,
+        asset_symbol TEXT REFERENCES assets(symbol) ON DELETE CASCADE ON UPDATE CASCADE,
         open DOUBLE PRECISION NOT NULL,
         high DOUBLE PRECISION NOT NULL,
         low DOUBLE PRECISION NOT NULL,
@@ -35,27 +52,5 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
         PRIMARY KEY (asset_symbol, timestamp)
     );

-    SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2);
-
-    CREATE TABLE calendar (
-        date DATE NOT NULL PRIMARY KEY,
-        open TIME NOT NULL,
-        close TIME NOT NULL
-    );
-
-    CREATE VIEW bars_missing AS
-    WITH time_series AS (
-        SELECT
-            asset_symbol,
-            generate_series(MIN(timestamp), NOW(), interval '1 minute')::TIMESTAMPTZ AS expected_time
-        FROM bars
-        GROUP BY asset_symbol
-    )
-    SELECT
-        ts.asset_symbol,
-        ts.expected_time AS missing_time
-    FROM time_series ts
-    LEFT JOIN bars b
-        ON ts.asset_symbol = b.asset_symbol AND ts.expected_time = b.timestamp
-    WHERE b.timestamp IS NULL;
+    SELECT create_hypertable('bars_filled', 'timestamp', 'asset_symbol', 15);
 EOSQL