Add RabbitMQ messaging

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
Date: 2023-08-28 13:45:51 +03:00
Parent: fa509934ae
Commit: 9d3d51f23c
22 changed files with 1232 additions and 151 deletions

@@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE assets SET trading = $1 WHERE symbol = $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Bool",
"Text"
]
},
"nullable": []
},
"hash": "2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e"
}

@@ -0,0 +1,80 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 4,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Bool",
"Text"
]
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "3862cd81245aab9ea45b2926e5688c8355e8cc7718239dfec6e4a52d99e4e7bf"
}

@@ -0,0 +1,79 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 4,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "3b4052f53e87d2321c88c61f14e26d3867c8f56564b9c49793c688c707a772c8"
}

@@ -0,0 +1,111 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5, $6) RETURNING id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 4,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Varchar",
{
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
},
{
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
},
"Bool",
"Timestamptz"
]
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "60473446809d8d5a8d13ad0fe94c7420b716a4529da589c6698874f8836f89aa"
}

@@ -58,7 +58,7 @@
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamp"
"type_info": "Timestamptz"
}
],
"parameters": {

@@ -1,45 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO assets (id, symbol, class, exchange, trading) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Varchar",
{
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
},
{
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
},
"Bool"
]
},
"nullable": []
},
"hash": "82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794"
}

@@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM assets WHERE symbol = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text"
]
},
"nullable": []
},
"hash": "919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7"
}

@@ -58,7 +58,7 @@
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamp"
"type_info": "Timestamptz"
}
],
"parameters": {

backend/Cargo.lock (generated)

File diff suppressed because it is too large.

@@ -7,5 +7,6 @@ RUN cargo build --release
FROM frolvlad/alpine-glibc AS assets
WORKDIR /usr/src/assets
COPY --from=builder /usr/src/qrust/target/release/assets .
+ COPY log4rs.yaml .
EXPOSE 7878
CMD ["./assets"]

@@ -25,3 +25,6 @@ deadpool = { version = "0.9.5", features = [
] }
serde = "1.0.188"
log = "0.4.20"
+ lapin = "2.3.1"
+ serde_json = "1.0.105"
+ log4rs = "1.2.0"

@@ -2,26 +2,46 @@ use axum::{
routing::{delete, get, post},
Extension, Router, Server,
};
- use common::alpaca::create_alpaca_pool;
+ use common::pool::{
+ create_alpaca_pool_from_env, create_database_pool_from_env, create_rabbitmq_pool_from_env,
+ };
+ use deadpool::managed::{Hook, HookError, HookErrorCause};
use dotenv::dotenv;
+ use lapin::ExchangeKind;
use log::info;
- use sqlx::PgPool;
- use std::{env, error::Error, net::SocketAddr};
+ use std::{error::Error, net::SocketAddr};
mod routes;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
dotenv().ok();
+ log4rs::init_file("log4rs.yaml", Default::default()).unwrap();
- let database_pool = PgPool::connect(&env::var("DATABASE_URL").unwrap()).await?;
- let alpaca_pool = create_alpaca_pool(
- &env::var("APCA_API_BASE_URL").unwrap(),
- &env::var("APCA_API_KEY_ID").unwrap(),
- &env::var("APCA_API_SECRET_KEY").unwrap(),
- 10,
- )?;
+ let num_clients = 10;
+ let database_pool = create_database_pool_from_env(num_clients).await?;
+ let alpaca_pool = create_alpaca_pool_from_env(num_clients).await?;
+ let rabbitmq_pool = create_rabbitmq_pool_from_env(
+ num_clients,
+ Hook::async_fn(|connection: &mut lapin::Connection, _| {
+ Box::pin(async move {
+ connection
+ .create_channel()
+ .await
+ .map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))?
+ .exchange_declare(
+ "assets",
+ ExchangeKind::Topic,
+ Default::default(),
+ Default::default(),
+ )
+ .await
+ .map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))?;
+ Ok(())
+ })
+ }),
+ )
+ .await?;
let app = Router::new()
.route("/assets", get(routes::get_assets))
@@ -30,7 +50,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.route("/assets/:symbol", post(routes::update_asset))
.route("/assets/:symbol", delete(routes::delete_asset))
.layer(Extension(database_pool))
- .layer(Extension(alpaca_pool));
+ .layer(Extension(alpaca_pool))
+ .layer(Extension(rabbitmq_pool));
let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
info!("Listening on {}...", addr);

@@ -1,11 +1,15 @@
use apca::api::v2::asset::{self, Symbol};
use axum::{extract::Path, http::StatusCode, Extension, Json};
use common::{
- alpaca::AlpacaPool,
database::{Asset, Class, Exchange},
+ pool::{AlpacaPool, RabbitmqPool},
};
use serde::Deserialize;
- use sqlx::{query, query_as, types::Uuid, PgPool};
+ use sqlx::{
+ query_as,
+ types::{time::OffsetDateTime, Uuid},
+ PgPool,
+ };
pub async fn get_assets(
Extension(database_pool): Extension<PgPool>,
@@ -40,6 +44,7 @@ pub struct AddAssetRequest {
pub async fn add_asset(
Extension(database_pool): Extension<PgPool>,
Extension(alpaca_pool): Extension<AlpacaPool>,
+ Extension(rabbitmq_pool): Extension<RabbitmqPool>,
Json(request): Json<AddAssetRequest>,
) -> Result<StatusCode, StatusCode> {
if query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, request.symbol)
@@ -58,17 +63,36 @@ pub async fn add_asset(
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
- query!(
- r#"INSERT INTO assets (id, symbol, class, exchange, trading) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5)"#,
- Uuid::parse_str(&asset.id.to_string()).unwrap(),
+ let asset = query_as!(
+ Asset,
+ r#"INSERT INTO assets (id, symbol, class, exchange, trading, date_added) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5, $6) RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
+ Uuid::parse_str(&asset.id.to_string()).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
asset.symbol,
Class::from(asset.class) as Class,
Exchange::from(asset.exchange) as Exchange,
- request.trading
+ request.trading.unwrap_or(false),
+ OffsetDateTime::now_utc(),
)
- .execute(&database_pool)
+ .fetch_one(&database_pool)
.await
- .unwrap();
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+ rabbitmq_pool
+ .get()
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .create_channel()
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .basic_publish(
+ "assets",
+ "assets.added",
+ Default::default(),
+ &serde_json::to_vec(&asset).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
+ Default::default(),
+ )
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::CREATED)
}
@@ -81,28 +105,64 @@ pub struct UpdateAssetRequest {
pub async fn update_asset(
Extension(database_pool): Extension<PgPool>,
+ Extension(rabbitmq_pool): Extension<RabbitmqPool>,
Path(symbol): Path<String>,
Json(request): Json<UpdateAssetRequest>,
) -> Result<StatusCode, StatusCode> {
- query_as!(
+ let asset = query_as!(
Asset,
r#"UPDATE assets SET trading = $1 WHERE symbol = $2"#,
r#"UPDATE assets SET trading = $1 WHERE symbol = $2 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#,
request.trading,
symbol
)
- .execute(&database_pool)
+ .fetch_one(&database_pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+ rabbitmq_pool
+ .get()
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .create_channel()
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .basic_publish(
+ "assets",
+ "assets.updated",
+ Default::default(),
+ &serde_json::to_vec(&asset).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
+ Default::default(),
+ )
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::OK)
}
pub async fn delete_asset(
Extension(database_pool): Extension<PgPool>,
+ Extension(rabbitmq_pool): Extension<RabbitmqPool>,
Path(symbol): Path<String>,
) -> Result<StatusCode, StatusCode> {
- query!(r#"DELETE FROM assets WHERE symbol = $1"#, symbol)
- .execute(&database_pool)
+ let asset = query_as!(Asset, r#"DELETE FROM assets WHERE symbol = $1 RETURNING id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added"#, symbol)
+ .fetch_one(&database_pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+ rabbitmq_pool
+ .get()
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .create_channel()
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .basic_publish(
+ "assets",
+ "assets.deleted",
+ Default::default(),
+ &serde_json::to_vec(&asset).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
+ Default::default(),
+ )
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
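
Note: all three handlers repeat the same get-connection / create-channel / basic_publish sequence. A hypothetical helper, sketched below from the commit's own calls (the name publish_event and the generic signature are assumptions, not part of this commit), could centralize it:

use axum::http::StatusCode;
use common::pool::RabbitmqPool;
use serde::Serialize;

// Hypothetical helper: serialize `payload` and publish it to the "assets"
// topic exchange under `routing_key` ("assets.added", "assets.updated",
// or "assets.deleted" in the handlers above). The publisher confirm is not
// awaited, matching the handlers' fire-and-forget behaviour.
async fn publish_event<T: Serialize>(
    rabbitmq_pool: &RabbitmqPool,
    routing_key: &str,
    payload: &T,
) -> Result<(), StatusCode> {
    rabbitmq_pool
        .get()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        .create_channel()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        .basic_publish(
            "assets",
            routing_key,
            Default::default(),
            &serde_json::to_vec(payload).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
            Default::default(),
        )
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(())
}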

@@ -1,11 +1,21 @@
[package]
- name = "common"
+ name = "common"
version = "0.1.0"
edition = "2021"
[dependencies]
+ lapin = "2.3.1"
apca = "0.27.2"
deadpool = "0.9.5"
- serde = { version = "1.0.188", features = ["derive"] }
- sqlx = { version = "0.7.1", features = ["uuid", "time", "postgres"] }
- time = { version = "0.3.27", features = ["serde"] }
+ deadpool-lapin = "0.10.0"
+ serde = { version = "1.0.188", features = [
+ "derive",
+ ] }
+ sqlx = { version = "0.7.1", features = [
+ "uuid",
+ "time",
+ "postgres",
+ ] }
+ time = { version = "0.3.27", features = [
+ "serde",
+ ] }

@@ -1,23 +0,0 @@
use apca::{ApiInfo, Client};
use deadpool::unmanaged::Pool;
use std::error::Error;
pub type AlpacaPool = Pool<Client>;
pub fn create_alpaca_pool(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
num_clients: usize,
) -> Result<Pool<Client>, Box<dyn Error + Send + Sync>> {
let mut alpaca_clients = Vec::new();
for _ in 0..num_clients {
let client = Client::new(ApiInfo::from_parts(
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
)?);
alpaca_clients.push(client);
}
Ok(Pool::from(alpaca_clients))
}

@@ -1,11 +1,11 @@
use serde::{Deserialize, Serialize};
use sqlx::{error::BoxDynError, types::Uuid, Decode, Encode, FromRow, Postgres, Type};
use std::ops::Deref;
- use time::PrimitiveDateTime;
+ use time::OffsetDateTime;
macro_rules! impl_apca_sqlx_traits {
($outer_type:ident, $inner_type:path, $fallback:expr) => {
- #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+ #[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)]
pub struct $outer_type($inner_type);
impl Deref for $outer_type {
@@ -43,7 +43,7 @@ macro_rules! impl_apca_sqlx_traits {
&self,
buf: &mut <Postgres as sqlx::database::HasArguments<'_>>::ArgumentBuffer,
) -> sqlx::encode::IsNull {
- <String as Encode<Postgres>>::encode_by_ref(&self.0.as_ref().to_owned(), buf)
+ <String as Encode<Postgres>>::encode_by_ref(&self.0.as_ref().into(), buf)
}
}
@@ -74,13 +74,13 @@ pub struct Asset {
pub class: Class,
pub exchange: Exchange,
pub trading: bool,
- pub date_added: PrimitiveDateTime,
+ pub date_added: OffsetDateTime,
}
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Bar {
- pub timestamp: PrimitiveDateTime,
- pub symbol_id: Uuid,
+ pub timestamp: OffsetDateTime,
+ pub asset_id: Uuid,
pub open: f64,
pub high: f64,
pub low: f64,

@@ -1,2 +1,2 @@
- pub mod alpaca;
+ pub mod pool;
pub mod database;

@@ -0,0 +1,79 @@
use apca::{ApiInfo, Client};
use deadpool::{unmanaged::Pool, Runtime};
use deadpool_lapin::Hook;
use sqlx::postgres::PgPoolOptions;
use std::{env, error::Error};
pub type AlpacaPool = Pool<Client>;
pub async fn create_alpaca_pool(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
num_clients: usize,
) -> Result<Pool<Client>, Box<dyn Error + Send + Sync>> {
let mut alpaca_clients = Vec::new();
for _ in 0..num_clients {
let client = Client::new(ApiInfo::from_parts(
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
)?);
alpaca_clients.push(client);
}
Ok(Pool::from(alpaca_clients))
}
pub async fn create_alpaca_pool_from_env(
num_clients: usize,
) -> Result<Pool<Client>, Box<dyn Error + Send + Sync>> {
create_alpaca_pool(
&env::var("APCA_API_BASE_URL")?,
&env::var("APCA_API_KEY_ID")?,
&env::var("APCA_API_SECRET_KEY")?,
num_clients,
)
.await
}
pub async fn create_database_pool(
database_url: &str,
num_clients: usize,
) -> Result<sqlx::PgPool, Box<dyn Error + Send + Sync>> {
PgPoolOptions::new()
.max_connections(num_clients as u32)
.connect(database_url)
.await
.map_err(|e| e.into())
}
pub async fn create_database_pool_from_env(
num_clients: usize,
) -> Result<sqlx::PgPool, Box<dyn Error + Send + Sync>> {
create_database_pool(&env::var("DATABASE_URL")?, num_clients).await
}
pub type RabbitmqPool = deadpool_lapin::Pool;
pub async fn create_rabbitmq_pool(
rabbitmq_url: &str,
num_clients: usize,
post_create: impl Into<Hook>,
) -> Result<RabbitmqPool, Box<dyn Error + Send + Sync>> {
deadpool_lapin::Config {
url: Some(rabbitmq_url.into()),
..Default::default()
}
.builder(Some(Runtime::Tokio1))
.max_size(num_clients)
.post_create(post_create)
.build()
.map_err(|e| e.into())
}
pub async fn create_rabbitmq_pool_from_env(
num_clients: usize,
post_create: impl Into<Hook>,
) -> Result<deadpool_lapin::Pool, Box<dyn Error + Send + Sync>> {
create_rabbitmq_pool(&env::var("RABBITMQ_URL")?, num_clients, post_create).await
}
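
For the consuming side, a minimal sketch of a service that takes a pooled connection and logs every asset event. This is hypothetical, not part of this commit: the function and consumer-tag names are assumptions, and it assumes the futures_lite crate for the stream adapter and a server-named queue.

use common::pool::{create_rabbitmq_pool_from_env, RabbitmqPool};
use deadpool::managed::Hook;
use futures_lite::stream::StreamExt;
use lapin::{options::*, types::FieldTable};
use std::error::Error;

// Hypothetical consumer-only service: bind an anonymous queue to the
// "assets" topic exchange and print each event the assets service publishes.
async fn consume_asset_events() -> Result<(), Box<dyn Error + Send + Sync>> {
    // Nothing to declare per connection here, so the post-create hook is a no-op.
    let pool: RabbitmqPool =
        create_rabbitmq_pool_from_env(1, Hook::sync_fn(|_connection, _metrics| Ok(()))).await?;

    let connection = pool.get().await?;
    let channel = connection.create_channel().await?;
    let queue = channel
        .queue_declare("", QueueDeclareOptions::default(), FieldTable::default())
        .await?;
    channel
        .queue_bind(
            queue.name().as_str(),
            "assets",
            "assets.*", // or a single key such as "assets.deleted"
            QueueBindOptions::default(),
            FieldTable::default(),
        )
        .await?;

    let mut consumer = channel
        .basic_consume(
            queue.name().as_str(),
            "asset-logger",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;
    while let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        println!(
            "{}: {}",
            delivery.routing_key,
            String::from_utf8_lossy(&delivery.data)
        );
        delivery.ack(BasicAckOptions::default()).await?;
    }
    Ok(())
}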

backend/log4rs.yaml (new file)

@@ -0,0 +1,8 @@
appenders:
stdout:
kind: console
root:
level: info
appenders:
- stdout

@@ -3,22 +3,16 @@ services:
extends:
file: support/timescaledb/docker-compose.yml
service: timescaledb
- profiles:
- - support
rabbitmq:
extends:
file: support/rabbitmq/docker-compose.yml
service: rabbitmq
- profiles:
- - support
nginx:
extends:
file: support/nginx/docker-compose.yml
service: nginx
- profiles:
- - support
assets:
extends:

View File

@@ -20,8 +20,22 @@ psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
class CLASS NOT NULL,
exchange EXCHANGE NOT NULL,
trading BOOLEAN NOT NULL DEFAULT FALSE,
- date_added TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW()
+ date_added TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX assets_symbol_idx ON assets (symbol);
+ CREATE TABLE bars (
+ timestamp TIMESTAMPTZ NOT NULL,
+ asset_id UUID NOT NULL REFERENCES assets(id),
+ open DOUBLE PRECISION NOT NULL,
+ high DOUBLE PRECISION NOT NULL,
+ low DOUBLE PRECISION NOT NULL,
+ close DOUBLE PRECISION NOT NULL,
+ volume DOUBLE PRECISION NOT NULL,
+ PRIMARY KEY (asset_id, timestamp),
+ FOREIGN KEY (asset_id) REFERENCES assets(id)
+ );
+ SELECT create_hypertable('bars', 'timestamp', 'asset_id', 2);
EOSQL
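
As an illustration of the intended read path for the bars hypertable, a hypothetical sketch (not part of this commit): recent_closes is an assumed name, and the query uses sqlx's unchecked query_as, so it bypasses the compile-time query cache shown above.

use sqlx::types::{time::OffsetDateTime, Uuid};

// Hypothetical: most recent closing prices for one asset, served by the
// (asset_id, timestamp) primary key.
async fn recent_closes(
    pool: &sqlx::PgPool,
    asset_id: Uuid,
) -> Result<Vec<(OffsetDateTime, f64)>, sqlx::Error> {
    sqlx::query_as(
        "SELECT timestamp, close FROM bars WHERE asset_id = $1 ORDER BY timestamp DESC LIMIT 100",
    )
    .bind(asset_id)
    .fetch_all(pool)
    .await
}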

@@ -8,7 +8,7 @@ services:
volumes:
- timescaledb-data:/home/postgres/pgdata/data
- timescaledb-logs:/home/postgres/pg_log
- - ./9999-init.sh:/docker-entrypoint-initdb.d/9999-init.sh
+ - ./999_init.sh:/docker-entrypoint-initdb.d/999_init.sh
environment:
- TIMESCALEDB_TELEMETRY=off
- POSTGRES_USER=${POSTGRES_USER}