Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Add very basic version of job unstuck-ing for non-txn jobs that hang … (
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Jan 30, 2024
1 parent e2b5dcb commit 6729401
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
16 changes: 16 additions & 0 deletions hook-janitor/src/fixtures/webhook_cleanup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ INSERT INTO
job_queue (
errors,
metadata,
attempted_at,
last_attempt_finished_at,
parameters,
queue,
Expand All @@ -14,6 +15,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'completed',
Expand All @@ -24,6 +26,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'completed',
Expand All @@ -34,6 +37,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 21:01:18.799371+00',
'2023-12-19 21:01:18.799371+00',
'{}',
'webhooks',
'completed',
Expand All @@ -44,6 +48,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 3}',
'2023-12-19 20:01:18.80335+00',
'2023-12-19 20:01:18.80335+00',
'{}',
'webhooks',
'completed',
Expand All @@ -54,6 +59,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'not-webhooks',
'completed',
Expand All @@ -64,6 +70,7 @@ VALUES
NULL,
'{"team_id": 2, "plugin_id": 99, "plugin_config_id": 4}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'completed',
Expand All @@ -74,6 +81,7 @@ VALUES
ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'failed',
Expand All @@ -84,6 +92,7 @@ VALUES
ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'failed',
Expand All @@ -94,6 +103,7 @@ VALUES
ARRAY ['{"type":"ConnectionError","details":{"error":{"name":"Connection Error"}}}'::jsonb],
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'failed',
Expand All @@ -104,6 +114,7 @@ VALUES
ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 21:01:18.799371+00',
'2023-12-19 21:01:18.799371+00',
'{}',
'webhooks',
'failed',
Expand All @@ -114,6 +125,7 @@ VALUES
ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 3}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'failed',
Expand All @@ -124,6 +136,7 @@ VALUES
ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'not-webhooks',
'failed',
Expand All @@ -134,6 +147,7 @@ VALUES
ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
'{"team_id": 2, "plugin_id": 99, "plugin_config_id": 4}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{}',
'webhooks',
'failed',
Expand All @@ -144,6 +158,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
'2023-12-19 20:01:18.799371+00',
'{"body": "hello world", "headers": {}, "method": "POST", "url": "https://myhost/endpoint"}',
'webhooks',
'available',
Expand All @@ -154,6 +169,7 @@ VALUES
NULL,
'{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
'2023-12-19 20:01:18.799371+00',
now() - '1 hour' :: interval,
'{}',
'webhooks',
'running',
Expand Down
55 changes: 54 additions & 1 deletion hook-janitor/src/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ use hook_common::metrics::get_current_timestamp_seconds;
pub enum WebhookCleanerError {
#[error("failed to create postgres pool: {error}")]
PoolCreationError { error: sqlx::Error },
#[error("failed to acquire conn: {error}")]
AcquireConnError { error: sqlx::Error },
#[error("failed to acquire conn and start txn: {error}")]
StartTxnError { error: sqlx::Error },
#[error("failed to reschedule stuck jobs: {error}")]
RescheduleStuckJobsError { error: sqlx::Error },
#[error("failed to get queue depth: {error}")]
GetQueueDepthError { error: sqlx::Error },
#[error("failed to get row count: {error}")]
Expand Down Expand Up @@ -140,6 +144,7 @@ impl From<FailedRow> for AppMetric {
struct SerializableTxn<'a>(Transaction<'a, Postgres>);

struct CleanupStats {
jobs_unstuck_count: u64,
rows_processed: u64,
completed_row_count: u64,
completed_agg_row_count: u64,
Expand Down Expand Up @@ -178,12 +183,51 @@ impl WebhookCleaner {
})
}

async fn reschedule_stuck_jobs(&self) -> Result<u64> {
let mut conn = self
.pg_pool
.acquire()
.await
.map_err(|e| WebhookCleanerError::AcquireConnError { error: e })?;

// The "non-transactional" worker runs the risk of crashing and leaving jobs permanently in
// the `running` state. This query will reschedule any jobs that have been in the running
// state for more than 2 minutes (which is *much* longer than we expect any Webhook job to
// take).
//
// We don't need to increment the `attempt` counter here because the worker already did that
// when it moved the job into `running`.
//
// If the previous worker was somehow stalled for 2 minutes and completes the task, that
// will mean we sent duplicate Webhooks. Success stats should not be affected, since both
// will update the same job row, which will only be processed once by the janitor.

let base_query = r#"
UPDATE
job_queue
SET
status = 'available'::job_status,
last_attempt_finished_at = NOW(),
scheduled_at = NOW()
WHERE
status = 'running'::job_status
AND attempted_at < NOW() - INTERVAL '2 minutes'
"#;

let result = sqlx::query(base_query)
.execute(&mut *conn)
.await
.map_err(|e| WebhookCleanerError::RescheduleStuckJobsError { error: e })?;

Ok(result.rows_affected())
}

async fn get_queue_depth(&self) -> Result<QueueDepth> {
let mut conn = self
.pg_pool
.acquire()
.await
.map_err(|e| WebhookCleanerError::StartTxnError { error: e })?;
.map_err(|e| WebhookCleanerError::AcquireConnError { error: e })?;

let base_query = r#"
SELECT
Expand Down Expand Up @@ -377,6 +421,8 @@ impl WebhookCleaner {
let untried_status = [("status", "untried")];
let retries_status = [("status", "retries")];

let jobs_unstuck_count = self.reschedule_stuck_jobs().await?;

let queue_depth = self.get_queue_depth().await?;
metrics::gauge!("queue_depth_oldest_scheduled", &untried_status)
.set(queue_depth.oldest_scheduled_at_untried.timestamp() as f64);
Expand Down Expand Up @@ -430,6 +476,7 @@ impl WebhookCleaner {
}

Ok(CleanupStats {
jobs_unstuck_count,
rows_processed: rows_deleted,
completed_row_count,
completed_agg_row_count,
Expand All @@ -450,6 +497,8 @@ impl Cleaner for WebhookCleaner {
metrics::counter!("webhook_cleanup_success",).increment(1);
metrics::gauge!("webhook_cleanup_last_success_timestamp",)
.set(get_current_timestamp_seconds());
metrics::counter!("webhook_cleanup_jobs_unstuck")
.increment(stats.jobs_unstuck_count);

if stats.rows_processed > 0 {
let elapsed_time = start_time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -572,6 +621,9 @@ mod tests {
.await
.expect("webbook cleanup_impl failed");

// The one 'running' job is transitioned to 'available'.
assert_eq!(cleanup_stats.jobs_unstuck_count, 1);

// Rows that are not 'completed' or 'failed' should not be processed.
assert_eq!(cleanup_stats.rows_processed, 13);

Expand Down Expand Up @@ -766,6 +818,7 @@ mod tests {
.expect("webbook cleanup_impl failed");

// Reported metrics are all zeroes
assert_eq!(cleanup_stats.jobs_unstuck_count, 0);
assert_eq!(cleanup_stats.rows_processed, 0);
assert_eq!(cleanup_stats.completed_row_count, 0);
assert_eq!(cleanup_stats.completed_agg_row_count, 0);
Expand Down

0 comments on commit 6729401

Please sign in to comment.