Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/main' into brett/unstuck-v0
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Jan 30, 2024
2 parents 26fa4f4 + e2b5dcb commit 4492e48
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
11 changes: 10 additions & 1 deletion hook-common/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Instant;
use std::time::{Instant, SystemTime};

use axum::{
body::Body, extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse,
Expand Down Expand Up @@ -71,3 +71,12 @@ pub async fn track_metrics(req: Request<Body>, next: Next) -> impl IntoResponse

response
}

/// Current wall-clock time as whole seconds since the Unix epoch, as an `f64`
/// suitable for setting a Prometheus gauge.
///
/// If the system clock reads earlier than the epoch, the result is `0.0`
/// rather than an error or a negative value.
pub fn get_current_timestamp_seconds() -> f64 {
    // `duration_since` fails only when "now" precedes the epoch; treat that as zero seconds.
    let whole_seconds = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs());

    // Sub-second precision is intentionally dropped before converting to a float.
    whole_seconds as f64
}
28 changes: 16 additions & 12 deletions hook-janitor/src/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::cleanup::Cleaner;
use crate::kafka_producer::KafkaContext;

use hook_common::kafka_messages::app_metrics::{AppMetric, AppMetricCategory};
use hook_common::metrics::get_current_timestamp_seconds;

#[derive(Error, Debug)]
pub enum WebhookCleanerError {
Expand Down Expand Up @@ -114,9 +115,9 @@ struct FailedRow {

/// Row shape for a queue-depth summary, decoded by sqlx via `FromRow`.
///
/// Field names line up with the column aliases of the `SELECT ... FROM job_queue
/// WHERE status = 'available'` query visible elsewhere in this file; presumably that
/// query is what populates this struct — confirm against the query function.
// NOTE(review): this listing may show both pre- and post-change fields of a diff
// side by side; verify which fields exist in the checked-in struct.
#[derive(sqlx::FromRow, Debug)]
struct QueueDepth {
// Alias of MIN(created_at) over rows with `attempt = 0`; the query falls back to now().
oldest_created_at_untried: DateTime<Utc>,
// Alias of MIN(scheduled_at) over rows with `attempt = 0`; the query falls back to now().
oldest_scheduled_at_untried: DateTime<Utc>,
// Alias of the count of rows with `attempt = 0`; the query falls back to 0.
count_untried: i64,
// Alias of MIN(created_at) over rows with `attempt > 0`; the query falls back to now().
oldest_created_at_retries: DateTime<Utc>,
// Alias of MIN(scheduled_at) over rows with `attempt > 0`; the query falls back to now().
oldest_scheduled_at_retries: DateTime<Utc>,
// Alias of the count of rows with `attempt > 0`; the query falls back to 0.
count_retries: i64,
}

Expand Down Expand Up @@ -218,9 +219,9 @@ impl WebhookCleaner {

let base_query = r#"
SELECT
COALESCE(MIN(CASE WHEN attempt = 0 THEN created_at END), now()) AS oldest_created_at_untried,
COALESCE(MIN(CASE WHEN attempt = 0 THEN scheduled_at END), now()) AS oldest_scheduled_at_untried,
COALESCE(SUM(CASE WHEN attempt = 0 THEN 1 ELSE 0 END), 0) AS count_untried,
COALESCE(MIN(CASE WHEN attempt > 0 THEN created_at END), now()) AS oldest_created_at_retries,
COALESCE(MIN(CASE WHEN attempt > 0 THEN scheduled_at END), now()) AS oldest_scheduled_at_retries,
COALESCE(SUM(CASE WHEN attempt > 0 THEN 1 ELSE 0 END), 0) AS count_retries
FROM job_queue
WHERE status = 'available';
Expand Down Expand Up @@ -405,17 +406,18 @@ impl WebhookCleaner {
// of rows in memory. It seems unlikely we'll need to paginate, but that can be added in the
// future if necessary.

let untried_status = [("status", "untried")];
let retries_status = [("status", "retries")];

let jobs_unstuck_count = self.reschedule_stuck_jobs().await?;

let queue_depth = self.get_queue_depth().await?;
metrics::gauge!("queue_depth_oldest_created_at_untried")
.set(queue_depth.oldest_created_at_untried.timestamp() as f64);
metrics::gauge!("queue_depth", &[("status", "untried")])
.set(queue_depth.count_untried as f64);
metrics::gauge!("queue_depth_oldest_created_at_retries")
.set(queue_depth.oldest_created_at_retries.timestamp() as f64);
metrics::gauge!("queue_depth", &[("status", "retries")])
.set(queue_depth.count_retries as f64);
metrics::gauge!("queue_depth_oldest_scheduled", &untried_status)
.set(queue_depth.oldest_scheduled_at_untried.timestamp() as f64);
metrics::gauge!("queue_depth", &untried_status).set(queue_depth.count_untried as f64);
metrics::gauge!("queue_depth_oldest_scheduled", &retries_status)
.set(queue_depth.oldest_scheduled_at_retries.timestamp() as f64);
metrics::gauge!("queue_depth", &retries_status).set(queue_depth.count_retries as f64);

let mut tx = self.start_serializable_txn().await?;

Expand Down Expand Up @@ -481,6 +483,8 @@ impl Cleaner for WebhookCleaner {
match self.cleanup_impl().await {
Ok(stats) => {
metrics::counter!("webhook_cleanup_success",).increment(1);
metrics::gauge!("webhook_cleanup_last_success_timestamp",)
.set(get_current_timestamp_seconds());
metrics::counter!("webhook_cleanup_jobs_unstuck")
.increment(stats.jobs_unstuck_count);

Expand Down

0 comments on commit 4492e48

Please sign in to comment.