Add assets microservice base

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-08-28 09:17:26 +03:00
parent 068f2d8601
commit 8544fc79f5
22 changed files with 3010 additions and 1 deletions

2
.gitignore vendored
View File

@@ -9,4 +9,4 @@ target/
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
.env
.env*

14
backend/.dockerignore Normal file
View File

@@ -0,0 +1,14 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
.env*
Dockerfile
.dockerignore

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE assets SET trading = $1 WHERE symbol = $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Bool",
"Text"
]
},
"nullable": []
},
"hash": "2d06d5d904d93907cf5aed70eb11dc6c522d6e2b28feccd9fc49bcf10299033e"
}

View File

@@ -0,0 +1,79 @@
{
"db_name": "PostgreSQL",
"query": "SELECT id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE symbol = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 4,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamp"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "798c33653855952903818bcae8371831bffd7fec02c622d60308471be02b98c7"
}

View File

@@ -0,0 +1,45 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO assets (id, symbol, class, exchange, trading) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Varchar",
{
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
},
{
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
},
"Bool"
]
},
"nullable": []
},
"hash": "82ee2837924b35ae4cce242c22f9e1f1c4be7ed8b5d5ba64e0ce74d775f8f794"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM assets WHERE symbol = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text"
]
},
"nullable": []
},
"hash": "919f09985c1568dfc2f8cc3c693503b677327b5fd77acb19edd3440e26402fb7"
}

View File

@@ -0,0 +1,77 @@
{
"db_name": "PostgreSQL",
"query": "SELECT id, symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "symbol",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 4,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 5,
"name": "date_added",
"type_info": "Timestamp"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "98f6c13cc69f660a1746a6951fac28a79ed91d04216a847dd5d358df3e6e24ee"
}

2381
backend/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

7
backend/Cargo.toml Normal file
View File

@@ -0,0 +1,7 @@
[workspace]
resolver = "2"
members = [
"common",
"assets",
]

11
backend/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Build stage: compile the whole Cargo workspace in release mode.
FROM rust AS builder
WORKDIR /usr/src/qrust
COPY . .
# Make sqlx macros use the checked-in offline query cache instead of a live DB.
# NOTE(review): legacy `ENV key value` form — modern syntax is `ENV SQLX_OFFLINE=true`.
ENV SQLX_OFFLINE true
RUN cargo build --release

# Runtime stage for the assets microservice: copy just the compiled binary.
# NOTE(review): the binary links the Debian-based `rust` image's glibc —
# confirm frolvlad/alpine-glibc ships a compatible glibc version.
FROM frolvlad/alpine-glibc AS assets
WORKDIR /usr/src/assets
COPY --from=builder /usr/src/qrust/target/release/assets .
EXPOSE 7878
CMD ["./assets"]

26
backend/assets/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "assets"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common = { path = "../common" }
apca = "0.27.2"
axum = "0.6.20"
dotenv = "0.15.0"
sqlx = { version = "0.7.1", features = [
"uuid",
"time",
"postgres",
"runtime-tokio",
] }
tokio = { version = "1.32.0", features = [
"macros",
"rt-multi-thread",
] }
deadpool = { version = "0.9.5", features = [
"rt_tokio_1",
] }
serde = "1.0.188"

View File

@@ -0,0 +1,8 @@
services:
assets:
build:
context: ..
dockerfile: Dockerfile
target: assets
hostname: assets
restart: unless-stopped

View File

@@ -0,0 +1,39 @@
use axum::{
routing::{delete, get, post},
Extension, Router, Server,
};
use common::alpaca::create_alpaca_pool;
use dotenv::dotenv;
use sqlx::PgPool;
use std::{env, error::Error, net::SocketAddr};
mod routes;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    // Load a local .env file if present; absence is fine in production.
    dotenv().ok();

    // Fix: the original `.unwrap()`ed every env var and panicked on missing
    // configuration even though `main` returns `Result`. `env::VarError`
    // implements `Error + Send + Sync`, so `?` propagates it cleanly.
    let database_pool = PgPool::connect(&env::var("DATABASE_URL")?).await?;

    // Pool of 10 pre-built Alpaca API clients shared across handlers.
    let alpaca_pool = create_alpaca_pool(
        &env::var("APCA_API_BASE_URL")?,
        &env::var("APCA_API_KEY_ID")?,
        &env::var("APCA_API_SECRET_KEY")?,
        10,
    )?;

    // CRUD routes for assets; both pools are injected via `Extension` layers.
    let app = Router::new()
        .route("/assets", get(routes::get_assets))
        .route("/assets/:symbol", get(routes::get_asset))
        .route("/assets", post(routes::add_asset))
        .route("/assets/:symbol", post(routes::update_asset))
        .route("/assets/:symbol", delete(routes::delete_asset))
        .layer(Extension(database_pool))
        .layer(Extension(alpaca_pool));

    let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
    println!("Listening on {}...", addr);
    Server::bind(&addr).serve(app.into_make_service()).await?;
    Ok(())
}

