Skip to content

Commit

Permalink
style: use rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
AH-dark committed Dec 27, 2024
1 parent 58d9476 commit 4dcb5bc
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 87 deletions.
14 changes: 14 additions & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
comment_width = 100
max_width = 120
edition = "2021"
format_code_in_doc_comments = true
format_strings = true
group_imports = "StdExternalCrate"
imports_granularity = "Crate"
normalize_comments = true
normalize_doc_attributes = true
wrap_comments = true
trailing_comma = "Vertical"
fn_params_layout = "Vertical"
blank_lines_upper_bound = 2
blank_lines_lower_bound = 0
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ thiserror = "2.0"
hashring = "0.3.6"
job_scheduler = "1.2.1"
log = "0.4"
md5 = "0.7.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] }
uuid = { version = "1", features = ["v4"] }

etcd-client = { version = "0.14", optional = true }
redis = { version = "0.27.5", optional = true, features = ["tokio-comp"] }
redis = { version = "0.27.6", optional = true, features = ["tokio-comp"] }

[dev-dependencies]
pretty_env_logger = "0.5"
Expand Down
3 changes: 2 additions & 1 deletion examples/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
log::info!("Adding job");
cron.add_job("test", "* * * * * *".parse().unwrap(), || {
log::info!("Running job: {}", chrono::Utc::now());
}).await?;
})
.await?;

log::info!("Starting cron");
cron.start().await;
Expand Down
11 changes: 9 additions & 2 deletions examples/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@ async fn main() -> Result<(), Box<dyn Error>> {

let rdb = redis::Client::open("redis://localhost:6379").unwrap();

let driver = driver::redis::RedisDriver::new(rdb.get_multiplexed_tokio_connection().await?, "example-service", &uuid::Uuid::new_v4().to_string()).await?.with_timeout(3);
let driver = driver::redis::RedisDriver::new(
rdb.get_multiplexed_tokio_connection().await?,
"example-service",
&uuid::Uuid::new_v4().to_string(),
)
.await?
.with_timeout(3);
let np = node_pool::NodePool::new(driver).await?;
let cron = cron::Cron::new(np).await;

log::info!("Adding job");
cron.add_job("test", "* * * * * *".parse().unwrap(), || {
log::info!("Running job: {}", chrono::Utc::now());
}).await?;
})
.await?;

log::info!("Starting cron");
cron.start().await;
Expand Down
30 changes: 19 additions & 11 deletions src/cron.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::{collections::HashMap, future::Future, sync::Arc};

use job_scheduler::{Schedule, Uuid};
use tokio::sync::Mutex;

use crate::driver::Driver;
use crate::node_pool;
use crate::{driver::Driver, node_pool};

/// The `Cron` struct is the main entry point for the library, providing the ability to add and remove jobs.
/// The `Cron` struct is the main entry point for the library, providing the ability to add and
/// remove jobs.
pub struct Cron<'a, D>
where
D: Driver + Send + Sync,
Expand Down Expand Up @@ -60,7 +58,11 @@ where
}

