diff --git a/hook-janitor/src/fixtures/webhook_cleanup.sql b/hook-janitor/src/fixtures/webhook_cleanup.sql
index bddaf26..5dfa827 100644
--- a/hook-janitor/src/fixtures/webhook_cleanup.sql
+++ b/hook-janitor/src/fixtures/webhook_cleanup.sql
@@ -2,6 +2,7 @@
 INSERT INTO job_queue (
     errors,
     metadata,
+    attempted_at,
     last_attempt_finished_at,
     parameters,
     queue,
@@ -14,6 +15,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'completed',
@@ -24,6 +26,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'completed',
@@ -34,6 +37,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 21:01:18.799371+00',
+    '2023-12-19 21:01:18.799371+00',
     '{}',
     'webhooks',
     'completed',
@@ -44,6 +48,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 3}',
     '2023-12-19 20:01:18.80335+00',
+    '2023-12-19 20:01:18.80335+00',
     '{}',
     'webhooks',
     'completed',
@@ -54,6 +59,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'not-webhooks',
     'completed',
@@ -64,6 +70,7 @@ VALUES
     NULL,
     '{"team_id": 2, "plugin_id": 99, "plugin_config_id": 4}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'completed',
@@ -74,6 +81,7 @@ VALUES
     ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'failed',
@@ -84,6 +92,7 @@ VALUES
     ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'failed',
@@ -94,6 +103,7 @@ VALUES
     ARRAY ['{"type":"ConnectionError","details":{"error":{"name":"Connection Error"}}}'::jsonb],
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'failed',
@@ -104,6 +114,7 @@ VALUES
     ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 21:01:18.799371+00',
+    '2023-12-19 21:01:18.799371+00',
     '{}',
     'webhooks',
     'failed',
@@ -114,6 +125,7 @@ VALUES
     ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 3}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'failed',
@@ -124,6 +136,7 @@ VALUES
     ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'not-webhooks',
     'failed',
@@ -134,6 +147,7 @@ VALUES
     ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb],
     '{"team_id": 2, "plugin_id": 99, "plugin_config_id": 4}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{}',
     'webhooks',
     'failed',
@@ -144,6 +158,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    '2023-12-19 20:01:18.799371+00',
     '{"body": "hello world", "headers": {}, "method": "POST", "url": "https://myhost/endpoint"}',
     'webhooks',
     'available',
@@ -154,6 +169,7 @@ VALUES
     NULL,
     '{"team_id": 1, "plugin_id": 99, "plugin_config_id": 2}',
     '2023-12-19 20:01:18.799371+00',
+    now() - '1 hour' :: interval,
     '{}',
     'webhooks',
     'running',
diff --git a/hook-janitor/src/webhooks.rs b/hook-janitor/src/webhooks.rs
index 9c33c5e..ee8ff43 100644
--- a/hook-janitor/src/webhooks.rs
+++ b/hook-janitor/src/webhooks.rs
@@ -23,8 +23,12 @@ use hook_common::metrics::get_current_timestamp_seconds;
 pub enum WebhookCleanerError {
     #[error("failed to create postgres pool: {error}")]
     PoolCreationError { error: sqlx::Error },
+    #[error("failed to acquire conn: {error}")]
+    AcquireConnError { error: sqlx::Error },
     #[error("failed to acquire conn and start txn: {error}")]
     StartTxnError { error: sqlx::Error },
+    #[error("failed to reschedule stuck jobs: {error}")]
+    RescheduleStuckJobsError { error: sqlx::Error },
     #[error("failed to get queue depth: {error}")]
     GetQueueDepthError { error: sqlx::Error },
     #[error("failed to get row count: {error}")]
@@ -140,6 +144,7 @@ impl From for AppMetric
 struct SerializableTxn<'a>(Transaction<'a, Postgres>);
 
 struct CleanupStats {
+    jobs_unstuck_count: u64,
     rows_processed: u64,
     completed_row_count: u64,
     completed_agg_row_count: u64,
@@ -178,12 +183,51 @@ impl WebhookCleaner {
         })
     }
 
