Skip to content

Commit

Permalink
fix: tokio asynchronous task processing error, added tick processing …
Browse files Browse the repository at this point in the history
…for scheduler.
  • Loading branch information
AH-dark committed Jul 3, 2024
1 parent d22d3d0 commit 2a56d5f
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ async fn run_cron<'a>(job_scheduler: Arc<RwLock<job_scheduler::JobScheduler<'a>>
}

impl<'a> Cron<'a> {
/// Create a new cron with the given node pool.
pub async fn new(mut np: node_pool::NodePool) -> Result<Self, node_pool::Error> {
np.init().await?;

Expand All @@ -38,6 +39,7 @@ impl<'a> Cron<'a> {
})
}

/// Start the cron, blocking the current thread.
pub async fn start(&self) {
let np = self.node_pool.clone();

Expand Down
10 changes: 10 additions & 0 deletions src/driver/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,13 @@ pub static GLOBAL_KEY_PREFIX: &str = "distributed-scheduler";
pub fn get_key_prefix(service_name: &str) -> String {
format!("{}:{}:", GLOBAL_KEY_PREFIX, service_name)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_get_key_prefix() {
assert_eq!(get_key_prefix("test"), "distributed-scheduler:test:");
}
}
53 changes: 53 additions & 0 deletions src/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,56 @@ fn equal_ring(a: &[String], b: &[String]) -> bool {

a_sorted == pre_nodes_sorted
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_update_hash_ring() {
let mut pre_nodes = Vec::new();
let state_lock = AtomicBool::new(false);
let mut hash = hashring::HashRing::new();

let nodes = vec!["node1".to_string(), "node2".to_string()];

update_hash_ring(&mut pre_nodes, &state_lock, &mut hash, &nodes).await.unwrap();
assert_eq!(pre_nodes, nodes);
assert_eq!(hash.get(&"test"), Some(&"node2".to_string()));

let nodes = vec!["node1".to_string(), "node2".to_string(), "node3".to_string()];

update_hash_ring(&mut pre_nodes, &state_lock, &mut hash, &nodes).await.unwrap();
assert_eq!(pre_nodes, nodes);
assert_eq!(hash.get(&"test"), Some(&"node2".to_string()));

let nodes = vec!["node1".to_string(), "node3".to_string()];

update_hash_ring(&mut pre_nodes, &state_lock, &mut hash, &nodes).await.unwrap();
assert_eq!(pre_nodes, nodes);
assert_eq!(hash.get(&"test"), Some(&"node3".to_string()));

let nodes = vec!["node1".to_string(), "node3".to_string()];

update_hash_ring(&mut pre_nodes, &state_lock, &mut hash, &nodes).await.unwrap();
assert_eq!(pre_nodes, nodes);
assert_eq!(hash.get(&"test"), Some(&"node3".to_string()));
}

#[tokio::test]
async fn test_equal_ring() {
let a = vec!["node1".to_string(), "node2".to_string()];
let b = vec!["node1".to_string(), "node2".to_string()];

assert_eq!(equal_ring(&a, &b), true);

let a = vec!["node1".to_string(), "node2".to_string()];
let b = vec!["node2".to_string(), "node1".to_string()];

assert_eq!(equal_ring(&a, &b), true);

let a = vec!["node1".to_string(), "node2".to_string()];
let b = vec!["node1".to_string(), "node3".to_string()];
assert_eq!(equal_ring(&a, &b), false);
}
}

0 comments on commit 2a56d5f

Please sign in to comment.