diff --git a/CHANGELOG.md b/CHANGELOG.md index 4274759..877563d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.5.8 (2024-11-19) +* `ModelTopicConsumer.get_defaults` will skip fields not defined on the model. + ## 0.5.7 (2024-11-18) * `@substitute_error` now shows error message of the original error. @@ -19,7 +22,6 @@ * `ModelTopicConsumer.sync` returns now the results of the `update_or_create` method. * Add `days_from_epoch_to_date` function to convert `io.debezium.time.Date` to python `datetime.date`. - ## 0.4.1 (2024-09-17) * Support string-based delete keys in DbzModelTopicConsumer diff --git a/django_kafka/tests/models.py b/django_kafka/tests/models.py index 0f7c928..8e341ee 100644 --- a/django_kafka/tests/models.py +++ b/django_kafka/tests/models.py @@ -1,5 +1,6 @@ from django.db import connection from django.db.models import Model +from django.db.models.base import ModelBase from django.test import TestCase @@ -9,11 +10,14 @@ class AbstractModelTestCase(TestCase): @classmethod def setUpClass(cls): - class TestModel(cls.abstract_model): - class Meta: - app_label = "django_kafka" - - cls.model = TestModel + class Meta: + app_label = cls.__module__ + + cls.model = ModelBase( + f'__Test{cls.abstract_model.__name__}__', + (cls.abstract_model,), + {'__module__': cls.abstract_model.__module__, "Meta": Meta}, + ) with connection.schema_editor() as editor: editor.create_model(cls.model) diff --git a/django_kafka/tests/topic/test_model.py b/django_kafka/tests/topic/test_model.py index a3db550..53840e2 100644 --- a/django_kafka/tests/topic/test_model.py +++ b/django_kafka/tests/topic/test_model.py @@ -1,18 +1,21 @@ from unittest import mock -from django.db.models import Model -from django.test import TestCase +from django.db import models from django_kafka.connect.models import KafkaConnectSkipModel from django_kafka.exceptions import DjangoKafkaError +from django_kafka.tests.models import AbstractModelTestCase from django_kafka.topic.model import ModelTopicConsumer -class TestModelTopicConsumer(TestCase): +class ModelTopicConsumerTestCase(AbstractModelTestCase): + abstract_model = models.Model + model: type[KafkaConnectSkipModel] + def _get_model_topic_consumer(self): class SomeModelTopicConsumer(ModelTopicConsumer): name = "name" - model = Model + model = self.model def get_lookup_kwargs(self, model, key, value) -> dict: return {} @@ -25,7 +28,13 @@ def is_deletion(self, *args, **kwargs): def test_get_defaults(self): topic_consumer = self._get_model_topic_consumer() - defaults = topic_consumer.get_defaults(model=Model, value={"name": 1}) + class SomeModel(models.Model): + name = models.CharField() + + class Meta: + abstract = True + + defaults = topic_consumer.get_defaults(model=SomeModel, value={"name": 1}) self.assertEqual(defaults, {"name": 1}) @@ -33,7 +42,7 @@ def test_get_defaults__adds_kafka_skip(self): topic_consumer = self._get_model_topic_consumer() class KafkaConnectSkip(KafkaConnectSkipModel): - pass + name = models.CharField() defaults = topic_consumer.get_defaults( model=KafkaConnectSkip, value={"name": 1} @@ -45,10 +54,16 @@ def test_get_defaults__calls_transform_attr(self): topic_consumer = self._get_model_topic_consumer() topic_consumer.transform_name = mock.Mock(return_value=("name_new", 2)) - defaults = topic_consumer.get_defaults(model=Model, value={"name": 1}) + class SomeModel(models.Model): + name = models.CharField() + + class Meta: + abstract = True + + defaults = topic_consumer.get_defaults(model=SomeModel, value={"name": 1}) topic_consumer.transform_name.assert_called_once_with( - topic_consumer.model, + SomeModel, "name", 1, ) @@ -129,3 +144,17 @@ def test_consume(self): msg_key, msg_value, ) + + def test_model_has_field(self): + class BookModel(models.Model): + name = models.CharField(max_length=100) + author = models.CharField(max_length=100) + + topic_consumer = self._get_model_topic_consumer() + topic_consumer.model = BookModel + + fields = ("id", "name", "author") + for field in fields: + self.assertTrue(topic_consumer.model_has_field(BookModel, field)) + + self.assertFalse(topic_consumer.model_has_field(BookModel, "not_defined_field")) diff --git a/django_kafka/topic/model.py b/django_kafka/topic/model.py index 4f86cf9..5d9b7b1 100644 --- a/django_kafka/topic/model.py +++ b/django_kafka/topic/model.py @@ -23,12 +23,14 @@ def get_defaults(self, model, value) -> dict: value fields can be transformed by defining a transform_{attr} method. """ defaults = {} - for attr, attr_value in value.items(): - if transform_method := getattr(self, "transform_" + attr, None): - new_attr, new_value = transform_method(model, attr, attr_value) + for field_name, field_value in value.items(): + if not self.model_has_field(model, field_name): + continue + if transform_method := getattr(self, "transform_" + field_name, None): + new_attr, new_value = transform_method(model, field_name, field_value) defaults[new_attr] = new_value else: - defaults[attr] = attr_value + defaults[field_name] = field_value if issubclass(model, KafkaConnectSkipModel): defaults["kafka_skip"] = True @@ -61,6 +63,9 @@ def get_model(self, key, value) -> Type[Model]: "Cannot obtain model: either define a default model or override get_model", ) + def model_has_field(self, model: Type[Model], field_name: str) -> bool: + return field_name in (field.column for field in model._meta.fields) + def consume(self, msg): key = self.deserialize(msg.key(), MessageField.KEY, msg.headers()) value = self.deserialize(msg.value(), MessageField.VALUE, msg.headers())