From 050f25bba9a2ed51389c7097fcb073efedbb41c3 Mon Sep 17 00:00:00 2001 From: Nikolaos Karaolidis Date: Thu, 5 Jun 2025 08:29:58 +0100 Subject: [PATCH] Add redis Signed-off-by: Nikolaos Karaolidis --- Cargo.lock | 13 ++++++ Cargo.toml | 2 + src/config.rs | 7 +++- src/main.rs | 4 +- src/models/invites.rs | 15 +++++++ src/models/mod.rs | 1 + src/state.rs | 96 +++++++++++++++++++++++++++++-------------- 7 files changed, 103 insertions(+), 35 deletions(-) create mode 100644 src/models/invites.rs diff --git a/Cargo.lock b/Cargo.lock index 9179ca7..51e668e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1139,7 +1139,9 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "time", "tokio", + "uuid", ] [[package]] @@ -3084,6 +3086,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" +dependencies = [ + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "value-bag" version = "1.11.1" diff --git a/Cargo.toml b/Cargo.toml index 51da271..894363b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,6 @@ redis-macros = "0.5.4" serde = "1.0.219" serde_json = "1.0.140" serde_yaml = "0.9.34" +time = { version = "0.3.41", features = ["serde"] } tokio = { version = "1.45.1", features = ["rt-multi-thread", "process"] } +uuid = { version = "1.17.0", features = ["serde"] } diff --git a/src/config.rs b/src/config.rs index 2acac86..d051dd4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,7 @@ use clap::Parser; use serde::Deserialize; use std::{ + error::Error, fs, net::{IpAddr, Ipv4Addr}, path::PathBuf, @@ -56,8 +57,10 @@ pub struct Config { pub redis: RedisConfig, } -impl Config { - pub fn from_yaml(path: &PathBuf) -> Result> { +impl TryFrom<&PathBuf> for Config { + type Error = Box; + + fn try_from(path: &PathBuf) -> Result { let contents = fs::read_to_string(path)?; let config = serde_yaml::from_str(&contents)?; Ok(config) diff --git a/src/main.rs b/src/main.rs index 4de8fc9..8ac5747 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,8 +22,8 @@ async fn main() { let args = Args::parse(); log4rs::init_file(args.log_config, Deserializers::default()).unwrap(); - let config = Config::from_yaml(&args.config).unwrap(); - let state = State::from_config(config.clone()).await.unwrap(); + let config = Config::try_from(&args.config).unwrap(); + let state = State::from_config(config.clone()).await; let routes = routes::routes(state); let app = axum::Router::new().nest(&format!("{}/api", config.server.subpath), routes); diff --git a/src/models/invites.rs b/src/models/invites.rs new file mode 100644 index 0000000..7886e56 --- /dev/null +++ b/src/models/invites.rs @@ -0,0 +1,15 @@ +use redis_macros::{FromRedisValue, ToRedisArgs}; +use serde::{Deserialize, Serialize}; +use time::UtcDateTime; +use uuid::Uuid; + +#[derive(Serialize, Deserialize, FromRedisValue, ToRedisArgs)] +struct Invite { + id: Uuid, + groups: Vec, + emails: Vec, + uses: i64, + max_uses: Option, + created_at: UtcDateTime, + expires_at: Option, +} diff --git a/src/models/mod.rs b/src/models/mod.rs index 9766fd7..2071bc5 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,3 +1,4 @@ pub mod authelia; pub mod groups; +pub mod invites; pub mod users; diff --git a/src/state.rs b/src/state.rs index 3b14043..6bac1a1 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,5 +1,3 @@ -use std::error::Error; - use async_redis_session::RedisSessionStore; use axum::extract::FromRef; use openidconnect::{ @@ -12,7 +10,8 @@ use openidconnect::{ }, reqwest, }; -use redis::aio::MultiplexedConnection; +use redis::{self, AsyncCommands}; +use tokio::spawn; use crate::config::Config; @@ -48,23 +47,23 @@ pub struct State { pub config: Config, pub oauth_http_client: reqwest::Client, pub oauth_client: OAuthClient, - pub redis_client: MultiplexedConnection, + pub redis_client: redis::aio::MultiplexedConnection, pub session_store: RedisSessionStore, } impl State { - pub async fn from_config(config: Config) -> Result> { - let (oauth_http_client, oauth_client) = oauth_client(&config).await?; - let redis_client = redis_client(&config).await?; - let session_store = session_store(&config)?; + pub async fn from_config(config: Config) -> Self { + let (oauth_http_client, oauth_client) = oauth_client(&config).await; + let redis_client = redis_client(&config).await; + let session_store = session_store(&config); - Ok(Self { + Self { config, oauth_http_client, oauth_client, redis_client, session_store, - }) + } } } @@ -86,7 +85,7 @@ impl FromRef for OAuthClient { } } -impl FromRef for MultiplexedConnection { +impl FromRef for redis::aio::MultiplexedConnection { fn from_ref(state: &State) -> Self { state.redis_client.clone() } @@ -98,53 +97,88 @@ impl FromRef for RedisSessionStore { } } -async fn oauth_client( - config: &Config, -) -> Result<(reqwest::Client, OAuthClient), Box> { +async fn oauth_client(config: &Config) -> (reqwest::Client, OAuthClient) { let oauth_http_client = reqwest::ClientBuilder::new() .redirect(reqwest::redirect::Policy::none()) .danger_accept_invalid_certs(config.oauth.insecure) - .build()?; + .build() + .unwrap(); let provider_metadata = CoreProviderMetadata::discover_async( - IssuerUrl::new(config.oauth.issuer_url.clone())?, + IssuerUrl::new(config.oauth.issuer_url.clone()).unwrap(), &oauth_http_client, ) - .await?; + .await + .unwrap(); let oauth_client = OAuthClient::from_provider_metadata( provider_metadata, ClientId::new(config.oauth.client_id.clone()), Some(ClientSecret::new(config.oauth.client_secret.clone())), ) - .set_redirect_uri(RedirectUrl::new(format!( - "{}{}/api/auth/callback", - config.server.host, config.server.subpath - ))?); + .set_redirect_uri( + RedirectUrl::new(format!( + "{}{}/api/auth/callback", + config.server.host, config.server.subpath + )) + .unwrap(), + ); - Ok((oauth_http_client, oauth_client)) + (oauth_http_client, oauth_client) } -async fn redis_client( - config: &Config, -) -> Result> { +async fn redis_client(config: &Config) -> redis::aio::MultiplexedConnection { let url = format!( "redis://{}:{}/{}", config.redis.host, config.redis.port, config.redis.database ); - let client = redis::Client::open(url)?; - let connection = client.get_multiplexed_async_connection().await?; + let client = redis::Client::open(url).unwrap(); + let mut connection = client.get_multiplexed_async_connection().await.unwrap(); - Ok(connection) + let _: () = redis::cmd("CONFIG") + .arg("SET") + .arg("notify-keyspace-events") + .arg("Ex") + .query_async(&mut connection) + .await + .unwrap(); + + let database = config.redis.database.to_string(); + spawn(async move { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let rconfig = redis::AsyncConnectionConfig::new().set_push_sender(tx); + let mut connection = client + .get_multiplexed_async_connection_with_config(&rconfig) + .await + .unwrap(); + + let channel = format!("__keyevent@{}__:expired", database); + connection.subscribe(&[channel]).await.unwrap(); + + while let Some(msg) = rx.recv().await { + if let Some(msg) = redis::Msg::from_push_info(msg) { + if let Ok(key) = msg.get_payload::() { + if !key.starts_with("invite:") { + continue; + } + + let id = key.trim_start_matches("invite:").to_string(); + let _: i64 = connection.srem("invite:all", id).await.unwrap(); + } + } + } + }); + + connection } -fn session_store(config: &Config) -> Result> { +fn session_store(config: &Config) -> RedisSessionStore { let url = format!( "redis://{}:{}/{}", config.redis.host, config.redis.port, config.redis.database ); - let session_store = RedisSessionStore::new(url)?.with_prefix("session:"); + let session_store = RedisSessionStore::new(url).unwrap().with_prefix("session:"); - Ok(session_store) + session_store }