Add managed Alpaca pool

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-08-29 16:59:34 +03:00
parent 003f47339f
commit a542225680
18 changed files with 265 additions and 183 deletions

14
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,14 @@
{
"sqltools.connections": [
{
"previewLimit": 50,
"server": "localhost",
"port": 5432,
"driver": "PostgreSQL",
"name": "QRust",
"database": "qrust",
"username": "qrust",
"password": "qrust"
}
]
}

View File

@@ -1,20 +1,15 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"ordinal": 0, "ordinal": 0,
"name": "id", "name": "symbol",
"type_info": "Uuid" "type_info": "Text"
}, },
{ {
"ordinal": 1, "ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class", "name": "class: Class",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -30,7 +25,7 @@
} }
}, },
{ {
"ordinal": 3, "ordinal": 2,
"name": "exchange: Exchange", "name": "exchange: Exchange",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -51,12 +46,12 @@
} }
}, },
{ {
"ordinal": 4, "ordinal": 3,
"name": "trading", "name": "trading",
"type_info": "Bool" "type_info": "Bool"
}, },
{ {
"ordinal": 5, "ordinal": 4,
"name": "date_added", "name": "date_added",
"type_info": "Timestamptz" "type_info": "Timestamptz"
} }
@@ -72,9 +67,8 @@
false, false,
false, false,
false, false,
false,
false false
] ]
}, },
"hash": "3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf" "hash": "053b5a3b5d52f7c06245221930557cb26f1253f7d66328bea8fed38dc6a2cdd8"
} }

View File

@@ -1,20 +1,15 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "SELECT id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE symbol = $1", "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE symbol = $1",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"ordinal": 0, "ordinal": 0,
"name": "id", "name": "symbol",
"type_info": "Uuid" "type_info": "Text"
}, },
{ {
"ordinal": 1, "ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class", "name": "class: Class",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -30,7 +25,7 @@
} }
}, },
{ {
"ordinal": 3, "ordinal": 2,
"name": "exchange: Exchange", "name": "exchange: Exchange",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -51,12 +46,12 @@
} }
}, },
{ {
"ordinal": 4, "ordinal": 3,
"name": "trading", "name": "trading",
"type_info": "Bool" "type_info": "Bool"
}, },
{ {
"ordinal": 5, "ordinal": 4,
"name": "date_added", "name": "date_added",
"type_info": "Timestamptz" "type_info": "Timestamptz"
} }
@@ -71,9 +66,8 @@
false, false,
false, false,
false, false,
false,
false false
] ]
}, },
"hash": "798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7" "hash": "2fdf66c1563d95b36a3f23783cf6106a243e8433e9844359b0dfd77ba5f892fa"
} }

View File

@@ -1,20 +1,15 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "SELECT id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets", "query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"ordinal": 0, "ordinal": 0,
"name": "id", "name": "symbol",
"type_info": "Uuid" "type_info": "Text"
}, },
{ {
"ordinal": 1, "ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class", "name": "class: Class",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -30,7 +25,7 @@
} }
}, },
{ {
"ordinal": 3, "ordinal": 2,
"name": "exchange: Exchange", "name": "exchange: Exchange",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -51,12 +46,12 @@
} }
}, },
{ {
"ordinal": 4, "ordinal": 3,
"name": "trading", "name": "trading",
"type_info": "Bool" "type_info": "Bool"
}, },
{ {
"ordinal": 5, "ordinal": 4,
"name": "date_added", "name": "date_added",
"type_info": "Timestamptz" "type_info": "Timestamptz"
} }
@@ -69,9 +64,8 @@
false, false,
false, false,
false, false,
false,
false false
] ]
}, },
"hash": "98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee" "hash": "48ceef0501f26b4ce3232791b0764f20fa74d7e5f8c67305cfe5cff41ba6527f"
} }

View File