+    async fn reschedule_stuck_jobs(&self) -> Result<u64> {
+        let mut conn = self
+            .pg_pool
+            .acquire()
+            .await
+            .map_err(|e| WebhookCleanerError::AcquireConnError { error: e })?;
+
+        // The "non-transactional" worker runs the risk of crashing and leaving jobs permanently in
+        // the `running` state. This query will reschedule any jobs that have been in the running
+        // state for more than 2 minutes (which is *much* longer than we expect any Webhook job to
+        // take).
+        //
+        // We don't need to increment the `attempt` counter here because the worker already did that
+        // when it moved the job into `running`.
+        //
+        // If the previous worker was somehow stalled for 2 minutes and completes the task, that
+        // will mean we sent duplicate Webhooks. Success stats should not be affected, since both
+        // will update the same job row, which will only be processed once by the janitor.
+
+        let base_query = r#"
+            UPDATE
+                job_queue
+            SET
+                status = 'available'::job_status,
+                last_attempt_finished_at = NOW(),
+                scheduled_at = NOW()
+            WHERE
+                status = 'running'::job_status
+                AND attempted_at < NOW() - INTERVAL '2 minutes'
+        "#;
+
+        let result = sqlx::query(base_query)
+            .execute(&mut *conn)
+            .await
+            .map_err(|e| WebhookCleanerError::RescheduleStuckJobsError { error: e })?;
+
+        Ok(result.rows_affected())
+    }
+
     async fn get_queue_depth(&self) -> Result<QueueDepth> {
         let mut conn = self
             .pg_pool
             .acquire()
             .await
-            .map_err(|e| WebhookCleanerError::StartTxnError { error: e })?;
+            .map_err(|e| WebhookCleanerError::AcquireConnError { error: e })?;
 
         let base_query = r#"
             SELECT
@@ -377,6 +421,8 @@ impl WebhookCleaner {
         let untried_status = [("status", "untried")];
         let retries_status = [("status", "retries")];
 
+        let jobs_unstuck_count = self.reschedule_stuck_jobs().await?;
+
         let queue_depth = self.get_queue_depth().await?;
         metrics::gauge!("queue_depth_oldest_scheduled", &untried_status)
             .set(queue_depth.oldest_scheduled_at_untried.timestamp() as f64);
@@ -430,6 +476,7 @@ impl WebhookCleaner {
         }
 
         Ok(CleanupStats {
+            jobs_unstuck_count,
             rows_processed: rows_deleted,
             completed_row_count,
             completed_agg_row_count,
@@ -450,6 +497,8 @@ impl Cleaner for WebhookCleaner {
                 metrics::counter!("webhook_cleanup_success",).increment(1);
                 metrics::gauge!("webhook_cleanup_last_success_timestamp",)
                     .set(get_current_timestamp_seconds());
+                metrics::counter!("webhook_cleanup_jobs_unstuck")
+                    .increment(stats.jobs_unstuck_count);
 
                 if stats.rows_processed > 0 {
                     let elapsed_time = start_time.elapsed().as_secs_f64();
@@ -572,6 +621,9 @@ mod tests {
             .await
             .expect("webbook cleanup_impl failed");
 
+        // The one 'running' job is transitioned to 'available'.
+        assert_eq!(cleanup_stats.jobs_unstuck_count, 1);
+
         // Rows that are not 'completed' or 'failed' should not be processed.
        assert_eq!(cleanup_stats.rows_processed, 13);
 
@@ -766,6 +818,7 @@ mod tests {
             .expect("webbook cleanup_impl failed");
 
         // Reported metrics are all zeroes
+        assert_eq!(cleanup_stats.jobs_unstuck_count, 0);
         assert_eq!(cleanup_stats.rows_processed, 0);
         assert_eq!(cleanup_stats.completed_row_count, 0);
         assert_eq!(cleanup_stats.completed_agg_row_count, 0);