Skip to content

Commit

Permalink
Add Tempest crash fetching task with error handling and retries
Browse files Browse the repository at this point in the history
  • Loading branch information
sentry-autofix[bot] authored Feb 8, 2025
1 parent 391f870 commit 2223305
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 4 deletions.
84 changes: 81 additions & 3 deletions src/sentry/tempest/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
import json

import requests
import sentry_sdk
from django.db import transaction
from django.conf import settings
from requests import Response

Expand All @@ -11,9 +13,64 @@
from sentry.tasks.base import instrumented_task
from sentry.tasks.relay import schedule_invalidate_project_config
from sentry.tempest.models import MessageType, TempestCredentials
from sentry.utils.retries import TimedRetryPolicy

logger = logging.getLogger(__name__)

class TempestError(Exception):
    """Common ancestor for every Tempest-specific exception in this module.

    Catch this type to handle any Tempest failure without caring about the
    concrete subclass.
    """

class TempestAPIError(TempestError):
    """Raised when a Tempest API response cannot be used.

    Besides the human-readable message (available through ``str(exc)``), the
    exception keeps the HTTP status code and the raw response body so callers
    and logs can inspect what the service actually returned.
    """

    def __init__(self, message: str, status_code: int, response_body: str):
        """Build the error.

        Args:
            message: Human-readable description of the failure.
            status_code: HTTP status code of the offending response.
            response_body: Raw text of the offending response.
        """
        self.status_code = status_code
        self.response_body = response_body
        # Only the message goes into ``args`` so that ``str(exc)`` stays the
        # plain message rather than a tuple representation.
        super().__init__(message)

    def __reduce__(self):
        # The default BaseException reduction rebuilds the instance as
        # ``cls(*self.args)``. ``args`` only holds the message, so that call
        # would fail with a TypeError for the two missing positional
        # arguments whenever the exception is copied or pickled. Supply the
        # full constructor arguments explicitly instead.
        return (type(self), (self.args[0], self.status_code, self.response_body))

def validate_tempest_response(response: Response, required_fields: list[str]) -> dict:
    """Validate a Tempest API response and return its decoded JSON object.

    Args:
        response: HTTP response received from the Tempest service.
        required_fields: Top-level keys that must be present in the JSON body.

    Returns:
        The decoded JSON object.

    Raises:
        TempestAPIError: If the status code is 4xx/5xx, the body is not valid
            JSON, the JSON value is not an object, the body carries an
            ``error`` entry, or any of ``required_fields`` is missing.
    """

    def api_error(message: str) -> TempestAPIError:
        # Every failure carries the status code and the raw body for debugging.
        return TempestAPIError(message, response.status_code, response.text)

    status_code = response.status_code
    if status_code >= 500:
        raise api_error(f"Tempest service error: {status_code}")
    if status_code >= 400:
        raise api_error(f"Tempest request error: {status_code}")

    try:
        result = response.json()
    except ValueError as e:
        raise api_error("Invalid JSON response from Tempest") from e

    # Valid JSON is not necessarily an object: a list, string or number would
    # make the key checks below meaningless and break the ``dict`` contract.
    if not isinstance(result, dict):
        raise api_error(
            "Unexpected JSON payload from Tempest: "
            f"expected an object, got {type(result).__name__}"
        )

    if "error" in result:
        error = result["error"]
        if isinstance(error, dict):
            error_type = error.get("type", "unknown")
            error_message = error.get("message", "Unknown error")
        else:
            # The error entry may be a bare string (or null) rather than an
            # object; report it as an API error instead of failing with an
            # AttributeError on ``.get``.
            error_type = "unknown"
            error_message = "Unknown error" if error is None else str(error)
        raise api_error(f"Tempest error: {error_type} - {error_message}")

    missing_fields = [field for field in required_fields if field not in result]
    if missing_fields:
        raise api_error(
            f"Missing required fields in response: {', '.join(missing_fields)}"
        )

    return result

@instrumented_task(
name="sentry.tempest.tasks.poll_tempest",
Expand Down Expand Up @@ -106,6 +163,11 @@ def fetch_latest_item_id(credentials_id: int, **kwargs) -> None:
name="sentry.tempest.tasks.poll_tempest_crashes",
queue="tempest",
silo_mode=SiloMode.REGION,
max_retries=3,
autoretry_for=(TempestAPIError,),
retry_backoff=True,
retry_backoff_max=3600, # 1 hour max delay
retry_jitter=True,
soft_time_limit=55,
time_limit=60,
)
Expand Down Expand Up @@ -147,9 +209,22 @@ def poll_tempest_crashes(credentials_id: int, **kwargs) -> None:
"when latest_fetched_item_id is set."
)

result = response.json()
credentials.latest_fetched_item_id = result["latest_id"]
credentials.save(update_fields=["latest_fetched_item_id"])
try:
result = validate_tempest_response(response, required_fields=["latest_id"])

# Use select_for_update to prevent race conditions
with transaction.atomic():
credentials = TempestCredentials.objects.select_for_update().get(id=credentials_id)
credentials.latest_fetched_item_id = result["latest_id"]
credentials.message = "" # Clear any previous error messages
credentials.message_type = MessageType.INFO
credentials.save(update_fields=["latest_fetched_item_id", "message", "message_type"])

except TempestAPIError as e:
credentials.message = f"Error fetching crashes: {str(e)}"
credentials.message_type = MessageType.ERROR
credentials.save(update_fields=["message", "message_type"])
raise # Re-raise to trigger retry for 5xx errors
except Exception as e:
logger.exception(
"Fetching the crashes failed.",
Expand All @@ -161,6 +236,9 @@ def poll_tempest_crashes(credentials_id: int, **kwargs) -> None:
"error": str(e),
},
)
credentials.message = "An unexpected error occurred while fetching crashes"
credentials.message_type = MessageType.ERROR
credentials.save(update_fields=["message", "message_type"])


