Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(redis): add async redis client that uses the same pattern as sync redis #4356

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 75 additions & 27 deletions relay-redis/src/real.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub use redis;
use redis::aio::{ConnectionManager, ConnectionManagerConfig};
use redis::cluster::ClusterClientBuilder;
use redis::cluster_async::ClusterConnection;
use redis::{Client, Cmd, ConnectionLike, FromRedisValue};
use redis::{Client, Cmd, ConnectionLike, Pipeline, RedisFuture, Value};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::thread::Scope;
Expand Down Expand Up @@ -375,7 +375,7 @@ impl RedisPool {
#[derive(Debug, Clone)]
pub struct RedisPools {
/// The pool used for project configurations
pub project_configs: AsyncRedisConnection,
pub project_configs: AsyncRedisClient,
/// The pool used for cardinality limits.
pub cardinality: RedisPool,
/// The pool used for rate limiting/quotas.
Expand All @@ -390,6 +390,51 @@ pub struct Stats {
pub idle_connections: u32,
}

/// Client that wraps a [`AsyncRedisConnection`].
#[derive(Clone, Debug)]
pub struct AsyncRedisClient {
connection: AsyncRedisConnection,
}

impl AsyncRedisClient {
fn new(connection: AsyncRedisConnection) -> Self {
Self { connection }
}

/// Creates a new [`AsyncRedisClient`] in cluster mode.
pub async fn cluster<'a>(
servers: impl IntoIterator<Item = &'a str>,
opts: &RedisConfigOptions,
) -> Result<Self, RedisError> {
AsyncRedisConnection::cluster(servers, opts)
.await
.map(AsyncRedisClient::new)
}

/// Creates a new [`AsyncRedisClient`] in single mode.
pub async fn single(server: &str, opts: &RedisConfigOptions) -> Result<Self, RedisError> {
AsyncRedisConnection::single(server, opts)
.await
.map(AsyncRedisClient::new)
}

/// Returns a shared [`AsyncRedisConnection`].
pub fn get_connection(&self) -> AsyncRedisConnection {
self.connection.clone()
}

/// Return [`Stats`] for [`AsyncRedisClient`].
///
/// It will always return 0 for `idle_connections` and 1 for `connections` since we
/// are re-using the same connection.
pub fn stats(&self) -> Stats {
Stats {
idle_connections: 0,
connections: 1,
}
}
}

/// A wrapper Type for async redis connections. Conceptually it's similar to [`RedisPool`]
/// but async redis does not require a pool since the connections can just be cloned and
/// are thread safe.
Expand Down Expand Up @@ -431,31 +476,6 @@ impl AsyncRedisConnection {
.map_err(RedisError::Redis)?;
Ok(Self::Single(connection_manager))
}

/// Runs the given command on redis and returns the result.
pub async fn query_async<T: FromRedisValue>(&self, cmd: Cmd) -> Result<T, RedisError> {
match self {
Self::Cluster(conn, ..) => cmd
.query_async(&mut conn.clone())
.await
.map_err(RedisError::Redis),
Self::Single(conn, ..) => cmd
.query_async(&mut conn.clone())
.await
.map_err(RedisError::Redis),
}
}

/// Return [`Stats`] for [`AsyncRedisConnection`].
///
/// It will always return 0 for `idle_connections` and 1 for `connections` since we
/// are re-using the same connection.
pub fn stats(&self) -> Stats {
Stats {
idle_connections: 0,
connections: 1,
}
}
}

impl Debug for AsyncRedisConnection {
Expand All @@ -467,3 +487,31 @@ impl Debug for AsyncRedisConnection {
f.debug_tuple(name).finish()
}
}

impl redis::aio::ConnectionLike for AsyncRedisConnection {
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
match self {
Self::Cluster(conn) => conn.req_packed_command(cmd),
Self::Single(conn) => conn.req_packed_command(cmd),
}
}

fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a Pipeline,
offset: usize,
count: usize,
) -> RedisFuture<'a, Vec<Value>> {
match self {
Self::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
Self::Single(conn) => conn.req_packed_commands(cmd, offset, count),
}
}

