Remove apca dependency

Signed-off-by: Nikolaos Karaolidis <nick@karaolidis.com>
This commit is contained in:
2023-09-02 19:18:14 +03:00
parent d636606285
commit 203b028d7c
37 changed files with 760 additions and 894 deletions

View File

@@ -17,8 +17,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -39,7 +38,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}

View File

@@ -17,8 +17,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -39,7 +38,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}

View File

@@ -17,8 +17,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -39,7 +38,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}

View File

@@ -17,8 +17,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -39,7 +38,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}
@@ -65,8 +64,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -83,7 +81,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume",
"query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)\n SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted",
"describe": {
"columns": [
{
@@ -37,6 +37,16 @@
"ordinal": 6,
"name": "volume",
"type_info": "Float8"
},
{
"ordinal": 7,
"name": "num_trades",
"type_info": "Int8"
},
{
"ordinal": 8,
"name": "volume_weighted",
"type_info": "Float8"
}
],
"parameters": {
@@ -47,6 +57,8 @@
"Float8Array",
"Float8Array",
"Float8Array",
"Float8Array",
"Int8Array",
"Float8Array"
]
},
@@ -57,8 +69,10 @@
false,
false,
false,
false,
false,
false
]
},
"hash": "e1dcfdc44f4d322c33d10828124d864b5b1087c2d07f385a309a7b0fcb4c9c6d"
"hash": "b940befc2fbef48069c41f18485a2b6b3e523ee3106af735235701a5a151a29f"
}

View File

@@ -17,8 +17,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -39,7 +38,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'crypto'",
"query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = $1::CLASS",
"describe": {
"columns": [
{
@@ -17,8 +17,7 @@
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
"crypto"
]
}
}
@@ -39,7 +38,7 @@
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
"CRYPTO"
]
}
}
@@ -57,7 +56,19 @@
}
],
"parameters": {
"Left": []
"Left": [
{
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto"
]
}
}
}
]
},
"nullable": [
false,
@@ -67,5 +78,5 @@
false
]
},
"hash": "826f5f5b55cd00d274bb38e5d5c2fff68b4bf970c1508ce7038004d6404d7f4e"
"hash": "d1e9b79a4bb2651b4dde42770576a2776f5881039c8f17c04747770a5bf97214"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)\n RETURNING timestamp, asset_symbol, open, high, low, close, volume",
"query": "INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9\n RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted",
"describe": {
"columns": [
{
@@ -37,6 +37,16 @@
"ordinal": 6,
"name": "volume",
"type_info": "Float8"
},
{
"ordinal": 7,
"name": "num_trades",
"type_info": "Int8"
},
{
"ordinal": 8,
"name": "volume_weighted",
"type_info": "Float8"
}
],
"parameters": {
@@ -47,6 +57,8 @@
"Float8",
"Float8",
"Float8",
"Float8",
"Int8",
"Float8"
]
},
@@ -57,8 +69,10 @@
false,
false,
false,
false,
false,
false
]
},
"hash": "e963b6055e28dec14f5e8f82738481327371c97175939a58de8cf54f72fa57ad"
"hash": "ece42c3a72569b95f1b0d77faffe71bf99e5d92a7ee1e5c13090706afde9147c"
}

View File

@@ -1,71 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT symbol, class as \"class: Class\", exchange as \"exchange: Exchange\", trading, date_added FROM assets WHERE class = 'us_equity'",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "symbol",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "class: Class",
"type_info": {
"Custom": {
"name": "class",
"kind": {
"Enum": [
"us_equity",
"crypto",
"unknown"
]
}
}
}
},
{
"ordinal": 2,
"name": "exchange: Exchange",
"type_info": {
"Custom": {
"name": "exchange",
"kind": {
"Enum": [
"AMEX",
"ARCA",
"BATS",
"NASDAQ",
"NYSE",
"NYSEARCA",
"OTC",
"unknown"
]
}
}
}
},
{
"ordinal": 3,
"name": "trading",
"type_info": "Bool"
},
{
"ordinal": 4,
"name": "date_added",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
false
]
},
"hash": "f00346add91af120daa4930f3c92b0d96742546d15943c85c594187139516d0b"
}

314
backend/Cargo.lock generated
View File