def fetch_latest_id_from_tempest(
Expand Down
24 changes: 24 additions & 0 deletions src/sentry/tempest/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import json
from requests import Response

def create_mock_response(status_code: int, body: dict = None, is_json: bool = True) -> Response:
    """Build a ``requests.Response`` carrying a canned status and body.

    Args:
        status_code: HTTP status code the response reports.
        body: Payload for the response; an empty dict is used when omitted.
        is_json: When true the payload is serialized with ``json.dumps``;
            otherwise its ``str()`` form is used verbatim.

    Returns:
        A ``Response`` whose status code and raw content are populated.
    """
    payload = {} if body is None else body
    serialized = json.dumps(payload) if is_json else str(payload)

    mock_response = Response()
    mock_response.status_code = status_code
    # ``_content`` is the raw byte buffer that backs ``.text`` and ``.json()``.
    mock_response._content = serialized.encode()
    return mock_response
99 changes: 98 additions & 1 deletion tests/sentry/tempest/test_tempest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from sentry.models.projectkey import ProjectKey, UseCase
from sentry.tempest.models import MessageType
from sentry.tempest.tasks import fetch_latest_item_id, poll_tempest, poll_tempest_crashes
from sentry.tempest.tasks import (fetch_latest_item_id, poll_tempest,
poll_tempest_crashes, TempestError, TempestAPIError)
from sentry.tempest.test_utils import create_mock_response
from sentry.testutils.cases import TestCase


Expand Down Expand Up @@ -210,3 +212,98 @@ def test_poll_tempest_crashes_invalidates_config(self, mock_fetch, mock_invalida
# Second call -> should reuse existing ProjectKey and thus not invalidate config
poll_tempest_crashes(self.credentials.id)
mock_invalidate.assert_not_called()

@patch("sentry.tempest.tasks.fetch_items_from_tempest")
def test_poll_tempest_crashes_http_500_retry(self, mock_fetch):
    """A 500 from Tempest propagates as TempestAPIError and is recorded.

    The exception has to escape the task for any retry to happen; the retry
    and backoff themselves are not exercised here.
    """
    creds = self.credentials
    creds.latest_fetched_item_id = "42"
    creds.save()

    # Tempest answers with a server-side failure.
    mock_fetch.return_value = create_mock_response(
        status_code=500,
        body={"error": "Internal Server Error"},
    )

    with self.assertRaises(TempestAPIError) as raised:
        poll_tempest_crashes(creds.id)
    assert raised.exception.status_code == 500

    # The failure is stored on the credentials; the cursor is left untouched.
    creds.refresh_from_db()
    assert creds.message.startswith("Error fetching crashes")
    assert creds.message_type == MessageType.ERROR
    assert creds.latest_fetched_item_id == "42"

@patch("sentry.tempest.tasks.fetch_items_from_tempest")
def test_poll_tempest_crashes_invalid_json(self, mock_fetch):
    """A 200 response whose body is not JSON is reported as TempestAPIError."""
    creds = self.credentials
    creds.latest_fetched_item_id = "42"
    creds.save()

    # Plain text body, deliberately not JSON-encoded.
    mock_fetch.return_value = create_mock_response(200, "Invalid JSON", is_json=False)

    with self.assertRaises(TempestAPIError) as raised:
        poll_tempest_crashes(creds.id)
    assert "Invalid JSON response" in str(raised.exception)

    # Cursor unchanged, error recorded on the credentials.
    creds.refresh_from_db()
    assert creds.latest_fetched_item_id == "42"
    assert "Error fetching crashes" in creds.message
    assert creds.message_type == MessageType.ERROR

@patch("sentry.tempest.tasks.fetch_items_from_tempest")
def test_poll_tempest_crashes_missing_fields(self, mock_fetch):
    """A JSON response without ``latest_id`` is reported as TempestAPIError."""
    creds = self.credentials
    creds.latest_fetched_item_id = "42"
    creds.save()

    # Well-formed JSON, but the key the task needs is absent.
    incomplete_body = {"data": "some data but no latest_id"}
    mock_fetch.return_value = create_mock_response(200, incomplete_body)

    with self.assertRaises(TempestAPIError) as raised:
        poll_tempest_crashes(creds.id)

    error_text = str(raised.exception)
    assert "Missing required fields" in error_text
    assert "latest_id" in error_text

    # Cursor unchanged, error recorded on the credentials.
    creds.refresh_from_db()
    assert creds.latest_fetched_item_id == "42"
    assert "Error fetching crashes" in creds.message
    assert creds.message_type == MessageType.ERROR

@patch("sentry.tempest.tasks.fetch_items_from_tempest")
def test_poll_tempest_crashes_race_condition(self, mock_fetch):
    """The fetched ``latest_id`` wins even if the stored id changed mid-fetch."""
    creds = self.credentials
    creds.latest_fetched_item_id = "42"
    creds.save()

    def bump_id_then_respond(*args, **kwargs):
        # Stand-in for another writer touching the row while the fetch is
        # in flight: the stored id moves to "43" before Tempest answers.
        creds.latest_fetched_item_id = "43"
        creds.save()
        return create_mock_response(200, {"latest_id": "44"})

    mock_fetch.side_effect = bump_id_then_respond

    poll_tempest_crashes(creds.id)

    # The task stores the id from the response and clears any earlier error.
    creds.refresh_from_db()
    assert creds.latest_fetched_item_id == "44"
    assert creds.message == ""
    assert creds.message_type == MessageType.INFO

0 comments on commit 2223305

Please sign in to comment.