diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java index f8ae5e5889..5d98252705 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.support.serializer; +import java.nio.ByteBuffer; import java.util.Map; import java.util.regex.Pattern; @@ -26,6 +27,8 @@ * A {@link Deserializer} that delegates to other deserializers based on the topic name. * * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.8 * */ @@ -75,4 +78,9 @@ public Object deserialize(String topic, Headers headers, byte[] data) { return findDelegate(topic).deserialize(topic, headers, data); } + @Override + public Object deserialize(String topic, Headers headers, ByteBuffer data) { + return findDelegate(topic).deserialize(topic, headers, data); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java index 8463f64975..5cc0fe63e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import org.springframework.core.log.LogAccessor; +import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -37,6 +38,8 @@ * @param the type. * * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.8 * */ @@ -110,14 +113,14 @@ protected void configure(Map configs, boolean isKey) { } this.forKeys = isKey; Object insensitive = configs.get(CASE_SENSITIVE); - if (insensitive instanceof String) { - this.cased = Boolean.parseBoolean((String) insensitive); + if (insensitive instanceof String insensitiveString) { + this.cased = Boolean.parseBoolean(insensitiveString); } - else if (insensitive instanceof Boolean) { - this.cased = (Boolean) insensitive; + else if (insensitive instanceof Boolean insensitiveBoolean) { + this.cased = insensitiveBoolean; } String configKey = defaultKey(); - if (configKey != null && configs.containsKey(configKey)) { + if (configs.containsKey(configKey)) { buildDefault(configs, configKey, isKey, configs.get(configKey)); } configKey = configKey(); @@ -128,8 +131,8 @@ else if (insensitive instanceof Boolean) { else if (value instanceof Map) { processMap(configs, isKey, configKey, (Map) value); } - else if (value instanceof String) { - this.delegates.putAll(createDelegates((String) value, configs, isKey)); + else if (value instanceof String mappings) { + this.delegates.putAll(createDelegates(mappings, configs, isKey)); } else { throw new IllegalStateException( @@ -137,6 +140,7 @@ else if (value instanceof String) { } } + @NonNull private String defaultKey() { return this.forKeys ? KEY_SERIALIZATION_TOPIC_DEFAULT : VALUE_SERIALIZATION_TOPIC_DEFAULT; } @@ -163,11 +167,11 @@ protected void build(Map configs, boolean isKey, String configKey, Ob this.delegates.put(pattern, (T) delegate); configureDelegate(configs, isKey, (T) delegate); } - else if (delegate instanceof Class) { - instantiateAndConfigure(configs, isKey, this.delegates, pattern, (Class) delegate); + else if (delegate instanceof Class clazz) { + instantiateAndConfigure(configs, isKey, this.delegates, pattern, clazz); } - else if (delegate instanceof String) { - createInstanceAndConfigure(configs, isKey, this.delegates, pattern, (String) delegate); + else if (delegate instanceof String className) { + createInstanceAndConfigure(configs, isKey, this.delegates, pattern, className); } else { throw new IllegalStateException(configKey @@ -181,11 +185,11 @@ protected void buildDefault(Map configs, String configKey, boolean is if (isInstance(delegate)) { this.defaultDelegate = configureDelegate(configs, isKey, (T) delegate); } - else if (delegate instanceof Class) { - this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, (Class) delegate); + else if (delegate instanceof Class clazz) { + this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, clazz); } - else if (delegate instanceof String) { - this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, (String) delegate); + else if (delegate instanceof String className) { + this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, className); } else { throw new IllegalStateException(configKey @@ -236,15 +240,15 @@ private T createInstanceAndConfigure(Map configs, boolean isKey, } private Pattern obtainPattern(Object key) { - if (key instanceof Pattern) { - return (Pattern) key; + if (key instanceof Pattern pattern) { + return pattern; } - else if (key instanceof String) { + else if (key instanceof String regex) { if (this.cased) { - return Pattern.compile(((String) key).trim()); + return Pattern.compile(regex.trim()); } else { - return Pattern.compile(((String) key).trim(), Pattern.CASE_INSENSITIVE); + return Pattern.compile(regex.trim(), Pattern.CASE_INSENSITIVE); } } else { @@ -287,7 +291,6 @@ public T removeDelegate(Pattern pattern) { * @param topic the topic. * @return the delegate. */ - @SuppressWarnings(UNCHECKED) protected T findDelegate(String topic) { T delegate = null; for (Entry entry : this.delegates.entrySet()) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java index 35dab45cac..5e2462d818 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.support.serializer; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -37,12 +38,14 @@ * {@link Serdes}. * * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.3 * */ public class DelegatingDeserializer implements Deserializer { - private final Map> delegates = new ConcurrentHashMap<>(); + private final Map> delegates = new ConcurrentHashMap<>(); private final Map autoConfigs = new HashMap<>(); @@ -81,15 +84,15 @@ public void configure(Map configs, boolean isKey) { } if (value instanceof Map) { ((Map) value).forEach((selector, deser) -> { - if (deser instanceof Deserializer) { - this.delegates.put(selector, (Deserializer) deser); - ((Deserializer) deser).configure(configs, isKey); + if (deser instanceof Deserializer clazz) { + this.delegates.put(selector, clazz); + clazz.configure(configs, isKey); } - else if (deser instanceof Class) { - instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class) deser); + else if (deser instanceof Class clazz) { + instantiateAndConfigure(configs, isKey, this.delegates, selector, clazz); } - else if (deser instanceof String) { - createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser); + else if (deser instanceof String className) { + createInstanceAndConfigure(configs, isKey, this.delegates, selector, className); } else { throw new IllegalStateException(configKey @@ -97,8 +100,8 @@ else if (deser instanceof String) { } }); } - else if (value instanceof String) { - this.delegates.putAll(createDelegates((String) value, configs, isKey)); + else if (value instanceof String mappings) { + this.delegates.putAll(createDelegates(mappings, configs, isKey)); } else { throw new IllegalStateException(configKey + " must be a map or String, not " + value.getClass()); @@ -165,6 +168,17 @@ public Object deserialize(String topic, byte[] data) { @Override public Object deserialize(String topic, Headers headers, byte[] data) { + Deserializer deserializer = getDeserializerByHeaders(headers); + return deserializer == null ? data : deserializer.deserialize(topic, headers, data); + } + + @Override + public Object deserialize(String topic, Headers headers, ByteBuffer data) { + Deserializer deserializer = getDeserializerByHeaders(headers); + return deserializer == null ? data : deserializer.deserialize(topic, headers, data); + } + + private Deserializer getDeserializerByHeaders(Headers headers) { byte[] value = null; String selectorKey = selectorKey(); Header header = headers.lastHeader(selectorKey); @@ -175,16 +189,11 @@ public Object deserialize(String topic, Headers headers, byte[] data) { throw new IllegalStateException("No '" + selectorKey + "' header present"); } String selector = new String(value).replaceAll("\"", ""); - Deserializer deserializer = this.delegates.get(selector); + Deserializer deserializer = this.delegates.get(selector); if (deserializer == null) { deserializer = trySerdes(selector); } - if (deserializer == null) { - return data; - } - else { - return deserializer.deserialize(topic, headers, data); - } + return deserializer; } private String selectorKey() { @@ -197,11 +206,11 @@ private String selectorKey() { * Package for testing. */ @Nullable - Deserializer trySerdes(String key) { + Deserializer trySerdes(String key) { try { Class clazz = ClassUtils.forName(key, ClassUtils.getDefaultClassLoader()); - Serde serdeFrom = Serdes.serdeFrom(clazz); - Deserializer deserializer = serdeFrom.deserializer(); + Serde serdeFrom = Serdes.serdeFrom(clazz); + Deserializer deserializer = serdeFrom.deserializer(); deserializer.configure(this.autoConfigs, this.forKeys); this.delegates.put(key, deserializer); return deserializer; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java index 41bc98c76e..aaa8f8de3c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ * @author Artem Bilan * @author Gary Russell * @author Elliot Kennedy + * @author Wang Zhiyang */ public class JsonSerializer implements Serializer { @@ -156,20 +157,19 @@ public synchronized void configure(Map configs, boolean isKey) { setUseTypeMapperForKey(isKey); if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) { Object config = configs.get(ADD_TYPE_INFO_HEADERS); - if (config instanceof Boolean) { - this.addTypeInfo = (Boolean) config; + if (config instanceof Boolean configBoolean) { + this.addTypeInfo = configBoolean; } - else if (config instanceof String) { - this.addTypeInfo = Boolean.valueOf((String) config); + else if (config instanceof String configString) { + this.addTypeInfo = Boolean.parseBoolean(configString); } else { throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String"); } } if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet - && this.typeMapper instanceof AbstractJavaTypeMapper) { - ((AbstractJavaTypeMapper) this.typeMapper) - .setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS))); + && this.typeMapper instanceof AbstractJavaTypeMapper abstractJavaTypeMapper) { + abstractJavaTypeMapper.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS))); } this.configured = true; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java index 83b02483df..98d3b789b6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ParseStringDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.support.serializer; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -24,6 +25,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Utils; import org.springframework.util.Assert; @@ -35,6 +37,8 @@ * * @author Alexei Klenin * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.5 */ public class ParseStringDeserializer implements Deserializer { @@ -105,6 +109,23 @@ public T deserialize(String topic, Headers headers, byte[] data) { return this.parser.apply(data == null ? null : new String(data, this.charset), headers); } + @Override + public T deserialize(String topic, Headers headers, ByteBuffer data) { + String value = deserialize(data); + return this.parser.apply(value, headers); + } + + private String deserialize(ByteBuffer data) { + if (data == null) { + return null; + } + + if (data.hasArray()) { + return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), this.charset); + } + return new String(Utils.toArray(data), this.charset); + } + /** * Set a charset to use when converting byte[] to {@link String}. Default UTF-8. * @param charset the charset. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java index 62f4257d76..5e2e7df137 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.support.serializer; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.common.header.Headers; @@ -31,6 +32,8 @@ * @param Type to be deserialized into. * * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.3 * */ @@ -54,16 +57,17 @@ public void configure(Map configs, boolean isKey) { @Override public T deserialize(String topic, byte[] data) { - return this.retryOperations.execute(context -> { - return this.delegate.deserialize(topic, data); - }); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data)); } @Override public T deserialize(String topic, Headers headers, byte[] data) { - return this.retryOperations.execute(context -> { - return this.delegate.deserialize(topic, headers, data); - }); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data)); + } + + @Override + public T deserialize(String topic, Headers headers, ByteBuffer data) { + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data)); } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerializationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerializationTests.java index 9a79f6cc48..f280d8e1ab 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerializationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerializationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -33,6 +34,8 @@ /** * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.8 * */ @@ -50,11 +53,8 @@ void testWithMapConfig() { configs.put(DelegatingByTopicSerializer.CASE_SENSITIVE, "false"); configs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_DEFAULT, ByteArraySerializer.class); serializer.configure(configs, false); - assertThat(serializer.findDelegate("foo")).isInstanceOf(BytesSerializer.class); - assertThat(serializer.findDelegate("bar")).isInstanceOf(IntegerSerializer.class); - assertThat(serializer.findDelegate("baz")).isInstanceOf(StringSerializer.class); + assertThatSerializer(serializer); assertThat(serializer.findDelegate("Foo")).isInstanceOf(BytesSerializer.class); - assertThat(serializer.findDelegate("qux")).isInstanceOf(ByteArraySerializer.class); DelegatingByTopicDeserializer deserializer = new DelegatingByTopicDeserializer(); Map deserializers = new HashMap<>(); deserializers.put("fo.*", new BytesDeserializer()); @@ -64,12 +64,11 @@ void testWithMapConfig() { configs.put(DelegatingByTopicDeserializer.CASE_SENSITIVE, false); configs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_DEFAULT, ByteArrayDeserializer.class); deserializer.configure(configs, false); - assertThat(deserializer.findDelegate("foo")).isInstanceOf(BytesDeserializer.class); - assertThat(deserializer.findDelegate("bar")).isInstanceOf(IntegerDeserializer.class); - assertThat(deserializer.findDelegate("baz")).isInstanceOf(StringDeserializer.class); - assertThat(deserializer.deserialize("baz", null, serializer.serialize("baz", null, "qux"))).isEqualTo("qux"); + assertThatDeserializer(deserializer); assertThat(deserializer.findDelegate("Foo")).isInstanceOf(BytesDeserializer.class); - assertThat(deserializer.findDelegate("qux")).isInstanceOf(ByteArrayDeserializer.class); + byte[] serialized = serializer.serialize("baz", null, "qux"); + assertThat(deserializer.deserialize("baz", null, serialized)).isEqualTo("qux"); + assertThat(deserializer.deserialize("baz", null, ByteBuffer.wrap(serialized))).isEqualTo("qux"); } @Test @@ -80,19 +79,13 @@ void testWithPropertyConfig() { + ", bar:" + IntegerSerializer.class.getName() + ", baz: " + StringSerializer.class.getName()); configs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_DEFAULT, ByteArraySerializer.class); serializer.configure(configs, false); - assertThat(serializer.findDelegate("foo")).isInstanceOf(BytesSerializer.class); - assertThat(serializer.findDelegate("bar")).isInstanceOf(IntegerSerializer.class); - assertThat(serializer.findDelegate("baz")).isInstanceOf(StringSerializer.class); - assertThat(serializer.findDelegate("qux")).isInstanceOf(ByteArraySerializer.class); + assertThatSerializer(serializer); DelegatingByTopicDeserializer deserializer = new DelegatingByTopicDeserializer(); configs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG, "fo*:" + BytesDeserializer.class.getName() + ", bar:" + IntegerDeserializer.class.getName() + ", baz: " + StringDeserializer.class.getName()); configs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_DEFAULT, ByteArrayDeserializer.class); deserializer.configure(configs, false); - assertThat(deserializer.findDelegate("foo")).isInstanceOf(BytesDeserializer.class); - assertThat(deserializer.findDelegate("bar")).isInstanceOf(IntegerDeserializer.class); - assertThat(deserializer.findDelegate("baz")).isInstanceOf(StringDeserializer.class); - assertThat(deserializer.findDelegate("qux")).isInstanceOf(ByteArrayDeserializer.class); + assertThatDeserializer(deserializer); } @Test @@ -106,10 +99,7 @@ void testWithMapConfigKeys() { configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_CONFIG, serializers); configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_DEFAULT, ByteArraySerializer.class); serializer.configure(configs, true); - assertThat(serializer.findDelegate("foo")).isInstanceOf(BytesSerializer.class); - assertThat(serializer.findDelegate("bar")).isInstanceOf(IntegerSerializer.class); - assertThat(serializer.findDelegate("baz")).isInstanceOf(StringSerializer.class); - assertThat(serializer.findDelegate("qux")).isInstanceOf(ByteArraySerializer.class); + assertThatSerializer(serializer); DelegatingByTopicDeserializer deserializer = new DelegatingByTopicDeserializer(); Map deserializers = new HashMap<>(); deserializers.put("fo.*", new BytesDeserializer()); @@ -118,11 +108,10 @@ void testWithMapConfigKeys() { configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_CONFIG, deserializers); configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_DEFAULT, ByteArrayDeserializer.class); deserializer.configure(configs, true); - assertThat(deserializer.findDelegate("foo")).isInstanceOf(BytesDeserializer.class); - assertThat(deserializer.findDelegate("bar")).isInstanceOf(IntegerDeserializer.class); - assertThat(deserializer.findDelegate("baz")).isInstanceOf(StringDeserializer.class); - assertThat(deserializer.deserialize("baz", null, serializer.serialize("baz", null, "qux"))).isEqualTo("qux"); - assertThat(deserializer.findDelegate("qux")).isInstanceOf(ByteArrayDeserializer.class); + assertThatDeserializer(deserializer); + byte[] serialized = serializer.serialize("baz", null, "qux"); + assertThat(deserializer.deserialize("baz", null, serialized)).isEqualTo("qux"); + assertThat(deserializer.deserialize("baz", null, ByteBuffer.wrap(serialized))).isEqualTo("qux"); } @Test @@ -133,19 +122,27 @@ void testWithPropertyConfigKeys() { + ", bar:" + IntegerSerializer.class.getName() + ", baz: " + StringSerializer.class.getName()); configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_DEFAULT, ByteArraySerializer.class); serializer.configure(configs, true); - assertThat(serializer.findDelegate("foo")).isInstanceOf(BytesSerializer.class); - assertThat(serializer.findDelegate("bar")).isInstanceOf(IntegerSerializer.class); - assertThat(serializer.findDelegate("baz")).isInstanceOf(StringSerializer.class); - assertThat(serializer.findDelegate("qux")).isInstanceOf(ByteArraySerializer.class); + assertThatSerializer(serializer); DelegatingByTopicDeserializer deserializer = new DelegatingByTopicDeserializer(); configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_CONFIG, "fo*:" + BytesDeserializer.class.getName() + ", bar:" + IntegerDeserializer.class.getName() + ", baz: " + StringDeserializer.class.getName()); configs.put(DelegatingByTopicSerializer.KEY_SERIALIZATION_TOPIC_DEFAULT, ByteArrayDeserializer.class); deserializer.configure(configs, true); + assertThatDeserializer(deserializer); + } + + private void assertThatSerializer(DelegatingByTopicSerializer serializer) { + assertThat(serializer.findDelegate("foo")).isInstanceOf(BytesSerializer.class); + assertThat(serializer.findDelegate("bar")).isInstanceOf(IntegerSerializer.class); + assertThat(serializer.findDelegate("baz")).isInstanceOf(StringSerializer.class); + assertThat(serializer.findDelegate("defaultTopic")).isInstanceOf(ByteArraySerializer.class); + } + + private void assertThatDeserializer(DelegatingByTopicDeserializer deserializer) { assertThat(deserializer.findDelegate("foo")).isInstanceOf(BytesDeserializer.class); assertThat(deserializer.findDelegate("bar")).isInstanceOf(IntegerDeserializer.class); assertThat(deserializer.findDelegate("baz")).isInstanceOf(StringDeserializer.class); - assertThat(deserializer.findDelegate("qux")).isInstanceOf(ByteArrayDeserializer.class); + assertThat(deserializer.findDelegate("defaultTopic")).isInstanceOf(ByteArrayDeserializer.class); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingSerializationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingSerializationTests.java index 6f47ba4805..9cd6a60ea2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingSerializationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/DelegatingSerializationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -140,13 +141,12 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese headers.remove(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR); DelegatingSerializer spySe = spy(serializer); serialized = spySe.serialize("foo", headers, 42L); - serialized = spySe.serialize("foo", headers, 42L); verify(spySe, times(1)).trySerdes(42L); assertThat(headers.lastHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR).value()) .isEqualTo(Long.class.getName().getBytes()); DelegatingDeserializer spyDe = spy(deserializer); assertThat(spyDe.deserialize("foo", headers, serialized)).isEqualTo(42L); - spyDe.deserialize("foo", headers, serialized); + assertThat(spyDe.deserialize("foo", headers, ByteBuffer.wrap(serialized))).isEqualTo(42L); verify(spyDe, times(1)).trySerdes(Long.class.getName()); // The DKHM will jsonize the value; test that we ignore the quotes @@ -158,6 +158,7 @@ private void doTest(DelegatingSerializer serializer, DelegatingDeserializer dese serialized = serializer.serialize("foo", headers, "bar"); assertThat(serialized).isEqualTo(new byte[]{ 'b', 'a', 'r' }); assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar"); + assertThat(deserializer.deserialize("foo", headers, ByteBuffer.wrap(serialized))).isEqualTo("bar"); } private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer deserializer) { @@ -179,13 +180,12 @@ private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer headers.remove(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR); DelegatingSerializer spySe = spy(serializer); serialized = spySe.serialize("foo", headers, 42L); - serialized = spySe.serialize("foo", headers, 42L); verify(spySe, times(1)).trySerdes(42L); assertThat(headers.lastHeader(DelegatingSerializer.KEY_SERIALIZATION_SELECTOR).value()) .isEqualTo(Long.class.getName().getBytes()); DelegatingDeserializer spyDe = spy(deserializer); assertThat(spyDe.deserialize("foo", headers, serialized)).isEqualTo(42L); - spyDe.deserialize("foo", headers, serialized); + assertThat(spyDe.deserialize("foo", headers, ByteBuffer.wrap(serialized))).isEqualTo(42L); verify(spyDe, times(1)).trySerdes(Long.class.getName()); // The DKHM will jsonize the value; test that we ignore the quotes @@ -197,6 +197,7 @@ private void doTestKeys(DelegatingSerializer serializer, DelegatingDeserializer serialized = serializer.serialize("foo", headers, "bar"); assertThat(serialized).isEqualTo(new byte[]{ 'b', 'a', 'r' }); assertThat(deserializer.deserialize("foo", headers, serialized)).isEqualTo("bar"); + assertThat(deserializer.deserialize("foo", headers, ByteBuffer.wrap(serialized))).isEqualTo("bar"); } @Test @@ -206,7 +207,7 @@ void testBadIncomingOnlyOnce() { headers.add(new RecordHeader(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR, "junk".getBytes())); byte[] data = "foo".getBytes(); assertThat(spy.deserialize("foo", headers, data)).isSameAs(data); - spy.deserialize("foo", headers, data); + assertThat(spy.deserialize("foo", headers, ByteBuffer.wrap(data))).isEqualTo(data); verify(spy, times(1)).trySerdes("junk"); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java index 17a106b267..837471711f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,17 +18,22 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; import org.springframework.retry.support.RetryTemplate; /** * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.3 * */ @@ -43,6 +48,9 @@ void testRetry() { delegate.n = 0; assertThat(rdes.deserialize("foo", new RecordHeaders(), "bar".getBytes())).isEqualTo("bar"); assertThat(delegate.n).isEqualTo(3); + delegate.n = 0; + ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("byteBuffer"); + assertThat(rdes.deserialize("foo", new RecordHeaders(), byteBuffer)).isEqualTo("byteBuffer"); rdes.close(); } @@ -70,6 +78,14 @@ public String deserialize(String topic, Headers headers, byte[] data) { return new String(data); } + @Override + public String deserialize(String topic, Headers headers, ByteBuffer data) { + if (n++ < 1) { + throw new RuntimeException(); + } + return new String(Utils.toArray(data)); + } + @Override public void close() { // empty diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java index c81610b9aa..f19a9150fb 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/ToStringSerializationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; @@ -35,6 +36,7 @@ * * @author Alexei Klenin * @author Gary Russell + * * @since 2.5 */ public class ToStringSerializationTests { @@ -167,6 +169,18 @@ public void testSimpleDeserialization() { .hasFieldOrPropertyWithValue("first", "toto") .hasFieldOrPropertyWithValue("second", 123) .hasFieldOrPropertyWithValue("third", true); + + /* Given */ + ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("foo:456:true"); + + /* When */ + Object entityFromByteBuffer = deserializer.deserialize("my-topic", null, byteBuffer); + + /* Then */ + assertThat(entityFromByteBuffer) + .hasFieldOrPropertyWithValue("first", "foo") + .hasFieldOrPropertyWithValue("second", 456) + .hasFieldOrPropertyWithValue("third", true); } @Test @@ -188,6 +202,18 @@ public void testSimpleDeserializationViaConfig() { .hasFieldOrPropertyWithValue("first", "toto") .hasFieldOrPropertyWithValue("second", 123) .hasFieldOrPropertyWithValue("third", true); + + /* Given */ + ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("foo:456:true"); + + /* When */ + Object entityFromByteBuffer = deserializer.deserialize("my-topic", null, byteBuffer); + + /* Then */ + assertThat(entityFromByteBuffer) + .hasFieldOrPropertyWithValue("first", "foo") + .hasFieldOrPropertyWithValue("second", 456) + .hasFieldOrPropertyWithValue("third", true); } @Test @@ -211,13 +237,28 @@ public void testSerialization_usingHeaders() { .isInstanceOf(DummyEntity.class) .hasFieldOrPropertyWithValue("stringValue", "toto") .hasFieldOrPropertyWithValue("intValue", 123); + + /* Given */ + ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("foo:456:true"); + + /* When */ + Object entityFromByteBuffer = deserializer.deserialize("my-topic", headers, byteBuffer); + + /* Then */ + assertThat(entityFromByteBuffer) + .isNotNull() + .isInstanceOf(DummyEntity.class) + .hasFieldOrPropertyWithValue("stringValue", "foo") + .hasFieldOrPropertyWithValue("intValue", 456); } @Test + @DisplayName("Test deserialization using headers when data is null") void nullValue() { ParseStringDeserializer deserializer = new ParseStringDeserializer<>(ToStringSerializationTests::parseWithHeaders); assertThat(deserializer.deserialize("foo", new RecordHeaders(), (byte[]) null)).isNull(); + assertThat(deserializer.deserialize("foo", new RecordHeaders(), (ByteBuffer) null)).isNull(); } @Test