diff --git a/django_kafka/consumer/consumer.py b/django_kafka/consumer/consumer.py index 3908fd7..d4a31a7 100644 --- a/django_kafka/consumer/consumer.py +++ b/django_kafka/consumer/consumer.py @@ -102,7 +102,7 @@ def blocking_retry( if retry_settings.can_retry(attempt, exc): until = retry_settings.get_retry_time(attempt) self.pause_partition(msg, until) - self.log_error(exc) + self.log_error(msg) return True return False @@ -153,22 +153,34 @@ def handle_exception(self, msg: "cimpl.Message", exc: Exception) -> bool: retried, blocking = self.retry_msg(msg, exc) if not retried: self.dead_letter_msg(msg, exc) - self.log_error(exc) + self.log_error(msg) return True return not blocking def get_topic(self, msg: "cimpl.Message") -> "TopicConsumer": return self.topics.get(topic_name=msg.topic()) - def log_error(self, error): - logger.error(error, exc_info=True) + def log_error(self, msg: Optional["cimpl.Message"] = None): + error_parts = [ + f"`{self.__class__.__module__}.{self.__class__.__name__}` failed" + ] + if msg: + topic = self.get_topic(msg) + error_parts.append = [ + f" on `{topic.__class__.__module__}.{topic.__class__.__name__}`" + ] + + if msg_error := msg.error(): + error_parts.append(f"Message error: '{msg_error}'") + + logger.exception("\n".join(error_parts)) def consume(self, msg): self.get_topic(msg).consume(msg) def process_message(self, msg: "cimpl.Message"): - if msg_error := msg.error(): - self.log_error(msg_error) + if msg.error(): + self.log_error(msg) return try: @@ -196,8 +208,8 @@ def run(self): self.resume_partitions() if (msg := self.poll()) is not None: self.process_message(msg) - except Exception as exc: - self.log_error(exc) + except Exception: + self.log_error() raise finally: self.stop() diff --git a/django_kafka/tests/test_consumer/test_consumer.py b/django_kafka/tests/test_consumer/test_consumer.py index c206b56..2a470ff 100644 --- a/django_kafka/tests/test_consumer/test_consumer.py +++ b/django_kafka/tests/test_consumer/test_consumer.py @@ -169,7 +169,7 @@ class SomeConsumer(Consumer): # checks msg had error before processing msg.error.assert_called_once_with() # error handler was triggered - log_error.assert_called_once_with(msg.error.return_value) + log_error.assert_called_once_with(msg) # Topic.consume is not called consumer.topics[msg.topic()].consume.assert_not_called() # Consumer.commit_offset is not called @@ -269,7 +269,7 @@ class SomeConsumer(Consumer): consumer.retry_msg.assert_called_once_with(msg, exc) consumer.dead_letter_msg.assert_called_once_with(msg, exc) - consumer.log_error.assert_called_once_with(exc) + consumer.log_error.assert_called_once_with(msg) def test_blocking_retry(self): retry_time = timezone.now() @@ -289,7 +289,7 @@ class SomeConsumer(Consumer): retried = consumer.blocking_retry(retry_settings, msg_mock, exc) consumer.pause_partition.assert_called_once_with(msg_mock, retry_time) - consumer.log_error.assert_called_once_with(exc) + consumer.log_error.assert_called_once_with(msg_mock) self.assertEqual(retried, True) @patch("django_kafka.consumer.Consumer.log_error")