diff --git a/Cargo.toml b/Cargo.toml index 05050247e..66d08e777 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ storage_azure = ["opendal/services-azblob"] storage_gcp = ["opendal/services-gcs"] # Cache feature cache_inmem = ["dep:moka"] +cache_redis = ["dep:bb8", "dep:redis", "dep:bb8-redis", "dep:rmp-serde"] bg_redis = ["dep:rusty-sidekiq", "dep:bb8"] bg_pg = ["dep:sqlx", "dep:ulid"] bg_sqlt = ["dep:sqlx", "dep:ulid"] @@ -157,6 +158,11 @@ ulid = { version = "1", optional = true } rusty-sidekiq = { version = "0.11.0", default-features = false, optional = true } bb8 = { version = "0.8.1", optional = true } +# rediq queue +bb8-redis = { version = "0.20.0", optional = true } +redis = { version = "0.28.2", optional = true } +rmp-serde = { version = "1.3.0", optional = true } + scraper = { version = "0.21.0", features = ["deterministic"], optional = true } [workspace.dependencies] diff --git a/src/boot.rs b/src/boot.rs index 1689118de..b1f5a65c2 100644 --- a/src/boot.rs +++ b/src/boot.rs @@ -338,13 +338,14 @@ pub async fn create_context( }; let queue_provider = bgworker::create_queue_provider(&config).await?; + let cache = cache::create_cache_provider(&config).await?; let ctx = AppContext { environment: environment.clone(), #[cfg(feature = "with-db")] db, queue_provider, storage: Storage::single(storage::drivers::null::new()).into(), - cache: cache::Cache::new(cache::drivers::null::new()).into(), + cache, config, mailer, }; diff --git a/src/cache/drivers/inmem.rs b/src/cache/drivers/inmem.rs index b5db35c14..86bbecda5 100644 --- a/src/cache/drivers/inmem.rs +++ b/src/cache/drivers/inmem.rs @@ -6,11 +6,13 @@ use std::{ time::{Duration, Instant}, }; -use async_trait::async_trait; -use moka::{sync::Cache, Expiry}; - use super::CacheDriver; use crate::cache::CacheResult; +use crate::config::InMemCacheConfig; +use async_trait::async_trait; +use moka::{sync::Cache, Expiry}; +use serde::de::DeserializeOwned; +use serde::Serialize; /// Creates a new instance of the in-memory cache driver, with a default Loco /// configuration. @@ -19,9 +21,9 @@ use crate::cache::CacheResult; /// /// A boxed [`CacheDriver`] instance. #[must_use] -pub fn new() -> Box { +pub async fn new(config: &InMemCacheConfig) -> Box { let cache: Cache = Cache::builder() - .max_capacity(32 * 1024 * 1024) + .max_capacity(config.max_capacity) .expire_after(InMemExpiry) .build(); Inmem::from(cache) @@ -30,7 +32,7 @@ pub fn new() -> Box { /// Represents the in-memory cache driver. #[derive(Debug)] pub struct Inmem { - cache: Cache, + cache: Cache, } impl Inmem { @@ -61,7 +63,7 @@ impl CacheDriver for Inmem { /// # Errors /// /// Returns a `CacheError` if there is an error during the operation. - async fn get(&self, key: &str) -> CacheResult> { + async fn get(&self, key: &str) -> CacheResult> { let result = self.cache.get(key); match result { None => Ok(None), @@ -74,7 +76,7 @@ impl CacheDriver for Inmem { /// # Errors /// /// Returns a `CacheError` if there is an error during the operation. - async fn insert(&self, key: &str, value: &str) -> CacheResult<()> { + async fn insert(&self, key: &str, value: &T) -> CacheResult<()> { self.cache.insert( key.to_string(), (Expiration::Never, Arc::new(value).to_string()), @@ -89,10 +91,10 @@ impl CacheDriver for Inmem { /// /// Returns a [`super::CacheError`] if there is an error during the /// operation. - async fn insert_with_expiry( + async fn insert_with_expiry( &self, key: &str, - value: &str, + value: &T, duration: Duration, ) -> CacheResult<()> { self.cache.insert( diff --git a/src/cache/drivers/mod.rs b/src/cache/drivers/mod.rs index 640e63ab1..faaab863c 100644 --- a/src/cache/drivers/mod.rs +++ b/src/cache/drivers/mod.rs @@ -3,13 +3,15 @@ //! This module defines traits and implementations for cache drivers. use std::time::Duration; -use async_trait::async_trait; - use super::CacheResult; +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use serde::Serialize; #[cfg(feature = "cache_inmem")] pub mod inmem; pub mod null; +pub mod redis; /// Trait representing a cache driver. #[async_trait] @@ -28,7 +30,7 @@ pub trait CacheDriver: Sync + Send { /// /// Returns a [`super::CacheError`] if there is an error during the /// operation. - async fn get(&self, key: &str) -> CacheResult>; + async fn get(&self, key: &str) -> CacheResult>; /// Inserts a key-value pair into the cache. /// @@ -36,7 +38,7 @@ pub trait CacheDriver: Sync + Send { /// /// Returns a [`super::CacheError`] if there is an error during the /// operation. - async fn insert(&self, key: &str, value: &str) -> CacheResult<()>; + async fn insert(&self, key: &str, value: &T) -> CacheResult<()>; /// Inserts a key-value pair into the cache that expires after the /// specified duration. @@ -45,10 +47,10 @@ pub trait CacheDriver: Sync + Send { /// /// Returns a [`super::CacheError`] if there is an error during the /// operation. - async fn insert_with_expiry( + async fn insert_with_expiry( &self, key: &str, - value: &str, + value: &T, duration: Duration, ) -> CacheResult<()>; diff --git a/src/cache/drivers/redis.rs b/src/cache/drivers/redis.rs new file mode 100644 index 000000000..56a6d540b --- /dev/null +++ b/src/cache/drivers/redis.rs @@ -0,0 +1,164 @@ +//! # Redis Cache Driver +//! +//! This module implements a cache driver using an redis cache. +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use super::CacheDriver; +use crate::cache::{CacheError, CacheResult}; +use crate::config::RedisCacheConfig; +use async_trait::async_trait; +use bb8::Pool; +use bb8_redis::RedisConnectionManager; +use opendal::Builder; +use redis::{cmd, AsyncCommands}; +use serde::de::DeserializeOwned; +use serde::Serialize; + +/// Creates a new instance of the in-memory cache driver, with a default Loco +/// configuration. +/// +/// # Returns +/// +/// A boxed [`CacheDriver`] instance. +#[must_use] +pub async fn new(config: &RedisCacheConfig) -> CacheResult> { + let manager = RedisConnectionManager::new(config.uri.clone()) + .map_err(|e| CacheError::Any(Box::new(e)))?; + let redis = Pool::builder().build(manager).await?; + + Ok(Redis::from(redis)) +} + +/// Represents the in-memory cache driver. +pub struct Redis { + redis: Pool, +} + +impl Redis { + /// Constructs a new [`Redis`] instance from a given cache. + /// + /// # Returns + /// + /// A boxed [`CacheDriver`] instance. + #[must_use] + pub fn from(redis: Pool) -> Box { + Box::new(Self { redis }) + } +} + +#[async_trait] +impl CacheDriver for Redis { + /// Checks if a key exists in the cache. + /// + /// # Errors + /// + /// Returns a `CacheError` if there is an error during the operation. + async fn contains_key(&self, key: &str) -> CacheResult { + let mut connection = self.redis.get().await?; + Ok(connection.exists(key).await?) + } + + /// Retrieves a value from the cache based on the provided key. + /// + /// # Errors + /// + /// Returns a `CacheError` if there is an error during the operation. + async fn get(&self, key: &str) -> CacheResult> { + let mut connection = self.redis.get().await?; + let data: Option> = connection.get(key).await?; + + match data { + Some(bytes) => { + let value = + rmp_serde::from_slice(&bytes).map_err(|e| CacheError::Any(Box::new(e)))?; + Ok(Some(value)) + } + None => Ok(None), + } + } + + /// Inserts a key-value pair into the cache. + /// + /// # Errors + /// + /// Returns a `CacheError` if there is an error during the operation. + async fn insert(&self, key: &str, value: &T) -> CacheResult<()> { + let mut connection = self.redis.get().await?; + let encoded = rmp_serde::to_vec(value).map_err(|e| CacheError::Any(Box::new(e)))?; + connection.set(key, encoded).await?; + Ok(()) + } + + /// Inserts a key-value pair into the cache that expires after the specified + /// number of seconds. + /// + /// # Errors + /// + /// Returns a [`super::CacheError`] if there is an error during the + /// operation. + async fn insert_with_expiry( + &self, + key: &str, + value: &T, + duration: Duration, + ) -> CacheResult<()> { + let mut connection = self.redis.get().await?; + let encoded = rmp_serde::to_vec(value).map_err(|e| CacheError::Any(Box::new(e)))?; + connection + .set_ex(key, encoded, duration.as_secs() as usize) + .await?; + Ok(()) + } + + /// Removes a key-value pair from the cache. + /// + /// # Errors + /// + /// Returns a `CacheError` if there is an error during the operation. + async fn remove(&self, key: &str) -> CacheResult<()> { + let mut connection = self.redis.get().await?; + connection.del(key); + Ok(()) + } + + /// Clears all key-value pairs from the cache. + /// + /// # Errors + /// + /// Returns a `CacheError` if there is an error during the operation. + async fn clear(&self) -> CacheResult<()> { + let mut connection = self.redis.get().await?; + cmd("flushall").query(connection).await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[tokio::test] + async fn is_contains_key() { + todo!() + } + + #[tokio::test] + async fn can_get_key_value() { + todo!() + } + + #[tokio::test] + async fn can_remove_key() { + todo!() + } + + #[tokio::test] + async fn can_clear() { + todo!() + } +} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index df803403e..e65bbf467 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -3,10 +3,15 @@ //! This module provides a generic cache interface for various cache drivers. pub mod drivers; -use std::{future::Future, time::Duration}; - use self::drivers::CacheDriver; -use crate::Result as LocoResult; +use crate::bgworker::{pg, skq, sqlt, Queue}; +use crate::cache::drivers::{inmem, redis}; +use crate::config::Config; +use crate::{config, Error, Result as LocoResult}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::sync::Arc; +use std::{future::Future, time::Duration}; /// Errors related to cache operations #[derive(thiserror::Error, Debug)] @@ -18,6 +23,25 @@ pub enum CacheError { pub type CacheResult = std::result::Result; +/// Create a provider +/// +/// # Errors +/// +/// This function will return an error if fails to build +#[allow(clippy::missing_panics_doc)] +pub async fn create_cache_provider(config: &Config) -> crate::Result> { + match &config.cache { + config::CacheConfig::Redis(config) => Ok(Arc::new(redis::new(config).await?)), + config::CacheConfig::InMem(config) => Ok(Arc::new(inmem::new(config).await?)), + + #[allow(unreachable_patterns)] + _ => Err(Error::string( + "no cache provider feature was selected and compiled, but cache configuration \ + is present", + )), + } +} + /// Represents a cache instance pub struct Cache { /// The cache driver used for underlying operations @@ -64,7 +88,7 @@ impl Cache { /// # Errors /// A [`CacheResult`] containing an `Option` representing the retrieved /// value. - pub async fn get(&self, key: &str) -> CacheResult> { + pub async fn get(&self, key: &str) -> CacheResult> { self.driver.get(key).await } @@ -83,7 +107,7 @@ impl Cache { /// # Errors /// /// A [`CacheResult`] indicating the success of the operation. - pub async fn insert(&self, key: &str, value: &str) -> CacheResult<()> { + pub async fn insert(&self, key: &str, value: &T) -> CacheResult<()> { self.driver.insert(key, value).await } @@ -104,10 +128,10 @@ impl Cache { /// # Errors /// /// A [`CacheResult`] indicating the success of the operation. - pub async fn insert_with_expiry( + pub async fn insert_with_expiry( &self, key: &str, - value: &str, + value: &T, duration: Duration, ) -> CacheResult<()> { self.driver.insert_with_expiry(key, value, duration).await @@ -134,9 +158,10 @@ impl Cache { /// # Errors /// /// A [`LocoResult`] indicating the success of the operation. - pub async fn get_or_insert(&self, key: &str, f: F) -> LocoResult + pub async fn get_or_insert(&self, key: &str, f: F) -> LocoResult where - F: Future> + Send, + T: Serialize + DeserializeOwned, + F: Future> + Send, { if let Some(value) = self.driver.get(key).await? { Ok(value) @@ -169,14 +194,15 @@ impl Cache { /// # Errors /// /// A [`LocoResult`] indicating the success of the operation. - pub async fn get_or_insert_with_expiry( + pub async fn get_or_insert_with_expiry( &self, key: &str, duration: Duration, f: F, - ) -> LocoResult + ) -> LocoResult where - F: Future> + Send, + T: Serialize + DeserializeOwned, + F: Future> + Send, { if let Some(value) = self.driver.get(key).await? { Ok(value) diff --git a/src/config.rs b/src/config.rs index 69d1a73e0..19df6add7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -49,6 +49,7 @@ pub struct Config { pub server: Server, #[cfg(feature = "with-db")] pub database: Database, + pub cache: Option, pub queue: Option, pub auth: Option, #[serde(default)] @@ -220,6 +221,30 @@ pub struct Database { pub dangerously_recreate: bool, } +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(tag = "kind")] +pub enum CacheConfig { + /// In-memory cache + InMem(InMemCacheConfig), + /// Redis cache + Redis(RedisCacheConfig), +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct InMemCacheConfig { + #[serde(default = "cache_in_mem_max_capacity")] + pub max_capacity: u64, +} + +fn cache_in_mem_max_capacity() -> u64 { + 32 * 1024 * 1024 +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RedisCacheConfig { + pub uri: String, +} + #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(tag = "kind")] pub enum QueueConfig {