View File

@@ -0,0 +1,110 @@
use apca::api::v2::asset::{self, Symbol};
use axum::{extract::Path, http::StatusCode, Extension, Json};
use common::{
alpaca::AlpacaPool,
database::{Asset, Class, Exchange},
};
use serde::Deserialize;
use sqlx::{query, query_as, types::Uuid, PgPool};
/// `GET /assets` — returns every tracked asset as JSON.
///
/// Any database failure is reported as `500 Internal Server Error`.
pub async fn get_assets(
    Extension(database_pool): Extension<PgPool>,
) -> Result<(StatusCode, Json<Vec<Asset>>), StatusCode> {
    // Fetch all rows first, then translate the outcome explicitly.
    let fetched = query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets"#)
        .fetch_all(&database_pool)
        .await;

    match fetched {
        Ok(assets) => Ok((StatusCode::OK, Json(assets))),
        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
    }
}
/// `GET /assets/:symbol` — returns a single asset by symbol.
///
/// Returns `404 Not Found` when the symbol is not tracked and `500` on
/// database errors.
pub async fn get_asset(
    Extension(database_pool): Extension<PgPool>,
    Path(symbol): Path<String>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
    // Fix: the original used `fetch_one`, which reports a missing row as an
    // error, so unknown symbols surfaced as 500 instead of 404.
    let asset = query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, symbol)
        .fetch_optional(&database_pool)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        .ok_or(StatusCode::NOT_FOUND)?;
    Ok((StatusCode::OK, Json(asset)))
}
/// JSON body of `POST /assets`.
#[allow(dead_code)]
#[derive(Deserialize)]
pub struct AddAssetRequest {
    // Ticker symbol to look up in Alpaca and start tracking.
    symbol: String,
    // Optional initial value for the asset's `trading` flag.
    trading: Option<bool>,
}
/// `POST /assets` — looks a symbol up in Alpaca and persists it.
///
/// Returns `201 Created` on success, `409 Conflict` if the symbol is already
/// tracked, and `500` on database or Alpaca failures (including symbols
/// Alpaca does not know).
pub async fn add_asset(
    Extension(database_pool): Extension<PgPool>,
    Extension(alpaca_pool): Extension<AlpacaPool>,
    Json(request): Json<AddAssetRequest>,
) -> Result<StatusCode, StatusCode> {
    // NOTE(review): this check races with the INSERT below (TOCTOU); the
    // UNIQUE constraint on `symbol` is the real guard — a concurrent insert
    // surfaces as a 500 from the INSERT rather than a 409.
    if query_as!(Asset, r#"SELECT id, symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE symbol = $1"#, request.symbol)
        .fetch_optional(&database_pool)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        .is_some()
    {
        return Err(StatusCode::CONFLICT);
    }

    // Borrow a pooled Alpaca client and fetch the asset's metadata.
    let asset = alpaca_pool
        .get()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
        .issue::<asset::Get>(&Symbol::Sym(request.symbol))
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Fixes vs. the original:
    // * `trading` was bound as `Option<bool>`; a missing field inserted NULL
    //   into a NOT NULL column (the DB DEFAULT only applies when the column
    //   is omitted) — default to `false`, matching the schema's DEFAULT FALSE.
    // * the INSERT result was `.unwrap()`ed, panicking the handler on any DB
    //   error — map it to 500 instead.
    query!(
        r#"INSERT INTO assets (id, symbol, class, exchange, trading) VALUES ($1, $2, $3::CLASS, $4::EXCHANGE, $5)"#,
        // Round-trip through the string form to convert apca's id type to Uuid;
        // a valid UUID string cannot fail to parse.
        Uuid::parse_str(&asset.id.to_string()).unwrap(),
        asset.symbol,
        Class::from(asset.class) as Class,
        Exchange::from(asset.exchange) as Exchange,
        request.trading.unwrap_or(false)
    )
    .execute(&database_pool)
    .await
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    Ok(StatusCode::CREATED)
}
/// JSON body of `POST /assets/:symbol`.
#[allow(dead_code)]
#[derive(Deserialize)]
pub struct UpdateAssetRequest {
    // New value for the asset's `trading` flag.
    trading: bool,
}
pub async fn update_asset(
Extension(database_pool): Extension<PgPool>,
Path(symbol): Path<String>,
Json(request): Json<UpdateAssetRequest>,
) -> Result<StatusCode, StatusCode> {
query_as!(
Asset,
r#"UPDATE assets SET trading = $1 WHERE symbol = $2"#,
request.trading,
symbol
)
.execute(&database_pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::OK)
}
/// `DELETE /assets/:symbol` — removes a tracked asset.
///
/// Returns `204 No Content` on success, `404 Not Found` when the symbol is
/// not tracked, and `500` on database errors.
pub async fn delete_asset(
    Extension(database_pool): Extension<PgPool>,
    Path(symbol): Path<String>,
) -> Result<StatusCode, StatusCode> {
    let result = query!(r#"DELETE FROM assets WHERE symbol = $1"#, symbol)
        .execute(&database_pool)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Fix: deleting a nonexistent symbol previously reported 204 as well.
    if result.rows_affected() == 0 {
        return Err(StatusCode::NOT_FOUND);
    }
    Ok(StatusCode::NO_CONTENT)
}

11
backend/common/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "common"
version = "0.1.0"
edition = "2021"
[dependencies]
apca = "0.27.2"
deadpool = "0.9.5"
serde = { version = "1.0.188", features = ["derive"] }
sqlx = { version = "0.7.1", features = ["uuid", "time", "postgres"] }
time = { version = "0.3.27", features = ["serde"] }

View File

@@ -0,0 +1,23 @@
use apca::{ApiInfo, Client};
use deadpool::unmanaged::Pool;
use std::error::Error;
pub type AlpacaPool = Pool<Client>;
/// Builds an unmanaged deadpool of `num_clients` Alpaca API clients, all
/// configured from the same base URL and credentials.
///
/// Returns the first configuration error encountered while constructing a
/// client's `ApiInfo`.
pub fn create_alpaca_pool(
    apca_api_base_url: &str,
    apca_api_key_id: &str,
    apca_api_secret_key: &str,
    num_clients: usize,
) -> Result<Pool<Client>, Box<dyn Error + Send + Sync>> {
    // Build every client up front; `collect` short-circuits on the first
    // failing `ApiInfo::from_parts`.
    let clients = (0..num_clients)
        .map(|_| {
            ApiInfo::from_parts(apca_api_base_url, apca_api_key_id, apca_api_secret_key)
                .map(Client::new)
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(Pool::from(clients))
}

View File

@@ -0,0 +1,89 @@
use serde::{Deserialize, Serialize};
use sqlx::{error::BoxDynError, types::Uuid, Decode, Encode, FromRow, Postgres, Type};
use std::ops::Deref;
use time::PrimitiveDateTime;
/// Generates a newtype wrapper around an `apca` enum together with the sqlx
/// trait impls needed to read and write it as a Postgres TEXT value.
///
/// * `$outer_type` — name of the generated wrapper struct.
/// * `$inner_type` — the wrapped `apca` type (must parse from `&str`).
/// * `$fallback`   — value substituted when decoding an unrecognized string.
macro_rules! impl_apca_sqlx_traits {
    ($outer_type:ident, $inner_type:path, $fallback:expr) => {
        #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
        pub struct $outer_type($inner_type);

        // Transparent read-only access to the wrapped value.
        impl Deref for $outer_type {
            type Target = $inner_type;
            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        impl From<$inner_type> for $outer_type {
            fn from(inner: $inner_type) -> Self {
                $outer_type(inner)
            }
        }

        // Parse from the string stored in Postgres; any string the inner
        // type does not recognize silently becomes `$fallback`.
        impl From<String> for $outer_type {
            fn from(s: String) -> Self {
                s.parse().unwrap_or($fallback).into()
            }
        }

        // Decode by reading the column as a String, then converting.
        impl Decode<'_, Postgres> for $outer_type {
            fn decode(
                value: <Postgres as sqlx::database::HasValueRef<'_>>::ValueRef,
            ) -> Result<Self, BoxDynError> {
                Ok($outer_type::from(<String as Decode<Postgres>>::decode(
                    value,
                )?))
            }
        }

        // Encode by converting the wrapped value to an owned String and
        // delegating to String's encoder.
        impl Encode<'_, Postgres> for $outer_type {
            fn encode_by_ref(
                &self,
                buf: &mut <Postgres as sqlx::database::HasArguments<'_>>::ArgumentBuffer,
            ) -> sqlx::encode::IsNull {
                <String as Encode<Postgres>>::encode_by_ref(&self.0.as_ref().to_owned(), buf)
            }
        }

        // Report String's type info so sqlx treats the wrapper as TEXT.
        impl Type<Postgres> for $outer_type {
            fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
                <String as Type<Postgres>>::type_info()
            }
        }
    };
}
// `Class` wraps `apca::api::v2::asset::Class`; unrecognized DB strings
// decode to `Class::Unknown`.
impl_apca_sqlx_traits!(
    Class,
    apca::api::v2::asset::Class,
    apca::api::v2::asset::Class::Unknown
);

// `Exchange` wraps `apca::api::v2::asset::Exchange` with the same fallback.
impl_apca_sqlx_traits!(
    Exchange,
    apca::api::v2::asset::Exchange,
    apca::api::v2::asset::Exchange::Unknown
);
/// A tracked asset, mirroring one row of the `assets` table.
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Asset {
    // Asset id (stored as UUID in Postgres).
    pub id: Uuid,
    // Ticker symbol; unique in the table.
    pub symbol: String,
    // Asset class (newtype over the apca enum).
    pub class: Class,
    // Listing exchange (newtype over the apca enum).
    pub exchange: Exchange,
    // Whether the service is actively trading this asset.
    pub trading: bool,
    // Row creation time (TIMESTAMP WITHOUT TIME ZONE).
    pub date_added: PrimitiveDateTime,
}
/// A single OHLCV price bar for an asset.
// NOTE(review): no queries in this commit use `Bar` yet; when they do,
// confirm `volume: u64` is decodable — sqlx's Postgres driver generally has
// no unsigned integer support (consider `i64`).
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Bar {
    // Bar start time.
    pub timestamp: PrimitiveDateTime,
    // References `assets.id`.
    pub symbol_id: Uuid,
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: u64,
}

View File

@@ -0,0 +1,2 @@
pub mod alpaca;
pub mod database;

View File

@@ -3,16 +3,34 @@ services:
extends:
file: support/timescaledb/docker-compose.yml
service: timescaledb
profiles:
- support
rabbitmq:
extends:
file: support/rabbitmq/docker-compose.yml
service: rabbitmq
profiles:
- support
nginx:
extends:
file: support/nginx/docker-compose.yml
service: nginx
profiles:
- support
assets:
extends:
file: backend/assets/docker-compose.yml
service: assets
env_file:
- .env.docker
depends_on:
- timescaledb
- rabbitmq
profiles:
- backend
volumes:
timescaledb-data:

View File

@@ -1,3 +1,15 @@
server {
    listen 80;

    # Reverse-proxy /assets to the assets microservice on port 7878.
    location /assets {
        # Docker's embedded DNS; resolving per request lets nginx start even
        # if the upstream container is not up yet.
        resolver 127.0.0.11;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # Using variables in proxy_pass forces runtime DNS resolution instead
        # of startup-time resolution.
        set $upstream_proto http;
        set $upstream_host assets;
        set $upstream_port 7878;
        proxy_pass $upstream_proto://$upstream_host:$upstream_port;
    }
}

View File

@@ -0,0 +1,27 @@
#!/bin/bash
# Postgres init script (mounted into docker-entrypoint-initdb.d): creates the
# enum types and the `assets` table used by the assets microservice.
# $POSTGRES_USER / $POSTGRES_DB are provided by the container environment.
psql --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
-- Enum values mirror the Rust Class / Exchange wrappers in common::database.
CREATE TYPE CLASS AS ENUM ('us_equity', 'crypto', 'unknown');
CREATE TYPE EXCHANGE AS ENUM (
'AMEX',
'ARCA',
'BATS',
'NASDAQ',
'NYSE',
'NYSEARCA',
'OTC',
'unknown'
);
CREATE TABLE assets (
id UUID PRIMARY KEY,
symbol VARCHAR(20) NOT NULL UNIQUE,
class CLASS NOT NULL,
exchange EXCHANGE NOT NULL,
trading BOOLEAN NOT NULL DEFAULT FALSE,
date_added TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW()
);
-- NOTE(review): the UNIQUE constraint on symbol already creates an index,
-- so this extra index is likely redundant — confirm before keeping it.
CREATE INDEX assets_symbol_idx ON assets (symbol);
EOSQL

View File

@@ -8,6 +8,7 @@ services:
volumes:
- timescaledb-data:/home/postgres/pgdata/data
- timescaledb-logs:/home/postgres/pg_log
- ./9999-init.sh:/docker-entrypoint-initdb.d/9999-init.sh
environment:
- TIMESCALEDB_TELEMETRY=off
- POSTGRES_USER=${POSTGRES_USER}