Skip to content

Commit

Permalink
chore: code style optimized
Browse files Browse the repository at this point in the history
  • Loading branch information
AH-dark committed Jul 6, 2024
1 parent 07a89df commit d65b03d
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::node_pool;
pub struct Cron<'a> {
node_pool: Arc<node_pool::NodePool>,
jobs: Mutex<HashMap<String, Uuid>>,
cron: Arc<Mutex<job_scheduler::JobScheduler<'a>>>,
scheduler: Arc<Mutex<job_scheduler::JobScheduler<'a>>>,
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -18,7 +18,7 @@ pub enum Error {
NodePool(#[from] node_pool::Error),
}

async fn run_cron<'a>(job_scheduler: Arc<Mutex<job_scheduler::JobScheduler<'a>>>) {
async fn run_scheduler<'a>(job_scheduler: Arc<Mutex<job_scheduler::JobScheduler<'a>>>) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));

loop {
Expand All @@ -33,7 +33,7 @@ impl<'a> Cron<'a> {
Self {
node_pool: Arc::new(np),
jobs: Mutex::new(HashMap::new()),
cron: Arc::new(Mutex::new(job_scheduler::JobScheduler::new())),
scheduler: Arc::new(Mutex::new(job_scheduler::JobScheduler::new())),
}
}

Expand All @@ -42,13 +42,20 @@ impl<'a> Cron<'a> {
let np = self.node_pool.clone();

tokio::select! {
_ = run_cron(self.cron.clone()) => {},
_ = run_scheduler(self.scheduler.clone()) => {},
val = np.start() => {
val.expect("Failed to start node pool");
},
}
}

/// Register a job in the scheduler
async fn register_job(&self, job_name: &str, job: job_scheduler::Job<'a>) {
let mut cron = self.scheduler.lock().await;
let id = cron.add(job);
self.jobs.lock().await.insert(job_name.to_string(), id);
}

/// Add a job to the cron
///
/// # Arguments
Expand All @@ -57,9 +64,9 @@ impl<'a> Cron<'a> {
/// * `schedule` - The schedule of the job
/// * `run` - The function to run
///
pub async fn add_job<T>(&self, job_name: &str, schedule: Schedule, run: T) -> Result<(), Error>
pub async fn add_job<F>(&self, job_name: &str, schedule: Schedule, run: F) -> Result<(), Error>
where
T: 'static + Sync + Send + Fn(),
F: 'static + Sync + Send + Fn(),
{
let run = Arc::new(run);

Expand All @@ -81,10 +88,8 @@ impl<'a> Cron<'a> {
});
}
});
let id = self.cron.lock().await.add(job);

let mut jobs = self.jobs.lock().await;
jobs.insert(job_name.to_string(), id);
self.register_job(job_name, job).await;

Ok(())
}
Expand All @@ -97,24 +102,24 @@ impl<'a> Cron<'a> {
/// * `schedule` - The schedule of the job
/// * `run` - The async function to run
///
pub async fn add_async_job<T>(&self, job_name: &str, schedule: Schedule, run: T) -> Result<(), Error>
pub async fn add_async_job<F>(&self, job_name: &str, schedule: Schedule, run: F) -> Result<(), Error>
where
T: 'static + Sync + Send + Fn() -> tokio::task::JoinHandle<()>,
F: 'static + Sync + Send + Fn() -> tokio::task::JoinHandle<()>,
{
let run = Arc::new(run);

let mut cron = self.cron.lock().await;
let id = cron.add(job_scheduler::Job::new(schedule, {
let job = job_scheduler::Job::new(schedule, {
let job_name = job_name.to_string();
let np = self.node_pool.clone();
let np = Arc::clone(&self.node_pool);
let run = Arc::clone(&run);

move || {
let job_name = job_name.clone();
let np = np.clone();
let run = run.clone();
let np = Arc::clone(&np);
let run = Arc::clone(&run);

// spawn the async job
tokio::task::spawn_blocking(|| async move {
tokio::spawn(async move {
// check if the job is available
if np.check_job_available(&job_name)
.await
Expand All @@ -123,12 +128,9 @@ impl<'a> Cron<'a> {
}
});
}
}));
drop(cron);
});

let mut jobs = self.jobs.lock().await;
jobs.insert(job_name.to_string(), id);
drop(jobs);
self.register_job(job_name, job).await;

Ok(())
}
Expand All @@ -140,11 +142,9 @@ impl<'a> Cron<'a> {
/// * `job_name` - The unique name of the job
///
pub async fn remove_job(&self, job_name: &str) -> Result<(), Error> {
let mut jobs = self.jobs.lock().await;
if let Some(id) = jobs.remove(job_name) {
self.cron.lock().await.remove(id);
if let Some(id) = self.jobs.lock().await.remove(job_name) {
self.scheduler.lock().await.remove(id);
}
drop(jobs);

Ok(())
}
Expand Down

0 comments on commit d65b03d

Please sign in to comment.