@@ -1,20 +1,15 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "query": "DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"ordinal": 0, "ordinal": 0,
"name": "id", "name": "symbol",
"type_info": "Uuid" "type_info": "Text"
}, },
{ {
"ordinal": 1, "ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class", "name": "class: Class",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -30,7 +25,7 @@
} }
}, },
{ {
"ordinal": 3, "ordinal": 2,
"name": "exchange: Exchange", "name": "exchange: Exchange",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -51,12 +46,12 @@
} }
}, },
{ {
"ordinal": 4, "ordinal": 3,
"name": "trading", "name": "trading",
"type_info": "Bool" "type_info": "Bool"
}, },
{ {
"ordinal": 5, "ordinal": 4,
"name": "date_added", "name": "date_added",
"type_info": "Timestamptz" "type_info": "Timestamptz"
} }
@@ -71,9 +66,8 @@
false, false,
false, false,
false, false,
false,
false false
] ]
}, },
"hash": "3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8" "hash": "edba75326365bdcbb47002eaf11b121b6aab8b1867c257492edf5411ce6e1c1c"
} }

View File

@@ -1,20 +1,15 @@
{ {
"db_name": "PostgreSQL", "db_name": "PostgreSQL",
"query": "INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added", "query": "INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"ordinal": 0, "ordinal": 0,
"name": "id", "name": "symbol",
"type_info": "Uuid" "type_info": "Text"
}, },
{ {
"ordinal": 1, "ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class", "name": "class: Class",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -30,7 +25,7 @@
} }
}, },
{ {
"ordinal": 3, "ordinal": 2,
"name": "exchange: Exchange", "name": "exchange: Exchange",
"type_info": { "type_info": {
"Custom": { "Custom": {
@@ -51,20 +46,19 @@
} }
}, },
{ {
"ordinal": 4, "ordinal": 3,
"name": "trading", "name": "trading",
"type_info": "Bool" "type_info": "Bool"
}, },
{ {
"ordinal": 5, "ordinal": 4,
"name": "date_added", "name": "date_added",
"type_info": "Timestamptz" "type_info": "Timestamptz"
} }
], ],
"parameters": { "parameters": {
"Left": [ "Left": [
"Uuid", "Text",
"Varchar",
{ {
"Custom": { "Custom": {
"name": "class", "name": "class",
@@ -103,9 +97,8 @@
false, false,
false, false,
false, false,
false,
false false
] ]
}, },
"hash": "e304538ad4380d75d473ca2f4d4a9693f5ff124a40f8811b8bc208ebfee54b36" "hash": "f14d8710b0d38d6b7f0315e77aec2ede6a656d439a99da5fb865745d607b699c"
} }

33
backend/Cargo.lock generated
View File

@@ -184,9 +184,12 @@ name = "backend"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"apca", "apca",
"async-trait",
"axum", "axum",
"deadpool", "deadpool",
"dotenv", "dotenv",
"futures",
"futures-util",
"log", "log",
"log4rs", "log4rs",
"serde", "serde",
@@ -194,6 +197,8 @@ dependencies = [
"sqlx", "sqlx",
"time 0.3.28", "time 0.3.28",
"tokio", "tokio",
"tungstenite 0.20.0",
"websocket-util",
] ]
[[package]] [[package]]
@@ -386,6 +391,12 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "data-encoding"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]] [[package]]
name = "deadpool" name = "deadpool"
version = "0.9.5" version = "0.9.5"
@@ -588,6 +599,7 @@ checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-executor",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
@@ -2142,7 +2154,7 @@ dependencies = [
"native-tls", "native-tls",
"tokio", "tokio",
"tokio-native-tls", "tokio-native-tls",
"tungstenite", "tungstenite 0.18.0",
] ]
[[package]] [[package]]
@@ -2242,6 +2254,25 @@ dependencies = [
"utf-8", "utf-8",
] ]
[[package]]
name = "tungstenite"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]] [[package]]
name = "typemap-ors" name = "typemap-ors"
version = "1.0.0" version = "1.0.0"

View File

@@ -3,6 +3,12 @@ name = "backend"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[profile.release]
panic = 'abort'
[profile.dev]
panic = 'abort'
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
@@ -29,3 +35,8 @@ log4rs = "1.2.0"
time = { version = "0.3.27", features = [ time = { version = "0.3.27", features = [
"serde", "serde",
] } ] }
futures = "0.3.28"
websocket-util = "0.11.2"
futures-util = "0.3.28"
tungstenite = "0.20.0"
async-trait = "0.1.73"