/// Register a job in the scheduler
async fn register_job(&self, job_name: &str, job: job_scheduler::Job<'a>) {
async fn register_job(
&self,
job_name: &str,
job: job_scheduler::Job<'a>,
) {
let mut cron = self.scheduler.lock().await;
let id = cron.add(job);
self.jobs.lock().await.insert(job_name.to_string(), id);
Expand All @@ -73,8 +75,12 @@ where
/// * `job_name` - The unique name of the job
/// * `schedule` - The schedule of the job
/// * `run` - The function to run
///
pub async fn add_job<F>(&self, job_name: &str, schedule: Schedule, run: F) -> Result<(), Error>
pub async fn add_job<F>(
&self,
job_name: &str,
schedule: Schedule,
run: F,
) -> Result<(), Error>
where
F: 'static + Sync + Send + Fn(),
{
Expand Down Expand Up @@ -159,8 +165,10 @@ where
/// # Arguments
///
/// * `job_name` - The unique name of the job
///
pub async fn remove_job(&self, job_name: &str) -> Result<(), Error> {
pub async fn remove_job(
&self,
job_name: &str,
) -> Result<(), Error> {
if let Some(id) = self.jobs.lock().await.remove(job_name) {
self.scheduler.lock().await.remove(id);
}
Expand Down
59 changes: 45 additions & 14 deletions src/driver/etcd.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/// Etcd driver implementation.
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::AtomicBool, Arc};

use etcd_client::*;
use tokio::sync::{Mutex, RwLock};

use super::{Driver, utils};
use super::{utils, Driver};

const DEFAULT_LEASE_TTL: i64 = 3;

Expand All @@ -26,9 +24,11 @@ pub struct EtcdDriver {
}

impl std::fmt::Debug for EtcdDriver {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f
.debug_struct("EtcdDriver")
fn fmt(
&self,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
f.debug_struct("EtcdDriver")
.field("service_name", &self.service_name)
.field("node_id", &self.node_id)
.field("stop", &self.stop)
Expand All @@ -51,7 +51,11 @@ pub enum Error {

impl EtcdDriver {
/// Create a new etcd driver with the given client, service name, and node id.
pub async fn new(client: Client, service_name: &str, node_id: &str) -> Result<Self, Error> {
pub async fn new(
client: Client,
service_name: &str,
node_id: &str,
) -> Result<Self, Error> {
if service_name.is_empty() {
return Err(Error::EmptyServiceName);
}
Expand All @@ -71,7 +75,10 @@ impl EtcdDriver {
}

/// Set the timeout for the driver.
pub fn with_timeout(mut self, timeout: i64) -> Self {
pub fn with_timeout(
mut self,
timeout: i64,
) -> Self {
self.lease_ttl = timeout;
self
}
Expand All @@ -92,20 +99,33 @@ impl Driver for EtcdDriver {
Ok(self.node_list.read().await.iter().cloned().collect())
}

/// Start a routine to watch for node changes and register the current node. Use lease to keep the node key alive.
/// Start a routine to watch for node changes and register the current node. Use lease to keep
/// the node key alive.
async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut client = self.client.lock().await;
self.stop.store(false, std::sync::atomic::Ordering::SeqCst);

// init node list
let mut node_list = self.node_list.write().await;
for kv in client.get(utils::get_key_prefix(&self.service_name), Some(GetOptions::new().with_prefix())).await?.kvs() {
for kv in client
.get(
utils::get_key_prefix(&self.service_name),
Some(GetOptions::new().with_prefix()),
)
.await?
.kvs()
{
node_list.insert(kv.key_str()?.into());
}

// watch for node changes
{
let (mut watcher, mut watch_stream) = client.watch(utils::get_key_prefix(&self.service_name), Some(WatchOptions::new().with_prefix())).await?;
let (mut watcher, mut watch_stream) = client
.watch(
utils::get_key_prefix(&self.service_name),
Some(WatchOptions::new().with_prefix()),
)
.await?;
let node_list = self.node_list.clone();
let stop = self.stop.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -160,7 +180,12 @@ impl Driver for EtcdDriver {

loop {
if stop.load(std::sync::atomic::Ordering::SeqCst) {
inner_client.lock().await.lease_revoke(lease_id).await.expect("Failed to revoke lease");
inner_client
.lock()
.await
.lease_revoke(lease_id)
.await
.expect("Failed to revoke lease");
break;
}

Expand All @@ -173,7 +198,13 @@ impl Driver for EtcdDriver {
});

// put the node key
client.put(self.node_id.as_str(), self.node_id.as_str(), Some(PutOptions::new().with_lease(lease_id))).await?;
client
.put(
self.node_id.as_str(),
self.node_id.as_str(),
Some(PutOptions::new().with_lease(lease_id)),
)
.await?;
}

Ok(())
Expand Down
67 changes: 40 additions & 27 deletions src/driver/redis.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::{
fmt::Debug,
sync::{atomic::AtomicBool, Arc},
};

use redis::aio::ConnectionLike;
use redis::AsyncCommands;
use redis::{aio::ConnectionLike, AsyncCommands};

use super::{utils, Driver};

Expand Down Expand Up @@ -36,7 +36,11 @@ impl<C> RedisDriver<C>
where
C: ConnectionLike,
{
pub async fn new(con: C, service_name: &str, node_id: &str) -> Result<Self, Error> {
pub async fn new(
con: C,
service_name: &str,
node_id: &str,
) -> Result<Self, Error> {
if service_name.is_empty() {
return Err(Error::EmptyServiceName);
}
Expand All @@ -54,7 +58,10 @@ where
})
}

pub fn with_timeout(mut self, timeout: u64) -> Self {
pub fn with_timeout(
mut self,
timeout: u64,
) -> Self {
self.timeout = timeout;
self
}
Expand All @@ -64,7 +71,6 @@ where
}
}


#[async_trait::async_trait]
impl<C> Driver for RedisDriver<C>
where
Expand Down Expand Up @@ -133,8 +139,11 @@ where
/// * `node_id` - The id of the node
/// * `timeout` - The timeout of the node
/// * `con` - The redis connection
///
async fn register_node<C: ConnectionLike + Send>(node_id: &str, timeout: u64, con: &mut C) -> Result<(), Box<dyn std::error::Error>> {
async fn register_node<C: ConnectionLike + Send>(
node_id: &str,
timeout: u64,
con: &mut C,
) -> Result<(), Box<dyn std::error::Error>> {
con.set_ex(node_id, node_id, timeout).await?;
Ok(())
}
Expand All @@ -152,8 +161,12 @@ async fn register_node<C: ConnectionLike + Send>(node_id: &str, timeout: u64, co
/// # Returns
///
/// * `Result<(), Box<dyn std::error::Error>` - The result of the function
///
async fn heartbeat<C: ConnectionLike + Send>(node_id: &str, timeout: u64, con: C, started: Arc<AtomicBool>) -> Result<(), Box<dyn std::error::Error>> {
async fn heartbeat<C: ConnectionLike + Send>(
node_id: &str,
timeout: u64,
con: C,
started: Arc<AtomicBool>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let mut con = con;
let mut error_time = 0;
Expand All @@ -170,7 +183,8 @@ async fn heartbeat<C: ConnectionLike + Send>(node_id: &str, timeout: u64, con: C
interval.tick().await;

// register the node
register_node(node_id, timeout, &mut con).await
register_node(node_id, timeout, &mut con)
.await
.map_err(|e| {
error_time += 1;
log::error!("Failed to register node: {:?}", e);
Expand Down Expand Up @@ -198,12 +212,10 @@ mod tests {
let node_id = "test-node";
let timeout = 10_u64;

let mut mock_con = MockRedisConnection::new(vec![
MockCmd::new(
redis::cmd("SETEX").arg(node_id).arg(timeout as usize).arg(node_id),
Ok(redis::Value::Okay),
),
]);
let mut mock_con = MockRedisConnection::new(vec![MockCmd::new(
redis::cmd("SETEX").arg(node_id).arg(timeout as usize).arg(node_id),
Ok(redis::Value::Okay),
)]);

// Perform the node registration
let result = register_node(node_id, timeout, &mut mock_con).await;
Expand All @@ -218,14 +230,15 @@ mod tests {
let pattern = utils::get_key_prefix(service_name) + "*";

let keys = ["test-service-node1", "test-service-node2", "test-service-node3"];
let keys_as_redis_values = keys.iter().map(|k| redis::Value::BulkString(k.to_string().into_bytes())).collect::<Vec<_>>();

let mock_con = MockRedisConnection::new(vec![
MockCmd::new(
redis::cmd("SCAN").arg("0").arg("MATCH").arg(&pattern),
Ok(redis::Value::Array(keys_as_redis_values)),
)
]);
let keys_as_redis_values = keys
.iter()
.map(|k| redis::Value::BulkString(k.to_string().into_bytes()))
.collect::<Vec<_>>();

let mock_con = MockRedisConnection::new(vec![MockCmd::new(
redis::cmd("SCAN").arg("0").arg("MATCH").arg(&pattern),
Ok(redis::Value::Array(keys_as_redis_values)),
)]);

// Perform the node registration
let driver = RedisDriver::new(mock_con, service_name, node_id).await.unwrap();
Expand Down
Loading

0 comments on commit 4dcb5bc

Please sign in to comment.