@@ -56,54 +56,12 @@ version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "apca"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bed93cbc521cf474aafc4ae672130d86662117f29d4f74eed7d9a8851502256c"
dependencies = [
"async-compression",
"async-trait",
"chrono",
"futures",
"http",
"http-endpoint",
"hyper",
"hyper-tls",
"num-decimal",
"serde",
"serde_json",
"serde_urlencoded",
"serde_variant",
"thiserror",
"tokio",
"tokio-tungstenite",
"tracing",
"tracing-futures",
"url",
"uuid",
"websocket-util",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "async-compression"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d495b6dc0184693324491a5ac05f559acc97bf937ab31d7a1c33dd0016be6d2b"
dependencies = [
"flate2",
"futures-core",
"futures-io",
"memchr",
"pin-project-lite",
]
[[package]]
name = "async-trait"
version = "0.1.73"
@@ -183,21 +141,19 @@ dependencies = [
name = "backend"
version = "0.1.0"
dependencies = [
"apca",
"async-trait",
"axum",
"deadpool",
"dotenv",
"futures",
"futures-util",
"http",
"log",
"log4rs",
"reqwest",
"serde",
"serde_json",
"sqlx",
"time 0.3.28",
"tokio",
"websocket-util",
"tokio-tungstenite",
]
[[package]]
@@ -215,12 +171,6 @@ dependencies = [
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.3"
@@ -300,7 +250,6 @@ dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"time 0.1.45",
"wasm-bindgen",
"windows-targets",
@@ -352,15 +301,6 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
@@ -391,26 +331,10 @@ dependencies = [
]
[[package]]
name = "deadpool"
version = "0.9.5"
name = "data-encoding"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
dependencies = [
"tokio",
]
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "der"
@@ -482,6 +406,15 @@ dependencies = [
"serde",
]
[[package]]
name = "encoding_rs"
version = "0.8.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1"
dependencies = [
"cfg-if",
]
[[package]]
name = "equivalent"
version = "1.0.1"
@@ -532,16 +465,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764"
[[package]]
name = "flate2"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
@@ -584,21 +507,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
@@ -672,7 +580,6 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -711,6 +618,25 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "h2"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap 1.9.3",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@@ -806,15 +732,6 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-endpoint"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5224352a86c0e121f1bf26d1c2f87a344d1978ba4bb94798c831c89f0d427a26"
dependencies = [
"http",
]
[[package]]
name = "httparse"
version = "1.8.0"
@@ -843,6 +760,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
@@ -922,6 +840,12 @@ dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "ipnet"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "itertools"
version = "0.10.5"
@@ -1122,17 +1046,6 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "num-bigint"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
@@ -1150,18 +1063,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "num-decimal"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8783636b20810a87540f59d19858498a987d7fcdc6555e62f2c99d6ca8a84b61"
dependencies = [
"num-bigint",
"num-rational",
"num-traits",
"serde",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -1183,18 +1084,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.16"
@@ -1459,10 +1348,41 @@ dependencies = [
]
[[package]]
name = "retain_mut"
version = "0.1.9"
name = "reqwest"
version = "0.11.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "rsa"
@@ -1618,15 +1538,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_variant"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47a8ec0b2fd0506290348d9699c0e3eb2e3e8c0498b5a9a6158b3bd4d6970076"
dependencies = [
"serde",
]
[[package]]
name = "serde_yaml"
version = "0.8.26"
@@ -1843,7 +1754,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482"
dependencies = [
"atoi",
"base64 0.21.3",
"base64",
"bitflags 2.4.0",
"byteorder",
"bytes",
@@ -1887,7 +1798,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e"
dependencies = [
"atoi",
"base64 0.21.3",
"base64",
"bitflags 2.4.0",
"byteorder",
"crc",
@@ -2138,9 +2049,9 @@ dependencies = [
[[package]]
name = "tokio-tungstenite"
version = "0.18.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2"
dependencies = [
"futures-util",
"log",
@@ -2150,6 +2061,20 @@ dependencies = [
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -2211,16 +2136,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "try-lock"
version = "0.2.4"
@@ -2229,13 +2144,13 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.18.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
@@ -2326,9 +2241,6 @@ name = "uuid"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"serde",
]
[[package]]
name = "vcpkg"
@@ -2388,6 +2300,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.87"
@@ -2418,15 +2342,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "websocket-util"
version = "0.11.2"
name = "web-sys"
version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ad8d6da9976a197513a4bf34c12b21095cba617dce356cd9e9616ccc5afe0d9"
checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b"
dependencies = [
"futures",
"tokio",
"tokio-tungstenite",
"tracing",
"js-sys",
"wasm-bindgen",
]
[[package]]
@@ -2532,6 +2454,16 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys",
]
[[package]]
name = "yaml-rust"
version = "0.4.5"

View File

@@ -12,7 +12,6 @@ codegen-units = 1
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
apca = "0.27.2"
axum = "0.6.20"
dotenv = "0.15.0"
sqlx = { version = "0.7.1", features = [
@@ -25,9 +24,6 @@ tokio = { version = "1.32.0", features = [
"macros",
"rt-multi-thread",
] }
deadpool = { version = "0.9.5", features = [
"rt_tokio_1",
] }
serde = "1.0.188"
log = "0.4.20"
serde_json = "1.0.105"
@@ -35,7 +31,7 @@ log4rs = "1.2.0"
time = { version = "0.3.27", features = [
"serde",
] }
futures = "0.3.28"
websocket-util = "0.11.2"
futures-util = "0.3.28"
async-trait = "0.1.73"
reqwest = { version = "0.11.20", features = ["json", "serde_json"] }
tokio-tungstenite = { version = "0.20.0", features = ["tokio-native-tls", "native-tls"] }
http = "0.2.9"

30
backend/src/config.rs Normal file
View File

@@ -0,0 +1,30 @@
use reqwest::Client;
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{env, sync::Arc};
pub struct AppConfig {
pub alpaca_api_key: String,
pub alpaca_api_secret: String,
pub postgres_pool: PgPool,
pub reqwest_client: Client,
}
const NUM_CLIENTS: usize = 10;
impl AppConfig {
pub async fn from_env() -> Result<Self, Box<dyn std::error::Error>> {
Ok(AppConfig {
alpaca_api_key: env::var("APCA_API_KEY_ID").unwrap(),
alpaca_api_secret: env::var("APCA_API_SECRET_KEY").unwrap(),
postgres_pool: PgPoolOptions::new()
.max_connections(NUM_CLIENTS as u32)
.connect(&env::var("DATABASE_URL")?)
.await?,
reqwest_client: Client::new(),
})
}
pub async fn arc_from_env() -> Result<Arc<Self>, Box<dyn std::error::Error>> {
Ok(Arc::new(AppConfig::from_env().await?))
}
}

168
backend/src/data/live.rs Normal file
View File

@@ -0,0 +1,168 @@
use crate::{
config::AppConfig,
database::{assets::get_assets_with_class, bars::add_bar},
types::{
websocket::{
AuthMessage, IncomingMessage, OutgoingMessage, SubscribeMessage, SuccessMessage,
SuccessMessageType,
},
AssetBroadcastMessage, Bar, Class,
},
};
use core::panic;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{debug, error, info, warn};
use serde_json::{from_str, to_string};
use std::{error::Error, sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
spawn,
sync::{broadcast::Receiver, RwLock},
time::timeout,
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
const ALPACA_STOCK_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v2/iex";
const ALPACA_CRYPTO_WEBSOCKET_URL: &str = "wss://stream.data.alpaca.markets/v1beta3/crypto/us";
const TIMEOUT_DURATION: Duration = Duration::from_millis(100);
pub async fn run_data_live(
class: Class,
app_config: Arc<AppConfig>,
asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let websocket_url = match class {
Class::UsEquity => ALPACA_STOCK_WEBSOCKET_URL,
Class::Crypto => ALPACA_CRYPTO_WEBSOCKET_URL,
};
let (stream, _) = connect_async(websocket_url).await?;
let (mut sink, mut stream) = stream.split();
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<IncomingMessage>>(&data)?.get(0)
== Some(&IncomingMessage::Success(SuccessMessage {
msg: SuccessMessageType::Connected,
})) => {}
_ => panic!(),
}
sink.send(Message::Text(to_string(&OutgoingMessage::Auth(
AuthMessage::new(
app_config.alpaca_api_key.clone(),
app_config.alpaca_api_secret.clone(),
),
))?))
.await?;
match stream.next().await {
Some(Ok(Message::Text(data)))
if from_str::<Vec<IncomingMessage>>(&data)?.get(0)
== Some(&IncomingMessage::Success(SuccessMessage {
msg: SuccessMessageType::Authenticated,
})) => {}
_ => panic!(),
}
let symbols = get_assets_with_class(&app_config.postgres_pool, class.clone())
.await?
.into_iter()
.map(|asset| asset.symbol)
.collect::<Vec<String>>();
if !symbols.is_empty() {
sink.send(Message::Text(to_string(&OutgoingMessage::Subscribe(
SubscribeMessage::from_vec(symbols),
))?))
.await?;
}
let sink = Arc::new(RwLock::new(sink));
let stream = Arc::new(RwLock::new(stream));
info!("Running live data thread for {:?}.", class);
spawn(broadcast_handler(
class,
asset_broadcast_receiver,
sink.clone(),
));
websocket_handler(app_config, sink, stream).await?;
unreachable!()
}
pub async fn websocket_handler(
app_config: Arc<AppConfig>,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
stream: Arc<RwLock<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
let mut stream = stream.write().await;
match timeout(TIMEOUT_DURATION, stream.next()).await {
Ok(Some(Ok(Message::Text(data)))) => match from_str::<Vec<IncomingMessage>>(&data) {
Ok(parsed_data) => {
for message in parsed_data {
match message {
IncomingMessage::Subscription(subscription_message) => {
info!("Current subscriptions: {:?}", subscription_message.bars);
}
IncomingMessage::Bars(bar_message)
| IncomingMessage::UpdatedBars(bar_message) => {
debug!("Incoming bar: {:?}", bar_message);
add_bar(&app_config.postgres_pool, Bar::from(bar_message)).await?;
}
message => {
warn!("Unhandled incoming message: {:?}", message);
}
}
}
}
Err(e) => {
warn!("Unparsed incoming message: {:?}: {}", data, e);
}
},
Ok(Some(Ok(Message::Ping(_)))) => {
sink.write().await.send(Message::Pong(vec![])).await?
}
Ok(unknown) => {
error!("Unknown incoming message: {:?}", unknown);
}
Err(_) => {}
}
}
}
pub async fn broadcast_handler(
class: Class,
mut asset_broadcast_receiver: Receiver<AssetBroadcastMessage>,
sink: Arc<RwLock<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
loop {
match asset_broadcast_receiver.recv().await? {
AssetBroadcastMessage::Added(asset) if asset.class == class => {
sink.write()
.await
.send(Message::Text(serde_json::to_string(
&OutgoingMessage::Subscribe(SubscribeMessage::new(asset.symbol)),
)?))
.await?;
}
AssetBroadcastMessage::Deleted(asset) if asset.class == class => {
sink.write()
.await
.send(Message::Text(serde_json::to_string(
&OutgoingMessage::Unsubscribe(SubscribeMessage::new(asset.symbol)),
)?))
.await?;
}
_ => {}
}
}
}

View File

@@ -1,57 +0,0 @@
use super::{AssetMPSC, StockStreamSubscription};
use crate::{
database::assets::get_assets_crypto,
pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool},
};
use apca::data::v2::stream::{
drive, Bar, CustomUrl, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX,
};
use futures_util::FutureExt;
use std::{error::Error, sync::Arc};
use tokio::sync::{mpsc, Mutex};
#[derive(Default)]
pub struct CryptoUrl;
impl ToString for CryptoUrl {
fn to_string(&self) -> String {
"wss://stream.data.alpaca.markets/v1beta3/crypto/us".into()
}
}
pub type Crypto = CustomUrl<CryptoUrl>;
pub async fn init_stream_subscription_mpsc(
postgres_pool: &PostgresPool,
) -> Result<(Arc<Mutex<StockStreamSubscription<IEX>>>, AssetMPSC), Box<dyn Error + Send + Sync>> {
let client = create_alpaca_client_from_env().await?;
let (mut stream, mut subscription) = client
.subscribe::<RealtimeData<Crypto, Bar, Quote, Trade>>()
.await?;
let symbols = get_assets_crypto(postgres_pool)
.await?
.iter()
.map(|asset| asset.symbol.clone())
.collect::<Vec<String>>();
if !symbols.is_empty() {
let data = MarketData {
bars: Symbols::List(SymbolList::from(symbols)),
..Default::default()
};
drive(subscription.subscribe(&data).boxed(), &mut stream)
.await
.unwrap()
.unwrap()
.unwrap();
}
let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription)));
let (sender, receiver) = mpsc::channel(50);
let asset_mpcs = AssetMPSC { sender, receiver };
Ok((stream_subscription_mutex, asset_mpcs))
}

View File

@@ -1,133 +0,0 @@
pub mod crypto;
pub mod stocks;
use crate::{database::bars::add_bar, pool::postgres::PostgresPool, types::Asset};
use apca::{
data::v2::stream::{drive, Data, MarketData, RealtimeData, Source, SymbolList, Symbols},
Subscribable,
};
use futures_util::{FutureExt, StreamExt};
use log::{debug, error, info, warn};
use std::{any::type_name, error::Error, sync::Arc, time::Duration};
use time::OffsetDateTime;
use tokio::{
spawn,
sync::{
mpsc::{Receiver, Sender},
Mutex,
},
time::timeout,
};
pub enum AssetMPSCMessage {
Added(Asset),
Removed(Asset),
}
pub struct AssetMPSC {
pub sender: Sender<AssetMPSCMessage>,
pub receiver: Receiver<AssetMPSCMessage>,
}
pub type StockStreamSubscription<S> = (
<RealtimeData<S> as Subscribable>::Stream,
<RealtimeData<S> as Subscribable>::Subscription,
);
pub const TIMEOUT_DURATION: Duration = Duration::from_millis(100);
pub async fn run_data_live<S>(
postgres_pool: PostgresPool,
stream_subscription_mutex: Arc<Mutex<StockStreamSubscription<S>>>,
asset_mpsc_receiver: Receiver<AssetMPSCMessage>,
) -> Result<(), Box<dyn Error + Send + Sync>>
where
S: Source + 'static,
{
info!("Running live data thread for {}.", type_name::<S>());
spawn(mpsc_handler::<S>(
stream_subscription_mutex.clone(),
asset_mpsc_receiver,
));
loop {
let (stream, _) = &mut *stream_subscription_mutex.lock().await;
match timeout(TIMEOUT_DURATION, stream.next()).await {
Ok(Some(Ok(Ok(Data::Bar(bar))))) => {
let bar = add_bar(
&postgres_pool,
crate::types::Bar {
timestamp: match OffsetDateTime::from_unix_timestamp(
bar.timestamp.timestamp(),
) {
Ok(timestamp) => timestamp,
Err(_) => {
warn!(
"Failed to parse timestamp for {}: {}.",
bar.symbol, bar.timestamp
);
continue;
}
},
asset_symbol: bar.symbol,
open: bar.open_price.to_f64().unwrap_or_default(),
high: bar.high_price.to_f64().unwrap_or_default(),
low: bar.low_price.to_f64().unwrap_or_default(),
close: bar.close_price.to_f64().unwrap_or_default(),
volume: bar.volume.to_f64().unwrap_or_default(),
},
)
.await?;
debug!(
"Saved timestamp for {}: {}.",
bar.asset_symbol, bar.timestamp
);
}
Ok(Some(Ok(Ok(_)))) | Ok(Some(Ok(Err(_)))) | Err(_) => continue,
_ => panic!(),
}
}
}
pub async fn mpsc_handler<S: Source>(
stream_subscription_mutex: Arc<Mutex<StockStreamSubscription<S>>>,
mut asset_mpsc_receiver: Receiver<AssetMPSCMessage>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
while let Some(message) = asset_mpsc_receiver.recv().await {
let (stream, subscription) = &mut *stream_subscription_mutex.lock().await;
match message {
AssetMPSCMessage::Added(asset) => {
let data = MarketData {
bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])),
..Default::default()
};
match drive(subscription.subscribe(&data).boxed(), stream).await {
Ok(_) => info!("Successfully subscribed to {}", asset.symbol),
Err(e) => {
error!("Failed to subscribe to {}: {:?}", asset.symbol, e);
continue;
}
}
}
AssetMPSCMessage::Removed(asset) => {
let data = MarketData {
bars: Symbols::List(SymbolList::from(vec![asset.symbol.clone()])),
..Default::default()
};
match drive(subscription.unsubscribe(&data).boxed(), stream).await {
Ok(_) => info!("Successfully unsubscribed from {}", asset.symbol),
Err(e) => {
error!("Failed to unsubscribe from {}: {:?}", asset.symbol, e);
continue;
}
}
}
}
}
Ok(())
}

View File

@@ -1,46 +0,0 @@
use super::{AssetMPSC, StockStreamSubscription};
use crate::{
database::assets::get_assets_stocks,
pool::{alpaca::create_alpaca_client_from_env, postgres::PostgresPool},
};
use apca::data::v2::stream::{
drive, Bar, MarketData, Quote, RealtimeData, SymbolList, Symbols, Trade, IEX,
};
use futures_util::FutureExt;
use std::{error::Error, sync::Arc};
use tokio::sync::{mpsc, Mutex};
pub async fn init_stream_subscription_mpsc(
postgres_pool: &PostgresPool,
) -> Result<(Arc<Mutex<StockStreamSubscription<IEX>>>, AssetMPSC), Box<dyn Error + Send + Sync>> {
let client = create_alpaca_client_from_env().await?;
let (mut stream, mut subscription) = client
.subscribe::<RealtimeData<IEX, Bar, Quote, Trade>>()
.await?;
let symbols = get_assets_stocks(postgres_pool)
.await?
.iter()
.map(|asset| asset.symbol.clone())
.collect::<Vec<String>>();
if !symbols.is_empty() {
let data = MarketData {
bars: Symbols::List(SymbolList::from(symbols)),
..Default::default()
};
drive(subscription.subscribe(&data).boxed(), &mut stream)
.await
.unwrap()
.unwrap()
.unwrap();
}
let stream_subscription_mutex = Arc::new(Mutex::new((stream, subscription)));
let (sender, receiver) = mpsc::channel(50);
let asset_mpcs = AssetMPSC { sender, receiver };
Ok((stream_subscription_mutex, asset_mpcs))
}

View File

@@ -1,12 +1,9 @@
use crate::{
pool::postgres::PostgresPool,
types::{Asset, Class, Exchange},
};
use sqlx::query_as;
use crate::types::{Asset, Class, Exchange};
use sqlx::{query_as, PgPool};
use std::error::Error;
pub async fn get_assets(
postgres_pool: &PostgresPool,
postgres_pool: &PgPool,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
@@ -17,24 +14,13 @@ pub async fn get_assets(
.map_err(|e| e.into())
}
pub async fn get_assets_stocks(
postgres_pool: &PostgresPool,
pub async fn get_assets_with_class(
postgres_pool: &PgPool,
class: Class,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'us_equity'"#
)
.fetch_all(postgres_pool)
.await
.map_err(|e| e.into())
}
pub async fn get_assets_crypto(
postgres_pool: &PostgresPool,
) -> Result<Vec<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
Asset,
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = 'crypto'"#
r#"SELECT symbol, class as "class: Class", exchange as "exchange: Exchange", trading, date_added FROM assets WHERE class = $1::CLASS"#, class as Class
)
.fetch_all(postgres_pool)
.await
@@ -42,7 +28,7 @@ pub async fn get_assets_crypto(
}
pub async fn get_asset(
postgres_pool: &PostgresPool,
postgres_pool: &PgPool,
symbol: &str,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
query_as!(
@@ -55,7 +41,7 @@ pub async fn get_asset(
}
pub async fn add_asset(
postgres_pool: &PostgresPool,
postgres_pool: &PgPool,
asset: Asset,
) -> Result<Asset, Box<dyn Error + Send + Sync>> {
query_as!(
@@ -70,7 +56,7 @@ pub async fn add_asset(
}
pub async fn update_asset_trading(
postgres_pool: &PostgresPool,
postgres_pool: &PgPool,
symbol: &str,
trading: bool,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
@@ -86,7 +72,7 @@ pub async fn update_asset_trading(
}
pub async fn delete_asset(
postgres_pool: &PostgresPool,
postgres_pool: &PgPool,
symbol: &str,
) -> Result<Option<Asset>, Box<dyn Error + Send + Sync>> {
Ok(query_as!(

View File

@@ -8,9 +8,10 @@ pub async fn add_bar(
) -> Result<Bar, Box<dyn Error + Send + Sync>> {
query_as!(
Bar,
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING timestamp, asset_symbol, open, high, low, close, volume"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (timestamp, asset_symbol) DO UPDATE SET open = $3, high = $4, low = $5, close = $6, volume = $7, num_trades = $8, volume_weighted = $9
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
bar.timestamp, bar.asset_symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, bar.num_trades, bar.volume_weighted
)
.fetch_one(postgres_pool)
.await
@@ -29,6 +30,8 @@ pub async fn add_bars(
let mut lows = Vec::with_capacity(bars.len());
let mut closes = Vec::with_capacity(bars.len());
let mut volumes = Vec::with_capacity(bars.len());
let mut num_trades = Vec::with_capacity(bars.len());
let mut volumes_weighted = Vec::with_capacity(bars.len());
for bar in bars {
timestamps.push(bar.timestamp);
@@ -38,14 +41,16 @@ pub async fn add_bars(
lows.push(bar.low);
closes.push(bar.close);
volumes.push(bar.volume);
num_trades.push(bar.num_trades);
volumes_weighted.push(bar.volume_weighted);
}
query_as!(
Bar,
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume)
SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[])
RETURNING timestamp, asset_symbol, open, high, low, close, volume"#,
&timestamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes
r#"INSERT INTO bars (timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted)
SELECT * FROM UNNEST($1::timestamptz[], $2::text[], $3::float8[], $4::float8[], $5::float8[], $6::float8[], $7::float8[], $8::int8[], $9::float8[])
RETURNING timestamp, asset_symbol, open, high, low, close, volume, num_trades, volume_weighted"#,
&timestamps, &asset_symbols, &opens, &highs, &lows, &closes, &volumes, &num_trades, &volumes_weighted
)
.fetch_all(postgres_pool)
.await

View File

@@ -1,61 +1,43 @@
mod config;
mod data;
mod database;
mod pool;
mod routes;
mod types;
use apca::data::v2::stream::IEX;
use data::live::{
crypto::{self, Crypto},
run_data_live, stocks,
};
use config::AppConfig;
use data::live::run_data_live;
use dotenv::dotenv;
use pool::{alpaca::create_alpaca_pool_from_env, postgres::create_postgres_pool_from_env};
use routes::run_api;
use std::{error::Error, sync::Arc};
use tokio::spawn;
const NUM_CLIENTS: usize = 10;
use std::error::Error;
use tokio::{spawn, sync::broadcast};
use types::{AssetBroadcastMessage, Class};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
dotenv().ok();
log4rs::init_file("log4rs.yaml", Default::default()).unwrap();
let app_config = AppConfig::arc_from_env().await.unwrap();
let mut threads = Vec::new();
let postgres_pool = create_postgres_pool_from_env(NUM_CLIENTS).await?;
let alpaca_pool = create_alpaca_pool_from_env(NUM_CLIENTS).await?;
let (asset_broadcast_sender, _) = broadcast::channel::<AssetBroadcastMessage>(100);
// Stock Live Data
let (stock_live_stream_subscription_mutex, stock_live_mpsc) =
stocks::init_stream_subscription_mpsc(&postgres_pool).await?;
let stock_live_mpsc_sender_arc = Arc::new(stock_live_mpsc.sender);
threads.push(spawn(run_data_live::<IEX>(
postgres_pool.clone(),
stock_live_stream_subscription_mutex.clone(),
stock_live_mpsc.receiver,
threads.push(spawn(run_data_live(
Class::UsEquity,
app_config.clone(),
asset_broadcast_sender.subscribe(),
)));
// Crypto Live Data
let (crypto_stream_subscription_mutex, crypto_live_mpsc) =
crypto::init_stream_subscription_mpsc(&postgres_pool).await?;
let crypto_live_mpsc_sender_arc = Arc::new(crypto_live_mpsc.sender);
threads.push(spawn(run_data_live::<Crypto>(
postgres_pool.clone(),
crypto_stream_subscription_mutex.clone(),
crypto_live_mpsc.receiver,
threads.push(spawn(run_data_live(
Class::Crypto,
app_config.clone(),
asset_broadcast_sender.subscribe(),
)));
// REST API
threads.push(spawn(run_api(
postgres_pool.clone(),
alpaca_pool.clone(),
stock_live_mpsc_sender_arc.clone(),
crypto_live_mpsc_sender_arc.clone(),
)));
threads.push(spawn(run_api(app_config.clone(), asset_broadcast_sender)));
for thread in threads {
let _ = thread.await?;

View File

@@ -1,98 +0,0 @@
use apca::{ApiInfo, Client};
use async_trait::async_trait;
use deadpool::managed::{BuildError, Manager, Pool, RecycleResult};
use std::{env, error::Error};
pub struct AlpacaManager {
apca_api_base_url: String,
apca_api_key_id: String,
apca_api_secret_key: String,
}
impl AlpacaManager {
pub fn new(
apca_api_base_url: String,
apca_api_key_id: String,
apca_api_secret_key: String,
) -> Self {
Self {
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
}
}
}
pub type AlpacaPool = Pool<AlpacaManager>;
#[async_trait]
impl Manager for AlpacaManager {
type Type = Client;
type Error = Box<dyn Error + Send + Sync>;
async fn create(&self) -> Result<Self::Type, Self::Error> {
let client = Client::new(ApiInfo::from_parts(
&self.apca_api_base_url,
&self.apca_api_key_id,
&self.apca_api_secret_key,
)?);
Ok(client)
}
async fn recycle(&self, _: &mut Self::Type) -> RecycleResult<Self::Error> {
Ok(())
}
}
pub async fn create_alpaca_client(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
) -> Result<Client, Box<dyn Error + Send + Sync>> {
Ok(Client::new(ApiInfo::from_parts(
apca_api_base_url,
apca_api_key_id,
apca_api_secret_key,
)?))
}
pub async fn create_alpaca_client_from_env() -> Result<Client, Box<dyn Error + Send + Sync>> {
create_alpaca_client(
&env::var("APCA_API_BASE_URL")?,
&env::var("APCA_API_KEY_ID")?,
&env::var("APCA_API_SECRET_KEY")?,
)
.await
}
pub async fn create_alpaca_pool(
apca_api_base_url: &str,
apca_api_key_id: &str,
apca_api_secret_key: &str,
num_clients: usize,
) -> Result<AlpacaPool, Box<dyn Error + Send + Sync>> {
let manager = AlpacaManager::new(
apca_api_base_url.to_owned(),
apca_api_key_id.to_owned(),
apca_api_secret_key.to_owned(),
);
Pool::builder(manager)
.max_size(num_clients)
.build()
.map_err(|e| match e {
BuildError::Backend(e) => e,
BuildError::NoRuntimeSpecified(_) => unreachable!(),
})
}
pub async fn create_alpaca_pool_from_env(
num_clients: usize,
) -> Result<AlpacaPool, Box<dyn Error + Send + Sync>> {
create_alpaca_pool(
&env::var("APCA_API_BASE_URL")?,
&env::var("APCA_API_KEY_ID")?,
&env::var("APCA_API_SECRET_KEY")?,
num_clients,
)
.await
}

View File

@@ -1,2 +0,0 @@
pub mod alpaca;
pub mod postgres;

View File

@@ -1,21 +0,0 @@
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{env, error::Error};
pub type PostgresPool = PgPool;
pub async fn create_postgres_pool(
database_url: &str,
num_clients: usize,
) -> Result<PostgresPool, Box<dyn Error + Send + Sync>> {
PgPoolOptions::new()
.max_connections(num_clients as u32)
.connect(database_url)
.await
.map_err(|e| e.into())
}
pub async fn create_postgres_pool_from_env(
num_clients: usize,
) -> Result<PostgresPool, Box<dyn Error + Send + Sync>> {
create_postgres_pool(&env::var("DATABASE_URL")?, num_clients).await
}

View File

@@ -1,22 +1,22 @@
use crate::data::live::AssetMPSCMessage;
use std::sync::Arc;
use crate::config::AppConfig;
use crate::database;
use crate::database::assets::update_asset_trading;
use crate::pool::alpaca::AlpacaPool;
use crate::pool::postgres::PostgresPool;
use crate::types::{Asset, Class, Exchange};
use apca::api::v2::asset::{self, Symbol};
use apca::RequestError;
use crate::types::api;
use crate::types::{Asset, AssetBroadcastMessage, Status};
use axum::{extract::Path, http::StatusCode, Extension, Json};
use http::Method;
use log::info;
use serde::Deserialize;
use sqlx::types::time::OffsetDateTime;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::broadcast::Sender;
const ALPACA_API_URL: &str = "https://api.alpaca.markets/v2";
pub async fn get_assets(
Extension(postgres_pool): Extension<PostgresPool>,
Extension(app_config): Extension<Arc<AppConfig>>,
) -> Result<(StatusCode, Json<Vec<Asset>>), StatusCode> {
let assets = database::assets::get_assets(&postgres_pool)
let assets = database::assets::get_assets(&app_config.postgres_pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -24,10 +24,10 @@ pub async fn get_assets(
}
pub async fn get_asset(
Extension(postgres_pool): Extension<PostgresPool>,
Extension(app_config): Extension<Arc<AppConfig>>,
Path(symbol): Path<String>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = database::assets::get_asset(&postgres_pool, &symbol)
let asset = database::assets::get_asset(&app_config.postgres_pool, &symbol)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -37,7 +37,6 @@ pub async fn get_asset(
}
}
#[allow(dead_code)]
#[derive(Deserialize)]
pub struct AddAssetRequest {
symbol: String,
@@ -45,13 +44,11 @@ pub struct AddAssetRequest {
}
pub async fn add_asset(
Extension(postgres_pool): Extension<PostgresPool>,
Extension(alpaca_pool): Extension<AlpacaPool>,
Extension(stock_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Extension(crypto_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Extension(app_config): Extension<Arc<AppConfig>>,
Extension(asset_broadcast_sender): Extension<Sender<AssetBroadcastMessage>>,
Json(request): Json<AddAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
if database::assets::get_asset(&postgres_pool, &request.symbol)
if database::assets::get_asset(&app_config.postgres_pool, &request.symbol)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.is_some()
@@ -59,44 +56,40 @@ pub async fn add_asset(
return Err(StatusCode::CONFLICT);
}
let asset = alpaca_pool
.get()
let asset = app_config
.reqwest_client
.request(
Method::GET,
&format!("{}/assets/{}", ALPACA_API_URL, request.symbol),
)
.header("APCA-API-KEY-ID", &app_config.alpaca_api_key)
.header("APCA-API-SECRET-KEY", &app_config.alpaca_api_secret)
.send()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.issue::<asset::Get>(&Symbol::Sym(request.symbol))
.await
.map_err(|e| match e {
RequestError::Endpoint(_) => StatusCode::NOT_FOUND,
.map_err(|e| match e.status() {
Some(StatusCode::NOT_FOUND) => StatusCode::NOT_FOUND,
Some(StatusCode::FORBIDDEN) => panic!(),
_ => StatusCode::INTERNAL_SERVER_ERROR,
})?;
let asset = Asset {
symbol: asset.symbol,
class: Class::from(asset.class) as Class,
exchange: Exchange::from(asset.exchange) as Exchange,
trading: request.trading.unwrap_or(false),
date_added: OffsetDateTime::now_utc(),
};
let asset = asset.json::<api::Asset>().await.unwrap();
let asset = database::assets::add_asset(&postgres_pool, asset)
if asset.status != Status::Active || !asset.tradable || !asset.fractionable {
return Err(StatusCode::FORBIDDEN);
}
let mut asset = Asset::from(asset);
if let Some(trading) = request.trading {
asset.trading = trading;
}
let asset = database::assets::add_asset(&app_config.postgres_pool, asset)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match asset.class {
Class(asset::Class::UsEquity) => {
stock_live_mpsc_sender
.send(AssetMPSCMessage::Added(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
Class(asset::Class::Crypto) => {
crypto_live_mpsc_sender
.send(AssetMPSCMessage::Added(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
_ => {}
}
asset_broadcast_sender
.send(AssetBroadcastMessage::Added(asset.clone()))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
info!("Added asset {}.", asset.symbol);
Ok((StatusCode::CREATED, Json(asset)))
@@ -109,16 +102,21 @@ pub struct UpdateAssetRequest {
}
pub async fn update_asset(
Extension(postgres_pool): Extension<PostgresPool>,
Extension(app_config): Extension<Arc<AppConfig>>,
Extension(asset_broadcast_sender): Extension<Sender<AssetBroadcastMessage>>,
Path(symbol): Path<String>,
Json(request): Json<UpdateAssetRequest>,
) -> Result<(StatusCode, Json<Asset>), StatusCode> {
let asset = update_asset_trading(&postgres_pool, &symbol, request.trading)
let asset = update_asset_trading(&app_config.postgres_pool, &symbol, request.trading)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match asset {
Some(asset) => {
asset_broadcast_sender
.send(AssetBroadcastMessage::Updated(asset.clone()))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
info!("Updated asset {}.", symbol);
Ok((StatusCode::OK, Json(asset)))
}
@@ -127,32 +125,19 @@ pub async fn update_asset(
}
pub async fn delete_asset(
Extension(postgres_pool): Extension<PostgresPool>,
Extension(stock_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Extension(crypto_live_mpsc_sender): Extension<Arc<Sender<AssetMPSCMessage>>>,
Extension(app_config): Extension<Arc<AppConfig>>,
Extension(asset_broadcast_sender): Extension<Sender<AssetBroadcastMessage>>,
Path(symbol): Path<String>,
) -> Result<StatusCode, StatusCode> {
let asset = database::assets::delete_asset(&postgres_pool, &symbol)
let asset = database::assets::delete_asset(&app_config.postgres_pool, &symbol)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match asset {
Some(asset) => {
match asset.class {
Class(asset::Class::UsEquity) => {
stock_live_mpsc_sender
.send(AssetMPSCMessage::Removed(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
Class(asset::Class::Crypto) => {
crypto_live_mpsc_sender
.send(AssetMPSCMessage::Removed(asset.clone()))
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
_ => {}
}
asset_broadcast_sender
.send(AssetBroadcastMessage::Deleted(asset.clone()))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
info!("Deleted asset {}.", symbol);
Ok(StatusCode::NO_CONTENT)

View File

@@ -1,22 +1,17 @@
use crate::{
data::live::AssetMPSCMessage,
pool::{alpaca::AlpacaPool, postgres::PostgresPool},
};
use crate::{config::AppConfig, types::AssetBroadcastMessage};
use axum::{
routing::{delete, get, post},
Extension, Router, Server,
};
use log::info;
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::mpsc::Sender;
use tokio::sync::broadcast::Sender;
pub mod assets;
pub async fn run_api(
postgres_pool: PostgresPool,
alpaca_pool: AlpacaPool,
stock_live_mpsc_sender: Arc<Sender<AssetMPSCMessage>>,
crypto_live_mpsc_sender: Arc<Sender<AssetMPSCMessage>>,
app_config: Arc<AppConfig>,
asset_broadcast_sender: Sender<AssetBroadcastMessage>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let app = Router::new()
.route("/assets", get(assets::get_assets))
@@ -24,10 +19,8 @@ pub async fn run_api(
.route("/assets", post(assets::add_asset))
.route("/assets/:symbol", post(assets::update_asset))
.route("/assets/:symbol", delete(assets::delete_asset))
.layer(Extension(postgres_pool))
.layer(Extension(alpaca_pool))
.layer(Extension(stock_live_mpsc_sender))
.layer(Extension(crypto_live_mpsc_sender));
.layer(Extension(app_config))
.layer(Extension(asset_broadcast_sender));
let addr = SocketAddr::from(([0, 0, 0, 0], 7878));
info!("Listening on {}...", addr);

View File

@@ -1,88 +0,0 @@
use serde::{Deserialize, Serialize};
use sqlx::{error::BoxDynError, Decode, Encode, FromRow, Postgres, Type};
use std::ops::Deref;
use time::OffsetDateTime;
macro_rules! impl_apca_sqlx_traits {
($outer_type:ident, $inner_type:path, $fallback:expr) => {
#[derive(Clone, Debug, Copy, PartialEq, Serialize, Deserialize)]
pub struct $outer_type(pub $inner_type);
impl Deref for $outer_type {
type Target = $inner_type;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<$inner_type> for $outer_type {
fn from(inner: $inner_type) -> Self {
$outer_type(inner)
}
}
impl From<String> for $outer_type {
fn from(s: String) -> Self {
s.parse().unwrap_or($fallback).into()
}
}
impl Decode<'_, Postgres> for $outer_type {
fn decode(
value: <Postgres as sqlx::database::HasValueRef<'_>>::ValueRef,
) -> Result<Self, BoxDynError> {
Ok($outer_type::from(<String as Decode<Postgres>>::decode(
value,
)?))
}
}
impl Encode<'_, Postgres> for $outer_type {
fn encode_by_ref(
&self,
buf: &mut <Postgres as sqlx::database::HasArguments<'_>>::ArgumentBuffer,
) -> sqlx::encode::IsNull {
<String as Encode<Postgres>>::encode_by_ref(&self.0.as_ref().into(), buf)
}
}
impl Type<Postgres> for $outer_type {
fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
<String as Type<Postgres>>::type_info()
}
}
};
}
impl_apca_sqlx_traits!(
Class,
apca::api::v2::asset::Class,
apca::api::v2::asset::Class::Unknown
);
impl_apca_sqlx_traits!(
Exchange,
apca::api::v2::asset::Exchange,
apca::api::v2::asset::Exchange::Unknown
);
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Asset {
pub symbol: String,
pub class: Class,
pub exchange: Exchange,
pub trading: bool,
pub date_added: OffsetDateTime,
}
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Bar {
pub timestamp: OffsetDateTime,
pub asset_symbol: String,
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub volume: f64,
}

View File

@@ -0,0 +1,19 @@
use crate::types::{Class, Exchange, Status};
use serde::Deserialize;
#[derive(Deserialize)]
pub struct Asset {
pub id: String,
pub class: Class,
pub exchange: Exchange,
pub symbol: String,
pub name: String,
pub status: Status,
pub tradable: bool,
pub marginable: bool,
pub shortable: bool,
pub easy_to_borrow: bool,
pub fractionable: bool,
pub maintenance_margin_requirement: Option<f32>,
pub attributes: Option<Vec<String>>,
}

View File

@@ -0,0 +1,3 @@
pub mod asset;
pub use asset::*;

View File

@@ -0,0 +1,33 @@
use super::{api, class::Class, exchange::Exchange};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Asset {
pub symbol: String,
pub class: Class,
pub exchange: Exchange,
pub trading: bool,
pub date_added: OffsetDateTime,
}
impl From<api::Asset> for Asset {
fn from(asset: api::Asset) -> Self {
Self {
symbol: asset.symbol,
class: asset.class,
exchange: asset.exchange,
trading: asset.tradable,
date_added: OffsetDateTime::now_utc(),
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum AssetBroadcastMessage {
Added(Asset),
Updated(Asset),
Deleted(Asset),
Reset(Asset),
}

33
backend/src/types/bar.rs Normal file
View File

@@ -0,0 +1,33 @@
use super::websocket::BarMessage;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use time::OffsetDateTime;
#[derive(Clone, Debug, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Bar {
pub timestamp: OffsetDateTime,
pub asset_symbol: String,
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub volume: f64,
pub num_trades: i64,
pub volume_weighted: f64,
}
impl From<BarMessage> for Bar {
fn from(bar_message: BarMessage) -> Self {
Self {
timestamp: bar_message.timestamp,
asset_symbol: bar_message.symbol,
open: bar_message.open,
high: bar_message.high,
low: bar_message.low,
close: bar_message.close,
volume: bar_message.volume,
num_trades: bar_message.num_trades,
volume_weighted: bar_message.volume_weighted,
}
}
}

View File

@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)]
pub enum Class {
#[sqlx(rename = "us_equity")]
#[serde(rename = "us_equity")]
UsEquity,
#[sqlx(rename = "crypto")]
#[serde(rename = "crypto")]
Crypto,
}

View File

@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)]
pub enum Exchange {
#[sqlx(rename = "AMEX")]
#[serde(rename = "AMEX")]
Amex,
#[sqlx(rename = "ARCA")]
#[serde(rename = "ARCA")]
Arca,
#[sqlx(rename = "BATS")]
#[serde(rename = "BATS")]
Bats,
#[sqlx(rename = "NYSE")]
#[serde(rename = "NYSE")]
Nyse,
#[sqlx(rename = "NASDAQ")]
#[serde(rename = "NASDAQ")]
Nasdaq,
#[sqlx(rename = "NYSEARCA")]
#[serde(rename = "NYSEARCA")]
Nysearca,
#[sqlx(rename = "OTC")]
#[serde(rename = "OTC")]
Otc,
#[sqlx(rename = "CRYPTO")]
#[serde(rename = "CRYPTO")]
Crypto,
}

13
backend/src/types/mod.rs Normal file
View File

@@ -0,0 +1,13 @@
pub mod api;
pub mod asset;
pub mod bar;
pub mod class;
pub mod exchange;
pub mod status;
pub mod websocket;
pub use asset::*;
pub use bar::*;
pub use class::*;
pub use exchange::*;
pub use status::*;

View File

@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use sqlx::Type;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Type)]
pub enum Status {
#[sqlx(rename = "active")]
#[serde(rename = "active")]
Active,
#[sqlx(rename = "inactive")]
#[serde(rename = "inactive")]
Inactive,
}

View File

@@ -0,0 +1,63 @@
use serde::Deserialize;
use time::OffsetDateTime;
#[derive(Debug, Deserialize, PartialEq)]
#[serde(tag = "T")]
pub enum IncomingMessage {
#[serde(rename = "success")]
Success(SuccessMessage),
#[serde(rename = "subscription")]
Subscription(SubscriptionMessage),
#[serde(rename = "b")]
Bars(BarMessage),
#[serde(rename = "u")]
UpdatedBars(BarMessage),
}
#[derive(Debug, PartialEq, Deserialize)]
pub enum SuccessMessageType {
#[serde(rename = "connected")]
Connected,
#[serde(rename = "authenticated")]
Authenticated,
}
#[derive(Debug, PartialEq, Deserialize)]
pub struct SuccessMessage {
pub msg: SuccessMessageType,
}
#[derive(Debug, PartialEq, Deserialize)]
pub struct SubscriptionMessage {
pub trades: Vec<String>,
pub quotes: Vec<String>,
pub orderbooks: Vec<String>,
pub bars: Vec<String>,
#[serde(rename = "updatedBars")]
pub updated_bars: Vec<String>,
#[serde(rename = "dailyBars")]
pub daily_bars: Vec<String>,
}
#[derive(Debug, PartialEq, Deserialize)]
pub struct BarMessage {
#[serde(rename = "t")]
#[serde(with = "time::serde::rfc3339")]
pub timestamp: OffsetDateTime,
#[serde(rename = "S")]
pub symbol: String,
#[serde(rename = "o")]
pub open: f64,
#[serde(rename = "h")]
pub high: f64,
#[serde(rename = "l")]
pub low: f64,
#[serde(rename = "c")]
pub close: f64,
#[serde(rename = "v")]
pub volume: f64,
#[serde(rename = "n")]
pub num_trades: i64,
#[serde(rename = "vw")]
pub volume_weighted: f64,
}

View File

@@ -0,0 +1,5 @@
pub mod incoming;
pub mod outgoing;
pub use incoming::*;
pub use outgoing::*;

View File

@@ -0,0 +1,47 @@
use serde::Serialize;
#[derive(Debug, Serialize)]
#[serde(tag = "action")]
pub enum OutgoingMessage {
#[serde(rename = "auth")]
Auth(AuthMessage),
#[serde(rename = "subscribe")]
Subscribe(SubscribeMessage),
#[serde(rename = "unsubscribe")]
Unsubscribe(SubscribeMessage),
}
#[derive(Debug, Serialize)]
pub struct AuthMessage {
key: String,
secret: String,
}
impl AuthMessage {
pub fn new(key: String, secret: String) -> Self {
Self { key, secret }
}
}
#[derive(Debug, Serialize)]
pub struct SubscribeMessage {
bars: Vec<String>,
#[serde(rename = "updatedBars")]
updated_bars: Vec<String>,
}
impl SubscribeMessage {
pub fn new(symbol: String) -> Self {
Self {
bars: vec![symbol.clone()],
updated_bars: vec![symbol],
}
}
pub fn from_vec(symbols: Vec<String>) -> Self {
Self {
bars: symbols.clone(),
updated_bars: symbols,
}
}
}