fn get_db(&self) -> i64 {
match self {
Self::Cluster(conn) => conn.get_db(),
Self::Single(conn) => conn.get_db(),
}
}
}
8 changes: 4 additions & 4 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use relay_config::{RedisConfigRef, RedisPoolConfigs};
#[cfg(feature = "processing")]
use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisConnection;
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts};
use relay_system::{channel, Addr, Service, ServiceRunner};
Expand Down Expand Up @@ -477,14 +477,14 @@ pub async fn create_redis_pools(configs: RedisPoolConfigs<'_>) -> Result<RedisPo
#[cfg(feature = "processing")]
async fn create_async_connection(
config: &RedisConfigRef<'_>,
) -> Result<AsyncRedisConnection, RedisError> {
) -> Result<AsyncRedisClient, RedisError> {
match config {
RedisConfigRef::Cluster {
cluster_nodes,
options,
} => AsyncRedisConnection::cluster(cluster_nodes.iter().map(|s| s.as_str()), options).await,
} => AsyncRedisClient::cluster(cluster_nodes.iter().map(|s| s.as_str()), options).await,
RedisConfigRef::Single { server, options } => {
AsyncRedisConnection::single(server, options).await
AsyncRedisClient::single(server, options).await
}
RedisConfigRef::MultiWrite { .. } => {
Err(RedisError::MultiWriteNotSupported("projectconfig"))
Expand Down
23 changes: 14 additions & 9 deletions relay-server/src/services/projects/source/redis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_redis::{AsyncRedisConnection, RedisError};
use relay_redis::{AsyncRedisClient, RedisError};
use relay_statsd::metric;
use std::fmt::Debug;
use std::sync::Arc;
Expand All @@ -13,7 +13,7 @@ use relay_redis::redis::cmd;
#[derive(Clone, Debug)]
pub struct RedisProjectSource {
config: Arc<Config>,
redis: AsyncRedisConnection,
redis: AsyncRedisClient,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -50,7 +50,7 @@ fn parse_redis_response(raw_response: &[u8]) -> Result<ParsedProjectState, Redis
}

impl RedisProjectSource {
pub fn new(config: Arc<Config>, redis: AsyncRedisConnection) -> Self {
pub fn new(config: Arc<Config>, redis: AsyncRedisClient) -> Self {
RedisProjectSource { config, redis }
}

Expand All @@ -63,11 +63,14 @@ impl RedisProjectSource {
key: ProjectKey,
revision: Revision,
) -> Result<SourceProjectState, RedisProjectError> {
let mut connection = self.redis.get_connection();
// Only check for the revision if we were passed a revision.
if let Some(revision) = revision.as_str() {
let mut cmd = cmd("GET");
cmd.arg(self.get_redis_rev_key(key));
let current_revision: Option<String> = self.redis.query_async(cmd).await?;
let current_revision: Option<String> = cmd("GET")
.arg(self.get_redis_rev_key(key))
.query_async(&mut connection)
.await
.map_err(RedisError::Redis)?;

relay_log::trace!(
"Redis revision {current_revision:?}, requested revision {revision:?}"
Expand All @@ -81,9 +84,11 @@ impl RedisProjectSource {
}
}

let mut raw_cmd = cmd("GET");
raw_cmd.arg(self.get_redis_project_config_key(key));
let raw_response_opt: Option<Vec<u8>> = self.redis.query_async(raw_cmd).await?;
let raw_response_opt: Option<Vec<u8>> = cmd("GET")
.arg(self.get_redis_project_config_key(key))
.query_async(&mut connection)
.await
.map_err(RedisError::Redis)?;

let Some(response) = raw_response_opt else {
metric!(
Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/services/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::services::upstream::{IsNetworkOutage, UpstreamRelay};
use crate::statsd::{RelayGauges, RuntimeCounters, RuntimeGauges};
use relay_config::{Config, RelayMode};
#[cfg(feature = "processing")]
use relay_redis::AsyncRedisConnection;
use relay_redis::AsyncRedisClient;
#[cfg(feature = "processing")]
use relay_redis::{RedisPool, RedisPools, Stats};
use relay_statsd::metric;
Expand Down Expand Up @@ -126,8 +126,8 @@ impl RelayStats {
}

#[cfg(feature = "processing")]
fn async_redis_connection(conn: &AsyncRedisConnection, name: &str) {
Self::stats_metrics(conn.stats(), name);
fn async_redis_connection(client: &AsyncRedisClient, name: &str) {
Self::stats_metrics(client.stats(), name);
}

#[cfg(feature = "processing")]
Expand Down
Loading