Skip to content

Commit

Permalink
feat: add consumer and topic names to the log.
Browse files Browse the repository at this point in the history
  • Loading branch information
bodja committed Jan 27, 2025
1 parent 182cca0 commit b0b13d1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
24 changes: 16 additions & 8 deletions django_kafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def blocking_retry(
if retry_settings.can_retry(attempt, exc):
until = retry_settings.get_retry_time(attempt)
self.pause_partition(msg, until)
self.log_error(exc)
self.log_error(msg, exc_info=True)
return True
return False

Expand Down Expand Up @@ -153,22 +153,30 @@ def handle_exception(self, msg: "cimpl.Message", exc: Exception) -> bool:
retried, blocking = self.retry_msg(msg, exc)
if not retried:
self.dead_letter_msg(msg, exc)
self.log_error(exc)
self.log_error(msg, exc_info=True)
return True
return not blocking

def get_topic(self, msg: "cimpl.Message") -> "TopicConsumer":
return self.topics.get(topic_name=msg.topic())

def log_error(self, error):
logger.error(error, exc_info=True)
def log_error(self, msg: Optional["cimpl.Message"] = None, exc_info=False):
error = f"'{self.__class__.__module__}.{self.__class__.__name__} failed'"
if msg:
topic = self.get_topic(msg)
error = f"{error} on '{topic.__class__.__module__}.{topic.__class__.__name__}'"

if msg_error := msg.error():
error = f"{error}\nMessage error: '{msg_error}'"

logger.error(error, exc_info=exc_info)

def consume(self, msg):
self.get_topic(msg).consume(msg)

def process_message(self, msg: "cimpl.Message"):
if msg_error := msg.error():
self.log_error(msg_error)
if msg.error():
self.log_error(msg, exc_info=False)
return

try:
Expand Down Expand Up @@ -196,8 +204,8 @@ def run(self):
self.resume_partitions()
if (msg := self.poll()) is not None:
self.process_message(msg)
except Exception as exc:
self.log_error(exc)
except Exception:
self.log_error(exc_info=True)
raise
finally:
self.stop()
Expand Down
6 changes: 3 additions & 3 deletions django_kafka/tests/test_consumer/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class SomeConsumer(Consumer):
# checks msg had error before processing
msg.error.assert_called_once_with()
# error handler was triggered
log_error.assert_called_once_with(msg.error.return_value)
log_error.assert_called_once_with(msg, exc_info=False)
# Topic.consume is not called
consumer.topics[msg.topic()].consume.assert_not_called()
# Consumer.commit_offset is not called
Expand Down Expand Up @@ -269,7 +269,7 @@ class SomeConsumer(Consumer):

consumer.retry_msg.assert_called_once_with(msg, exc)
consumer.dead_letter_msg.assert_called_once_with(msg, exc)
consumer.log_error.assert_called_once_with(exc)
consumer.log_error.assert_called_once_with(msg, exc_info=True)

def test_blocking_retry(self):
retry_time = timezone.now()
Expand All @@ -289,7 +289,7 @@ class SomeConsumer(Consumer):
retried = consumer.blocking_retry(retry_settings, msg_mock, exc)

consumer.pause_partition.assert_called_once_with(msg_mock, retry_time)
consumer.log_error.assert_called_once_with(exc)
consumer.log_error.assert_called_once_with(msg_mock, exc_info=True)
self.assertEqual(retried, True)

@patch("django_kafka.consumer.Consumer.log_error")
Expand Down

0 comments on commit b0b13d1

Please sign in to comment.