Skip to content

Commit

Permalink
GH-2861: Add Validator to ErrorHandlingDeserializer
Browse files Browse the repository at this point in the history
Resolves #2861
  • Loading branch information
garyrussell authored Oct 26, 2023
1 parent ad5e754 commit 8b15468
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ public KafkaListenerErrorHandler validationErrorHandler() {
Starting with version 2.5.11, validation now works on payloads for `@KafkaHandler` methods in a class-level listener.
See xref:kafka/receiving-messages/class-level-kafkalistener.adoc[`@KafkaListener` on a Class].

Starting with version 3.1, you can perform validation in an `ErrorHandlingDeserializer` instead.
See xref:../serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeserializer`] for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,11 @@ The following example uses a `failedDeserializationFunction`.

[source, java]
----
public class BadFoo extends Foo {
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
Expand All @@ -569,11 +569,11 @@ public class BadFoo extends Foo {
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
Expand All @@ -586,7 +586,7 @@ The preceding example uses the following configuration:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
----

Expand Down Expand Up @@ -662,6 +662,12 @@ void listen(List<ConsumerRecord<String, Thing>> in) {
IMPORTANT: If you are also using a `DeadLetterPublishingRecoverer`, the record published for a `DeserializationException` will have a `record.value()` of type `byte[]`; this should not be serialized.
Consider using a `DelegatingByTypeSerializer` configured to use a `ByteArraySerializer` for `byte[]` and the normal serializer (Json, Avro, etc) for all other types.

Starting with version 3.1, you can add a `Validator` to the `ErrorHandlingDeserializer`.
If the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
This allows the original raw data to be passed to the error handler.
WHen creating the deserializer yourself, simply call `setValidator`; if you configure the serializer using properties, set the consumer configuration property `ErrorHandlingDeserializer.VALIDATOR_CLASS` to the class or fully qualified class name for your `Validator`.
When using Spring Boot, this property name is `spring.kafka.consumer.properties.spring.deserializer.validator.class`.

[[payload-conversion-with-batch]]
== Payload Conversion with Batch Listeners

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ When not used with an `ErrorHandlingDeserializer`, the `KafkaConsumer` will cont
Post processing can be applied on a listener container by specifying the bean name of a `ContainerPostProcessor` on the `@KafkaListener` annotation.
This occurs after the container has been created and after any configured `ContainerCustomizer` configured on the container factory.
See xref:kafka/container-factory.adoc[Container Factory] for more information.

[[x31-ehd]]
=== ErrorHandlingDeserializer

You can now add a `Validator` to this deserializer; if the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring.
This allows the original raw data to be passed to the error handler.
See xref:kafka/serdes.adoc#error-handling-deserializer[Using `ErrorHandlingDeserializer`] for more information.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -189,9 +189,9 @@ public void afterPropertiesSet() {
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint mmkle
&& this.validator != null) {
((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
mmkle.setValidator(this.validator);
}
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@

import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.validation.Validator;

/**
* Delegating key/value deserializer that catches exceptions, returning them
Expand Down Expand Up @@ -60,12 +61,19 @@ public class ErrorHandlingDeserializer<T> implements Deserializer<T> {
*/
public static final String VALUE_DESERIALIZER_CLASS = "spring.deserializer.value.delegate.class";

/**
* Property name for the validator.
*/
public static final String VALIDATOR_CLASS = "spring.deserializer.validator.class";

private Deserializer<T> delegate;

private boolean isForKey;

private Function<FailedDeserializationInfo, T> failedDeserializationFunction;

private Validator validator;

public ErrorHandlingDeserializer() {
}

Expand Down Expand Up @@ -108,6 +116,18 @@ public ErrorHandlingDeserializer<T> keyDeserializer(boolean isKey) {
return this;
}

/**
* Set a validator to validate the object after successful deserialization. If the
* validator throws an exception, or returns an
* {@link org.springframework.validation.Errors} with validation failures, the raw
* data will be available in any configured error handler.
* @param validator the validator to set
* @since 3.1
*/
public void setValidator(Validator validator) {
this.validator = validator;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (this.delegate == null) {
Expand All @@ -119,6 +139,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {
if (this.failedDeserializationFunction == null) {
setupFunction(configs, isKey ? KEY_FUNCTION : VALUE_FUNCTION);
}
setupValidator(configs);
}

public void setupDelegate(Map<String, ?> configs, String configKey) {
Expand All @@ -145,7 +166,7 @@ private void setupFunction(Map<String, ?> configs, String configKey) {
if (configs.containsKey(configKey)) {
try {
Object value = configs.get(configKey);
Class<?> clazz = value instanceof Class ? (Class<?>) value : ClassUtils.forName((String) value, null);
Class<?> clazz = value instanceof Class cls ? cls : ClassUtils.forName((String) value, null);
Assert.isTrue(Function.class.isAssignableFrom(clazz), "'function' must be a 'Function ', not a "
+ clazz.getName());
this.failedDeserializationFunction = (Function<FailedDeserializationInfo, T>)
Expand All @@ -157,10 +178,25 @@ private void setupFunction(Map<String, ?> configs, String configKey) {
}
}

private void setupValidator(Map<String, ?> configs) {
if (configs.containsKey(VALIDATOR_CLASS) && this.validator == null) {
try {
Object value = configs.get(VALIDATOR_CLASS);
Class<?> clazz = value instanceof Class cls ? cls : ClassUtils.forName((String) value, null);
Object instance = clazz.getDeclaredConstructor().newInstance();
Assert.isInstanceOf(Validator.class, instance, "'validator' must be a 'Validator', not a ");
this.validator = (Validator) instance;
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
}

@Override
public T deserialize(String topic, byte[] data) {
try {
return this.delegate.deserialize(topic, data);
return validate(this.delegate.deserialize(topic, data));
}
catch (Exception e) {
return recoverFromSupplier(topic, null, data, e);
Expand All @@ -176,14 +212,22 @@ public T deserialize(String topic, Headers headers, byte[] data) {
else {
headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
}
return this.delegate.deserialize(topic, headers, data);
return validate(this.delegate.deserialize(topic, headers, data));
}
catch (Exception e) {
SerializationUtils.deserializationException(headers, data, e, this.isForKey);
return recoverFromSupplier(topic, headers, data, e);
}
}

private T validate(T deserialized) {
if (this.validator == null || !this.validator.supports(deserialized.getClass())) {
return deserialized;
}
this.validator.validateObject(deserialized).failOnError(IllegalStateException::new);
return deserialized;
}

private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
if (this.failedDeserializationFunction != null) {
FailedDeserializationInfo failedDeserializationInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -56,6 +57,8 @@
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

/**
* @author Gary Russell
Expand All @@ -72,7 +75,7 @@ public class ErrorHandlingDeserializerTests {
public Config config;

@Test
public void testBadDeserializer() throws Exception {
void testBadDeserializer() throws Exception {
this.config.template().send(TOPIC, "foo", "bar");
this.config.template().send(TOPIC, "fail", "bar");
this.config.template().send(TOPIC, "foo", "fail");
Expand All @@ -84,7 +87,7 @@ public void testBadDeserializer() throws Exception {
}

@Test
public void unitTests() throws Exception {
void unitTests() throws Exception {
ErrorHandlingDeserializer<String> ehd = new ErrorHandlingDeserializer<>(new StringDeserializer());
assertThat(ehd.deserialize("topic", "foo".getBytes())).isEqualTo("foo");
ehd.close();
Expand Down Expand Up @@ -137,6 +140,41 @@ public String deserialize(String topic, Headers headers, byte[] data) {
.contains("original exception message");
}

@Test
void validate() {
ErrorHandlingDeserializer<String> ehd = new ErrorHandlingDeserializer<>(new StringDeserializer());
ehd.configure(Map.of(ErrorHandlingDeserializer.VALIDATOR_CLASS, Val.class.getName()), false);

Headers headers = new RecordHeaders();
assertThat(ehd.deserialize("foo", headers, "foo".getBytes())).isEqualTo("foo");
ehd.deserialize("foo", headers, "bar".getBytes());
DeserializationException ex = SerializationUtils.byteArrayToDeserializationException(null,
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
assertThat(ex.getCause()).isInstanceOf(IllegalStateException.class)
.extracting("message", InstanceOfAssertFactories.STRING)
.contains("validation failure");

ehd.setValidator(new Validator() {

@Override
public void validate(Object target, Errors errors) {
throw new IllegalArgumentException("test validation");
}

@Override
public boolean supports(Class<?> clazz) {
return clazz.equals(String.class);
}

});
ehd.deserialize("foo", headers, "baz".getBytes());
ex = SerializationUtils.byteArrayToDeserializationException(null,
headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
assertThat(ex.getCause()).isInstanceOf(IllegalArgumentException.class)
.extracting("message")
.isEqualTo("test validation");
}

@Configuration
@EnableKafka
public static class Config {
Expand Down Expand Up @@ -287,4 +325,20 @@ public static class Foo {

}

public static class Val implements Validator {

@Override
public void validate(Object target, Errors errors) {
if ("bar".equals(target)) {
errors.reject("validation failure");
}
}

@Override
public boolean supports(Class<?> clazz) {
return clazz.equals(String.class);
}

}

}

0 comments on commit 8b15468

Please sign in to comment.