From a542225680adfe763d0ddfd74da1ed5589049610 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Tue, 29 Aug 2023 16:59:34 +0300 Subject: [PATCH] Add managed Alpaca pool Signed-off-by: Nikolaos Karaolidis --- .vscode/settings.json | 14 +++ ...7cb26f1253f7d66328bea8fed38dc6a2cdd8.json} | 20 ++-- ...106a243e8433e9844359b0dfd77ba5f892fa.json} | 20 ++-- ...4f20fa74d7e5f8c67305cfe5cff41ba6527f.json} | 20 ++-- ...121b6aab8b1867c257492edf5411ce6e1c1c.json} | 20 ++-- ...2ede6a656d439a99da5fb865745d607b699c.json} | 23 ++--- backend/Cargo.lock | 33 ++++++- backend/Cargo.toml | 11 +++ backend/src/database/assets.rs | 37 +++---- backend/src/main.rs | 13 +-- backend/src/pool.rs | 53 ---------- backend/src/pool/alpaca.rs | 98 +++++++++++++++++++ backend/src/pool/mod.rs | 2 + backend/src/pool/postgres.rs | 21 ++++ backend/src/routes/assets.rs | 34 +++---- backend/src/routes/mod.rs | 9 +- backend/src/types.rs | 7 +- support/timescaledb/999_init.sh | 13 +-- 18 files changed, 265 insertions(+), 183 deletions(-) create mode 100644 .vscode/settings.json rename backend/.sqlx/{query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json => query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json} (81%) rename backend/.sqlx/{query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json => query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json} (78%) rename backend/.sqlx/{query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json => query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json} (78%) rename backend/.sqlx/{query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json => query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json} (77%) rename backend/.sqlx/{query-e304538ad4380d75d473ca2f4d4a9693f5ff124a40f8811b8bc208ebfee54b36.json => query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json} (80%) delete mode 100644 backend/src/pool.rs create mode 100644 backend/src/pool/alpaca.rs create mode 100644 backend/src/pool/mod.rs create mode 100644 backend/src/pool/postgres.rs diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..fa5b14a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,14 @@ +{ + "sqltools.connections": [ + { + "previewLimit": 50, + "server": "localhost", + "port": 5432, + "driver": "PostgreSQL", + "name": "QRust", + "database": "qrust", + "username": "qrust", + "password": "qrust" + } + ] +} diff --git a/backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json b/backend/.sqlx/query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json similarity index 81% rename from backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json rename to backend/.sqlx/query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json index fbb4770..11e3884 100644 --- a/backend/.sqlx/query-3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf.json +++ b/backend/.sqlx/query-053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8.json @@ -1,20 +1,15 @@ { "db_name": "PostgreSQL", - "query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "describe": { "columns": [ { "ordinal": 0, - "name": "id", - "type_info": "Uuid" + "name": "symbol", + "type_info": "Text" }, { "ordinal": 1, - "name": "symbol", - "type_info": "Varchar" - }, - { - "ordinal": 2, "name": "class: Class", "type_info": { "Custom": { @@ -30,7 +25,7 @@ } }, { - "ordinal": 3, + "ordinal": 2, "name": "exchange: Exchange", "type_info": { "Custom": { @@ -51,12 +46,12 @@ } }, { - "ordinal": 4, + "ordinal": 3, "name": "trading", "type_info": "Bool" }, { - "ordinal": 5, + "ordinal": 4, "name": "date_added", "type_info": "Timestamptz" } @@ -72,9 +67,8 @@ false, false, false, - false, false ] }, - "hash": "3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf" + "hash": "053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8" } diff --git a/backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json b/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json similarity index 78% rename from backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json rename to backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json index 0856918..5fd5872 100644 --- a/backend/.sqlx/query-798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7.json +++ b/backend/.sqlx/query-2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa.json @@ -1,20 +1,15 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE symbol = $1", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE symbol = $1", "describe": { "columns": [ { "ordinal": 0, - "name": "id", - "type_info": "Uuid" + "name": "symbol", + "type_info": "Text" }, { "ordinal": 1, - "name": "symbol", - "type_info": "Varchar" - }, - { - "ordinal": 2, "name": "class: Class", "type_info": { "Custom": { @@ -30,7 +25,7 @@ } }, { - "ordinal": 3, + "ordinal": 2, "name": "exchange: Exchange", "type_info": { "Custom": { @@ -51,12 +46,12 @@ } }, { - "ordinal": 4, + "ordinal": 3, "name": "trading", "type_info": "Bool" }, { - "ordinal": 5, + "ordinal": 4, "name": "date_added", "type_info": "Timestamptz" } @@ -71,9 +66,8 @@ false, false, false, - false, false ] }, - "hash": "798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7" + "hash": "2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa" } diff --git a/backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json b/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json similarity index 78% rename from backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json rename to backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json index 7b20e48..a53e391 100644 --- a/backend/.sqlx/query-98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee.json +++ b/backend/.sqlx/query-48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f.json @@ -1,20 +1,15 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets", + "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets", "describe": { "columns": [ { "ordinal": 0, - "name": "id", - "type_info": "Uuid" + "name": "symbol", + "type_info": "Text" }, { "ordinal": 1, - "name": "symbol", - "type_info": "Varchar" - }, - { - "ordinal": 2, "name": "class: Class", "type_info": { "Custom": { @@ -30,7 +25,7 @@ } }, { - "ordinal": 3, + "ordinal": 2, "name": "exchange: Exchange", "type_info": { "Custom": { @@ -51,12 +46,12 @@ } }, { - "ordinal": 4, + "ordinal": 3, "name": "trading", "type_info": "Bool" }, { - "ordinal": 5, + "ordinal": 4, "name": "date_added", "type_info": "Timestamptz" } @@ -69,9 +64,8 @@ false, false, false, - false, false ] }, - "hash": "98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee" + "hash": "48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f" } diff --git a/backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json b/backend/.sqlx/query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json similarity index 77% rename from backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json rename to backend/.sqlx/query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json index a398026..be21401 100644 --- a/backend/.sqlx/query-3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8.json +++ b/backend/.sqlx/query-edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c.json @@ -1,20 +1,15 @@ { "db_name": "PostgreSQL", - "query": "DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "describe": { "columns": [ { "ordinal": 0, - "name": "id", - "type_info": "Uuid" + "name": "symbol", + "type_info": "Text" }, { "ordinal": 1, - "name": "symbol", - "type_info": "Varchar" - }, - { - "ordinal": 2, "name": "class: Class", "type_info": { "Custom": { @@ -30,7 +25,7 @@ } }, { - "ordinal": 3, + "ordinal": 2, "name": "exchange: Exchange", "type_info": { "Custom": { @@ -51,12 +46,12 @@ } }, { - "ordinal": 4, + "ordinal": 3, "name": "trading", "type_info": "Bool" }, { - "ordinal": 5, + "ordinal": 4, "name": "date_added", "type_info": "Timestamptz" } @@ -71,9 +66,8 @@ false, false, false, - false, false ] }, - "hash": "3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8" + "hash": "edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c" } diff --git a/backend/.sqlx/query-e304538ad4380d75d473ca2f4d4a9693f5ff124a40f8811b8bc208ebfee54b36.json b/backend/.sqlx/query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json similarity index 80% rename from backend/.sqlx/query-e304538ad4380d75d473ca2f4d4a9693f5ff124a40f8811b8bc208ebfee54b36.json rename to backend/.sqlx/query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json index 3284ad2..dab780c 100644 --- a/backend/.sqlx/query-e304538ad4380d75d473ca2f4d4a9693f5ff124a40f8811b8bc208ebfee54b36.json +++ b/backend/.sqlx/query-f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c.json @@ -1,20 +1,15 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", + "query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "describe": { "columns": [ { "ordinal": 0, - "name": "id", - "type_info": "Uuid" + "name": "symbol", + "type_info": "Text" }, { "ordinal": 1, - "name": "symbol", - "type_info": "Varchar" - }, - { - "ordinal": 2, "name": "class: Class", "type_info": { "Custom": { @@ -30,7 +25,7 @@ } }, { - "ordinal": 3, + "ordinal": 2, "name": "exchange: Exchange", "type_info": { "Custom": { @@ -51,20 +46,19 @@ } }, { - "ordinal": 4, + "ordinal": 3, "name": "trading", "type_info": "Bool" }, { - "ordinal": 5, + "ordinal": 4, "name": "date_added", "type_info": "Timestamptz" } ], "parameters": { "Left": [ - "Uuid", - "Varchar", + "Text", { "Custom": { "name": "class", @@ -103,9 +97,8 @@ false, false, false, - false, false ] }, - "hash": "e304538ad4380d75d473ca2f4d4a9693f5ff124a40f8811b8bc208ebfee54b36" + "hash": "f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c" } diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 5ecda39..e5cb791 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -184,9 +184,12 @@ name = "backend" version = "0.1.0" dependencies = [ "apca", + "async-trait", "axum", "deadpool", "dotenv", + "futures", + "futures-util", "log", "log4rs", "serde", @@ -194,6 +197,8 @@ dependencies = [ "sqlx", "time 0.3.28", "tokio", + "tungstenite 0.20.0", + "websocket-util", ] [[package]] @@ -386,6 +391,12 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" + [[package]] name = "deadpool" version = "0.9.5" @@ -588,6 +599,7 @@ checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -2142,7 +2154,7 @@ dependencies = [ "native-tls", "tokio", "tokio-native-tls", - "tungstenite", + "tungstenite 0.18.0", ] [[package]] @@ -2242,6 +2254,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typemap-ors" version = "1.0.0" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index b0de2e4..ba34bd3 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -3,6 +3,12 @@ name = "backend" version = "0.1.0" edition = "2021" +[profile.release] +panic = 'abort' + +[profile.dev] +panic = 'abort' + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -29,3 +35,8 @@ log4rs = "1.2.0" time = { version = "0.3.27", features = [ "serde", ] } +futures = "0.3.28" +websocket-util = "0.11.2" +futures-util = "0.3.28" +tungstenite = "0.20.0" +async-trait = "0.1.73" diff --git a/backend/src/database/assets.rs b/backend/src/database/assets.rs index f2cbb3e..2798171 100644 --- a/backend/src/database/assets.rs +++ b/backend/src/database/assets.rs @@ -1,53 +1,56 @@ -use crate::types::{Asset, Class, Exchange}; -use sqlx::{query_as, PgPool}; +use crate::{ + pool::postgres::PostgresPool, + types::{Asset, Class, Exchange}, +}; +use sqlx::query_as; use std::error::Error; pub async fn get_assets( - database_pool: &PgPool, + postgres_pool: &PostgresPool, ) -> Result, Box> { - query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#) - .fetch_all(database_pool) + query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#) + .fetch_all(postgres_pool) .await .map_err(|e| e.into()) } pub async fn get_asset( - database_pool: &PgPool, + postgres_pool: &PostgresPool, symbol: &str, ) -> Result, Box> { - query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol) - .fetch_optional(database_pool) + query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol) + .fetch_optional(postgres_pool) .await .map_err(|e| e.into()) } pub async fn add_asset( - database_pool: &PgPool, + postgres_pool: &PostgresPool, asset: Asset, ) -> Result> { - query_as!(Asset, r#"INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.id, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added) - .fetch_one(database_pool) + query_as!(Asset, r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added) + .fetch_one(postgres_pool) .await .map_err(|e| e.into()) } pub async fn update_asset_trading( - database_pool: &PgPool, + postgres_pool: &PostgresPool, symbol: &str, trading: bool, ) -> Result, Box> { - query_as!(Asset, r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol) - .fetch_optional(database_pool) + query_as!(Asset, r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol) + .fetch_optional(postgres_pool) .await .map_err(|e| e.into()) } pub async fn delete_asset( - database_pool: &PgPool, + postgres_pool: &PostgresPool, symbol: &str, ) -> Result, Box> { - query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol) - .fetch_optional(database_pool) + query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol) + .fetch_optional(postgres_pool) .await .map_err(|e| e.into()) } diff --git a/backend/src/main.rs b/backend/src/main.rs index f0a3b5b..ba6763a 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -4,8 +4,8 @@ mod routes; mod types; use dotenv::dotenv; -use pool::{create_alpaca_pool_from_env, create_database_pool_from_env}; -use routes::initialize_api; +use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env}; +use routes::run_api; use std::error::Error; use tokio::spawn; @@ -15,15 +15,10 @@ async fn main() -> Result<(), Box> { log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); let num_clients = 10; - let database_pool = create_database_pool_from_env(num_clients).await?; + let postgres_pool = create_postgres_pool_from_env(num_clients).await?; let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?; - let mut threads = Vec::new(); - - threads.push(spawn(initialize_api( - database_pool.clone(), - alpaca_pool.clone(), - ))); + let threads = vec![spawn(run_api(postgres_pool.clone(), alpaca_pool.clone()))]; for thread in threads { let _ = thread.await?; diff --git a/backend/src/pool.rs b/backend/src/pool.rs deleted file mode 100644 index 049e6d7..0000000 --- a/backend/src/pool.rs +++ /dev/null @@ -1,53 +0,0 @@ -use apca::{ApiInfo, Client}; -use deadpool::unmanaged::Pool; -use sqlx::postgres::PgPoolOptions; -use std::{env, error::Error}; - -pub type AlpacaPool = Pool; - -pub async fn create_alpaca_pool( - apca_api_base_url: &str, - apca_api_key_id: &str, - apca_api_secret_key: &str, - num_clients: usize, -) -> Result, Box> { - let mut alpaca_clients = Vec::new(); - for _ in 0..num_clients { - let client = Client::new(ApiInfo::from_parts( - apca_api_base_url, - apca_api_key_id, - apca_api_secret_key, - )?); - alpaca_clients.push(client); - } - Ok(Pool::from(alpaca_clients)) -} - -pub async fn create_alpaca_pool_from_env( - num_clients: usize, -) -> Result, Box> { - create_alpaca_pool( - &env::var("APCA_API_BASE_URL")?, - &env::var("APCA_API_KEY_ID")?, - &env::var("APCA_API_SECRET_KEY")?, - num_clients, - ) - .await -} - -pub async fn create_database_pool( - database_url: &str, - num_clients: usize, -) -> Result> { - PgPoolOptions::new() - .max_connections(num_clients as u32) - .connect(database_url) - .await - .map_err(|e| e.into()) -} - -pub async fn create_database_pool_from_env( - num_clients: usize, -) -> Result> { - create_database_pool(&env::var("DATABASE_URL")?, num_clients).await -} diff --git a/backend/src/pool/alpaca.rs b/backend/src/pool/alpaca.rs new file mode 100644 index 0000000..a8d77a5 --- /dev/null +++ b/backend/src/pool/alpaca.rs @@ -0,0 +1,98 @@ +use apca::{ApiInfo, Client}; +use async_trait::async_trait; +use deadpool::managed::{BuildError, Manager, Pool, RecycleResult}; +use std::{env, error::Error}; + +pub struct AlpacaManager { + apca_api_base_url: String, + apca_api_key_id: String, + apca_api_secret_key: String, +} + +impl AlpacaManager { + pub fn new( + apca_api_base_url: String, + apca_api_key_id: String, + apca_api_secret_key: String, + ) -> Self { + Self { + apca_api_base_url, + apca_api_key_id, + apca_api_secret_key, + } + } +} + +pub type AlpacaPool = Pool; + +#[async_trait] +impl Manager for AlpacaManager { + type Type = Client; + type Error = Box; + + async fn create(&self) -> Result { + let client = Client::new(ApiInfo::from_parts( + &self.apca_api_base_url, + &self.apca_api_key_id, + &self.apca_api_secret_key, + )?); + Ok(client) + } + + async fn recycle(&self, _: &mut Self::Type) -> RecycleResult { + Ok(()) + } +} + +pub async fn create_alpaca_client( + apca_api_base_url: &str, + apca_api_key_id: &str, + apca_api_secret_key: &str, +) -> Result> { + Ok(Client::new(ApiInfo::from_parts( + apca_api_base_url, + apca_api_key_id, + apca_api_secret_key, + )?)) +} + +pub async fn create_alpaca_client_from_env() -> Result> { + create_alpaca_client( + &env::var("APCA_API_BASE_URL")?, + &env::var("APCA_API_KEY_ID")?, + &env::var("APCA_API_SECRET_KEY")?, + ) + .await +} + +pub async fn create_alpaca_pool( + apca_api_base_url: &str, + apca_api_key_id: &str, + apca_api_secret_key: &str, + num_clients: usize, +) -> Result> { + let manager = AlpacaManager::new( + apca_api_base_url.to_owned(), + apca_api_key_id.to_owned(), + apca_api_secret_key.to_owned(), + ); + Pool::builder(manager) + .max_size(num_clients) + .build() + .map_err(|e| match e { + BuildError::Backend(e) => e, + BuildError::NoRuntimeSpecified(_) => unreachable!(), + }) +} + +pub async fn create_alpaca_pool_from_env( + num_clients: usize, +) -> Result> { + create_alpaca_pool( + &env::var("APCA_API_BASE_URL")?, + &env::var("APCA_API_KEY_ID")?, + &env::var("APCA_API_SECRET_KEY")?, + num_clients, + ) + .await +} diff --git a/backend/src/pool/mod.rs b/backend/src/pool/mod.rs new file mode 100644 index 0000000..d0cd51c --- /dev/null +++ b/backend/src/pool/mod.rs @@ -0,0 +1,2 @@ +pub mod alpaca; +pub mod postgres; diff --git a/backend/src/pool/postgres.rs b/backend/src/pool/postgres.rs new file mode 100644 index 0000000..e745aaa --- /dev/null +++ b/backend/src/pool/postgres.rs @@ -0,0 +1,21 @@ +use sqlx::{postgres::PgPoolOptions, PgPool}; +use std::{env, error::Error}; + +pub type PostgresPool = PgPool; + +pub async fn create_postgres_pool( + database_url: &str, + num_clients: usize, +) -> Result> { + PgPoolOptions::new() + .max_connections(num_clients as u32) + .connect(database_url) + .await + .map_err(|e| e.into()) +} + +pub async fn create_postgres_pool_from_env( + num_clients: usize, +) -> Result> { + create_postgres_pool(&env::var("DATABASE_URL")?, num_clients).await +} diff --git a/backend/src/routes/assets.rs b/backend/src/routes/assets.rs index 873950f..7639c78 100644 --- a/backend/src/routes/assets.rs +++ b/backend/src/routes/assets.rs @@ -1,19 +1,17 @@ use crate::database; use crate::database::assets::update_asset_trading; -use crate::pool::AlpacaPool; +use crate::pool::alpaca::AlpacaPool; +use crate::pool::postgres::PostgresPool; use crate::types::{Asset, Class, Exchange}; use apca::api::v2::asset::{self, Symbol}; use axum::{extract::Path, http::StatusCode, Extension, Json}; use serde::Deserialize; -use sqlx::{ - types::{time::OffsetDateTime, Uuid}, - PgPool, -}; +use sqlx::types::time::OffsetDateTime; pub async fn get_assets( - Extension(database_pool): Extension, + Extension(postgres_pool): Extension, ) -> Result<(StatusCode, Json>), StatusCode> { - let assets = database::assets::get_assets(&database_pool) + let assets = database::assets::get_assets(&postgres_pool) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -21,10 +19,10 @@ pub async fn get_assets( } pub async fn get_asset( - Extension(database_pool): Extension, + Extension(postgres_pool): Extension, Path(symbol): Path, ) -> Result<(StatusCode, Json), StatusCode> { - let asset = database::assets::get_asset(&database_pool, &symbol) + let asset = database::assets::get_asset(&postgres_pool, &symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -42,11 +40,11 @@ pub struct AddAssetRequest { } pub async fn add_asset( - Extension(database_pool): Extension, + Extension(postgres_pool): Extension, Extension(alpaca_pool): Extension, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { - if database::assets::get_asset(&database_pool, &request.symbol) + if database::assets::get_asset(&postgres_pool, &request.symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .is_some() @@ -60,11 +58,9 @@ pub async fn add_asset( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .issue::(&Symbol::Sym(request.symbol)) .await - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + .map_err(|_| StatusCode::NOT_FOUND)?; let asset = Asset { - id: Uuid::parse_str(&asset.id.to_string()) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?, symbol: asset.symbol, class: Class::from(asset.class) as Class, exchange: Exchange::from(asset.exchange) as Exchange, @@ -72,7 +68,7 @@ pub async fn add_asset( date_added: OffsetDateTime::now_utc(), }; - let asset = database::assets::add_asset(&database_pool, asset) + let asset = database::assets::add_asset(&postgres_pool, asset) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -86,11 +82,11 @@ pub struct UpdateAssetRequest { } pub async fn update_asset( - Extension(database_pool): Extension, + Extension(postgres_pool): Extension, Path(symbol): Path, Json(request): Json, ) -> Result<(StatusCode, Json), StatusCode> { - let asset = update_asset_trading(&database_pool, &symbol, request.trading) + let asset = update_asset_trading(&postgres_pool, &symbol, request.trading) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -101,10 +97,10 @@ pub async fn update_asset( } pub async fn delete_asset( - Extension(database_pool): Extension, + Extension(postgres_pool): Extension, Path(symbol): Path, ) -> Result<(StatusCode, Json), StatusCode> { - let asset = database::assets::delete_asset(&database_pool, &symbol) + let asset = database::assets::delete_asset(&postgres_pool, &symbol) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; diff --git a/backend/src/routes/mod.rs b/backend/src/routes/mod.rs index a2a372d..24d30ff 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -1,16 +1,15 @@ -use crate::pool::AlpacaPool; +use crate::pool::{alpaca::AlpacaPool, postgres::PostgresPool}; use axum::{ routing::{delete, get, post}, Extension, Router, Server, }; use log::info; -use sqlx::PgPool; use std::net::SocketAddr; pub mod assets; -pub async fn initialize_api( - database_pool: PgPool, +pub async fn run_api( + postgres_pool: PostgresPool, alpaca_pool: AlpacaPool, ) -> Result<(), Box> { let app = Router::new() @@ -19,7 +18,7 @@ pub async fn initialize_api( .route("/assets", post(assets::add_asset)) .route("/assets/:symbol", post(assets::update_asset)) .route("/assets/:symbol", delete(assets::delete_asset)) - .layer(Extension(database_pool)) + .layer(Extension(postgres_pool)) .layer(Extension(alpaca_pool)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); diff --git a/backend/src/types.rs b/backend/src/types.rs index f733f16..7529e8b 100644 --- a/backend/src/types.rs +++ b/backend/src/types.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use sqlx::{error::BoxDynError, types::Uuid, Decode, Encode, FromRow, Postgres, Type}; +use sqlx::{error::BoxDynError, Decode, Encode, FromRow, Postgres, Type}; use std::ops::Deref; use time::OffsetDateTime; @@ -69,7 +69,6 @@ impl_apca_sqlx_traits!( #[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] pub struct Asset { - pub id: Uuid, pub symbol: String, pub class: Class, pub exchange: Exchange, @@ -80,10 +79,10 @@ pub struct Asset { #[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] pub struct Bar { pub timestamp: OffsetDateTime, - pub asset_id: Uuid, + pub asset_symbol: String, pub open: f64, pub high: f64, pub low: f64, pub close: f64, - pub volume: u64, + pub volume: f64, } diff --git a/support/timescaledb/999_init.sh b/support/timescaledb/999_init.sh index a741f97..f83697a 100644 --- a/support/timescaledb/999_init.sh +++ b/support/timescaledb/999_init.sh @@ -15,27 +15,24 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL ); CREATE TABLE assets ( - id UUID PRIMARY KEY, - symbol VARCHAR(20) NOT NULL UNIQUE, + symbol TEXT PRIMARY KEY, class CLASS NOT NULL, exchange EXCHANGE NOT NULL, trading BOOLEAN NOT NULL DEFAULT FALSE, date_added TIMESTAMPTZ NOT NULL DEFAULT NOW() ); - CREATE INDEX assets_symbol_idx ON assets (symbol); - CREATE TABLE bars ( timestamp TIMESTAMPTZ NOT NULL, - asset_id UUID NOT NULL REFERENCES assets(id), + asset_symbol TEXT NOT NULL REFERENCES assets(symbol), open DOUBLE PRECISION NOT NULL, high DOUBLE PRECISION NOT NULL, low DOUBLE PRECISION NOT NULL, close DOUBLE PRECISION NOT NULL, volume DOUBLE PRECISION NOT NULL, - PRIMARY KEY (asset_id, timestamp), - FOREIGN KEY (asset_id) REFERENCES assets(id) + PRIMARY KEY (asset_symbol, timestamp), + FOREIGN KEY (asset_symbol) REFERENCES assets(symbol) ); - SELECT create_hypertable('bars', 'timestamp', 'asset_id', 2); + SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2); EOSQL