View File

@@ -1,53 +1,56 @@
use crate::types::{Asset, Class, Exchange}; use crate::{
use sqlx::{query_as, PgPool}; pool::postgres::PostgresPool,
types::{Asset, Class, Exchange},
};
use sqlx::query_as;
use std::error::Error; use std::error::Error;
pub async fn get_assets( pub async fn get_assets(
database_pool: &PgPool, postgres_pool: &PostgresPool,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#) query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#)
.fetch_all(database_pool) .fetch_all(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
pub async fn get_asset( pub async fn get_asset(
database_pool: &PgPool, postgres_pool: &PostgresPool,
symbol: &str, symbol: &str,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol) query_as!(Asset, r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol)
.fetch_optional(database_pool) .fetch_optional(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
pub async fn add_asset( pub async fn add_asset(
database_pool: &PgPool, postgres_pool: &PostgresPool,
asset: Asset, asset: Asset,
) -> Result<Asset, Box<dyn Error + Send + Sync>> { ) -> Result<Asset, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.id, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added) query_as!(Asset, r#"INSERT INTO assets (symbol, class, exchange, trading, date_added) VALUES ($1, $2::CLASS, $3::EXCHANGE, $4, $5) RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, asset.symbol, asset.class as Class, asset.exchange as Exchange, asset.trading, asset.date_added)
.fetch_one(database_pool) .fetch_one(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
pub async fn update_asset_trading( pub async fn update_asset_trading(
database_pool: &PgPool, postgres_pool: &PostgresPool,
symbol: &str, symbol: &str,
trading: bool, trading: bool,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol) query_as!(Asset, r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, trading, symbol)
.fetch_optional(database_pool) .fetch_optional(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
pub async fn delete_asset( pub async fn delete_asset(
database_pool: &PgPool, postgres_pool: &PostgresPool,
symbol: &str, symbol: &str,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> { ) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol) query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol)
.fetch_optional(database_pool) .fetch_optional(postgres_pool)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
} }

View File

@@ -4,8 +4,8 @@ mod routes;
mod types; mod types;
use dotenv::dotenv; use dotenv::dotenv;
use pool::{create_alpaca_pool_from_env, create_database_pool_from_env}; use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env};
use routes::initialize_api; use routes::run_api;
use std::error::Error; use std::error::Error;
use tokio::spawn; use tokio::spawn;
@@ -15,15 +15,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
log4rs::init_file("log4rs.yaml", Default::default()).unwrap(); log4rs::init_file("log4rs.yaml", Default::default()).unwrap();
let num_clients = 10; let num_clients = 10;
let database_pool = create_database_pool_from_env(num_clients).await?; let postgres_pool = create_postgres_pool_from_env(num_clients).await?;
let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?; let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?;
let mut threads = Vec::new(); let threads = vec![spawn(run_api(postgres_pool.clone(), alpaca_pool.clone()))];
threads.push(spawn(initialize_api(
database_pool.clone(),
alpaca_pool.clone(),
)));
for thread in threads { for thread in threads {
let _ = thread.await?; let _ = thread.await?;

View File

@@ -1,53 +0,0 @@
use apca::{ApiInfo, Client};
use deadpool::unmanaged::Pool;
use sqlx::postgres::PgPoolOptions;
use std::{env, error::Error};
pub type AlpacaPool = Pool<Client>;
pub async fn create_alpaca_pool(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
num_clients: usize,
) -> Result<Pool<Client>, Box<dyn Error + Send + Sync>> {
let mut alpaca_clients = Vec::new();
for _ in 0..num_clients {
let client = Client::new(ApiInfo::from_parts(
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
)?);
alpaca_clients.push(client);
}
Ok(Pool::from(alpaca_clients))
}
pub async fn create_alpaca_pool_from_env(
num_clients: usize,
) -> Result<Pool<Client>, Box<dyn Error + Send + Sync>> {
create_alpaca_pool(
&env::var("APCA_API_BASE_URL")?,
&env::var("APCA_API_KEY_ID")?,
&env::var("APCA_API_SECRET_KEY")?,
num_clients,
)
.await
}
pub async fn create_database_pool(
database_url: &str,
num_clients: usize,
) -> Result<sqlx::PgPool, Box<dyn Error + Send + Sync>> {
PgPoolOptions::new()
.max_connections(num_clients as u32)
.connect(database_url)
.await
.map_err(|e| e.into())
}
pub async fn create_database_pool_from_env(
num_clients: usize,
) -> Result<sqlx::PgPool, Box<dyn Error + Send + Sync>> {
create_database_pool(&env::var("DATABASE_URL")?, num_clients).await
}

View File

@@ -0,0 +1,98 @@
use apca::{ApiInfo, Client};
use async_trait::async_trait;
use deadpool::managed::{BuildError, Manager, Pool, RecycleResult};
use std::{env, error::Error};
pub struct AlpacaManager {
apca_api_base_url: String,
apca_api_key_id: String,
apca_api_secret_key: String,
}
impl AlpacaManager {
pub fn new(
apca_api_base_url: String,
apca_api_key_id: String,
apca_api_secret_key: String,
) -> Self {
Self {
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
}
}
}
pub type AlpacaPool = Pool<AlpacaManager>;
#[async_trait]
impl Manager for AlpacaManager {
type Type = Client;
type Error = Box<dyn Error + Send + Sync>;
async fn create(&self) -> Result<Self::Type, Self::Error> {
let client = Client::new(ApiInfo::from_parts(
&self.apca_api_base_url,
&self.apca_api_key_id,
&self.apca_api_secret_key,
)?);
Ok(client)
}
async fn recycle(&self, _: &mut Self::Type) -> RecycleResult<Self::Error> {
Ok(())
}
}
pub async fn create_alpaca_client(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
) -> Result<Client, Box<dyn Error + Send + Sync>> {
Ok(Client::new(ApiInfo::from_parts(
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
)?))
}
pub async fn create_alpaca_client_from_env() -> Result<Client, Box<dyn Error + Send + Sync>> {
create_alpaca_client(
&env::var("APCA_API_BASE_URL")?,
&env::var("APCA_API_KEY_ID")?,
&env::var("APCA_API_SECRET_KEY")?,
)
.await
}
pub async fn create_alpaca_pool(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
num_clients: usize,
) -> Result<AlpacaPool, Box<dyn Error + Send + Sync>> {
let manager = AlpacaManager::new(
apca_api_base_url.to_owned(),
apca_api_key_id.to_owned(),
apca_api_secret_key.to_owned(),
);
Pool::builder(manager)
.max_size(num_clients)
.build()
.map_err(|e| match e {
BuildError::Backend(e) => e,
BuildError::NoRuntimeSpecified(_) => unreachable!(),
})
}
pub async fn create_alpaca_pool_from_env(
num_clients: usize,
) -> Result<AlpacaPool, Box<dyn Error + Send + Sync>> {
create_alpaca_pool(
&env::var("APCA_API_BASE_URL")?,
&env::var("APCA_API_KEY_ID")?,
&env::var("APCA_API_SECRET_KEY")?,
num_clients,
)
.await
}

2
backend/src/pool/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod alpaca;
pub mod postgres;

View File

@@ -0,0 +1,21 @@
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{env, error::Error};
pub type PostgresPool = PgPool;
pub async fn create_postgres_pool(
database_url: &str,
num_clients: usize,
) -> Result<PostgresPool, Box<dyn Error + Send + Sync>> {
PgPoolOptions::new()
.max_connections(num_clients as u32)
.connect(database_url)
.await
.map_err(|e| e.into())
}
pub async fn create_postgres_pool_from_env(
num_clients: usize,
) -> Result<PostgresPool, Box<dyn Error + Send + Sync>> {
create_postgres_pool(&env::var("DATABASE_URL")?, num_clients).await
}

View File

@@ -1,19 +1,17 @@
use crate::database; use crate::database;
use crate::database::assets::update_asset_trading; use crate::database::assets::update_asset_trading;
use crate::pool::AlpacaPool; use crate::pool::alpaca::AlpacaPool;
use crate::pool::postgres::PostgresPool;
use crate::types::{Asset, Class, Exchange}; use crate::types::{Asset, Class, Exchange};
use apca::api::v2::asset::{self, Symbol}; use apca::api::v2::asset::{self, Symbol};
use axum::{extract::Path, http::StatusCode, Extension, Json}; use axum::{extract::Path, http::StatusCode, Extension, Json};
use serde::Deserialize; use serde::Deserialize;
use sqlx::{ use sqlx::types::time::OffsetDateTime;
types::{time::OffsetDateTime, Uuid},
PgPool,
};
pub async fn get_assets( pub async fn get_assets(
Extension(database_pool): Extension<PgPool>, Extension(postgres_pool): Extension<PostgresPool>,
) -> Result<(StatusCode, Json<Vec<Asset>>), StatusCode> { ) -> Result<(StatusCode, Json<Vec<Asset>>), StatusCode> {
let assets = database::assets::get_assets(&database_pool) let assets = database::assets::get_assets(&postgres_pool)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -21,10 +19,10 @@ pub async fn get_assets(
} }
pub async fn get_asset( pub async fn get_asset(
Extension(database_pool): Extension<PgPool>, Extension(postgres_pool): Extension<PostgresPool>,
Path(symbol): Path<String>, Path(symbol): Path<String>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> { ) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = database::assets::get_asset(&database_pool, &symbol) let asset = database::assets::get_asset(&postgres_pool, &symbol)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -42,11 +40,11 @@ pub struct AddAssetRequest {
} }
pub async fn add_asset( pub async fn add_asset(
Extension(database_pool): Extension<PgPool>, Extension(postgres_pool): Extension<PostgresPool>,
Extension(alpaca_pool): Extension<AlpacaPool>, Extension(alpaca_pool): Extension<AlpacaPool>,
Json(request): Json<AddAssetRequest>, Json(request): Json<AddAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> { ) -> Result<(StatusCode, Json<Asset>), StatusCode> {
if database::assets::get_asset(&database_pool, &request.symbol) if database::assets::get_asset(&postgres_pool, &request.symbol)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.is_some() .is_some()
@@ -60,11 +58,9 @@ pub async fn add_asset(
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.issue::<asset::Get>(&Symbol::Sym(request.symbol)) .issue::<asset::Get>(&Symbol::Sym(request.symbol))
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::NOT_FOUND)?;
let asset = Asset { let asset = Asset {
id: Uuid::parse_str(&asset.id.to_string())
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
symbol: asset.symbol, symbol: asset.symbol,
class: Class::from(asset.class) as Class, class: Class::from(asset.class) as Class,
exchange: Exchange::from(asset.exchange) as Exchange, exchange: Exchange::from(asset.exchange) as Exchange,
@@ -72,7 +68,7 @@ pub async fn add_asset(
date_added: OffsetDateTime::now_utc(), date_added: OffsetDateTime::now_utc(),
}; };
let asset = database::assets::add_asset(&database_pool, asset) let asset = database::assets::add_asset(&postgres_pool, asset)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -86,11 +82,11 @@ pub struct UpdateAssetRequest {
} }
pub async fn update_asset( pub async fn update_asset(
Extension(database_pool): Extension<PgPool>, Extension(postgres_pool): Extension<PostgresPool>,
Path(symbol): Path<String>, Path(symbol): Path<String>,
Json(request): Json<UpdateAssetRequest>, Json(request): Json<UpdateAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> { ) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = update_asset_trading(&database_pool, &symbol, request.trading) let asset = update_asset_trading(&postgres_pool, &symbol, request.trading)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -101,10 +97,10 @@ pub async fn update_asset(
} }
pub async fn delete_asset( pub async fn delete_asset(
Extension(database_pool): Extension<PgPool>, Extension(postgres_pool): Extension<PostgresPool>,
Path(symbol): Path<String>, Path(symbol): Path<String>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> { ) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = database::assets::delete_asset(&database_pool, &symbol) let asset = database::assets::delete_asset(&postgres_pool, &symbol)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

View File

@@ -1,16 +1,15 @@
use crate::pool::AlpacaPool; use crate::pool::{alpaca::AlpacaPool, postgres::PostgresPool};
use axum::{ use axum::{
routing::{delete, get, post}, routing::{delete, get, post},
Extension, Router, Server, Extension, Router, Server,
}; };
use log::info; use log::info;
use sqlx::PgPool;
use std::net::SocketAddr; use std::net::SocketAddr;
pub mod assets; pub mod assets;
pub async fn initialize_api( pub async fn run_api(
database_pool: PgPool, postgres_pool: PostgresPool,
alpaca_pool: AlpacaPool, alpaca_pool: AlpacaPool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let app = Router::new() let app = Router::new()
@@ -19,7 +18,7 @@ pub async fn initialize_api(
.route("/assets", post(assets::add_asset)) .route("/assets", post(assets::add_asset))
.route("/assets/:symbol", post(assets::update_asset)) .route("/assets/:symbol", post(assets::update_asset))
.route("/assets/:symbol", delete(assets::delete_asset)) .route("/assets/:symbol", delete(assets::delete_asset))
.layer(Extension(database_pool)) .layer(Extension(postgres_pool))
.layer(Extension(alpaca_pool)); .layer(Extension(alpaca_pool));
let addr = SocketAddr::from(([0, 0, 0, 0], 7878)); let addr = SocketAddr::from(([0, 0, 0, 0], 7878));

View File

@@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::{error::BoxDynError, types::Uuid, Decode, Encode, FromRow, Postgres, Type}; use sqlx::{error::BoxDynError, Decode, Encode, FromRow, Postgres, Type};
use std::ops::Deref; use std::ops::Deref;
use time::OffsetDateTime; use time::OffsetDateTime;
@@ -69,7 +69,6 @@ impl_apca_sqlx_traits!(
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Asset { pub struct Asset {
pub id: Uuid,
pub symbol: String, pub symbol: String,
pub class: Class, pub class: Class,
pub exchange: Exchange, pub exchange: Exchange,
@@ -80,10 +79,10 @@ pub struct Asset {
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Bar { pub struct Bar {
pub timestamp: OffsetDateTime, pub timestamp: OffsetDateTime,
pub asset_id: Uuid, pub asset_symbol: String,
pub open: f64, pub open: f64,
pub high: f64, pub high: f64,
pub low: f64, pub low: f64,
pub close: f64, pub close: f64,
pub volume: u64, pub volume: f64,
} }

View File

@@ -15,27 +15,24 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
); );
CREATE TABLE assets ( CREATE TABLE assets (
id UUID PRIMARY KEY, symbol TEXT PRIMARY KEY,
symbol VARCHAR(20) NOT NULL UNIQUE,
class CLASS NOT NULL, class CLASS NOT NULL,
exchange EXCHANGE NOT NULL, exchange EXCHANGE NOT NULL,
trading BOOLEAN NOT NULL DEFAULT FALSE, trading BOOLEAN NOT NULL DEFAULT FALSE,
date_added TIMESTAMPTZ NOT NULL DEFAULT NOW() date_added TIMESTAMPTZ NOT NULL DEFAULT NOW()
); );
CREATE INDEX assets_symbol_idx ON assets (symbol);
CREATE TABLE bars ( CREATE TABLE bars (
timestamp TIMESTAMPTZ NOT NULL, timestamp TIMESTAMPTZ NOT NULL,
asset_id UUID NOT NULL REFERENCES assets(id), asset_symbol TEXT NOT NULL REFERENCES assets(symbol),
open DOUBLE PRECISION NOT NULL, open DOUBLE PRECISION NOT NULL,
high DOUBLE PRECISION NOT NULL, high DOUBLE PRECISION NOT NULL,
low DOUBLE PRECISION NOT NULL, low DOUBLE PRECISION NOT NULL,
close DOUBLE PRECISION NOT NULL, close DOUBLE PRECISION NOT NULL,
volume DOUBLE PRECISION NOT NULL, volume DOUBLE PRECISION NOT NULL,
PRIMARY KEY (asset_id, timestamp), PRIMARY KEY (asset_symbol, timestamp),
FOREIGN KEY (asset_id) REFERENCES assets(id) FOREIGN KEY (asset_symbol) REFERENCES assets(symbol)
); );
SELECT create_hypertable('bars', 'timestamp', 'asset_id', 2); SELECT create_hypertable('bars', 'timestamp', 'asset_symbol', 2);
EOSQL EOSQL