Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation Deserializer.deserialize(String, Headers, ByteBuffer) #3009

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.serializer;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.regex.Pattern;

Expand All @@ -26,6 +27,8 @@
* A {@link Deserializer} that delegates to other deserializers based on the topic name.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.8
*
*/
Expand Down Expand Up @@ -75,4 +78,9 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
return findDelegate(topic).deserialize(topic, headers, data);
}

@Override
artembilan marked this conversation as resolved.
Show resolved Hide resolved
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
return findDelegate(topic).deserialize(topic, headers, data);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.regex.Pattern;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand All @@ -37,6 +38,8 @@
* @param <T> the type.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.8
*
*/
Expand Down Expand Up @@ -110,14 +113,14 @@ protected void configure(Map<String, ?> configs, boolean isKey) {
}
this.forKeys = isKey;
Object insensitive = configs.get(CASE_SENSITIVE);
if (insensitive instanceof String) {
this.cased = Boolean.parseBoolean((String) insensitive);
if (insensitive instanceof String insensitiveString) {
this.cased = Boolean.parseBoolean(insensitiveString);
}
else if (insensitive instanceof Boolean) {
this.cased = (Boolean) insensitive;
else if (insensitive instanceof Boolean insensitiveBoolean) {
this.cased = insensitiveBoolean;
}
String configKey = defaultKey();
if (configKey != null && configs.containsKey(configKey)) {
if (configs.containsKey(configKey)) {
buildDefault(configs, configKey, isKey, configs.get(configKey));
}
configKey = configKey();
Expand All @@ -128,15 +131,16 @@ else if (insensitive instanceof Boolean) {
else if (value instanceof Map) {
processMap(configs, isKey, configKey, (Map<Object, Object>) value);
}
else if (value instanceof String) {
this.delegates.putAll(createDelegates((String) value, configs, isKey));
else if (value instanceof String mappings) {
this.delegates.putAll(createDelegates(mappings, configs, isKey));
}
else {
throw new IllegalStateException(
configKey + " must be a map or String, not " + value.getClass());
}
}

@NonNull
private String defaultKey() {
return this.forKeys ? KEY_SERIALIZATION_TOPIC_DEFAULT : VALUE_SERIALIZATION_TOPIC_DEFAULT;
}
Expand All @@ -163,11 +167,11 @@ protected void build(Map<String, ?> configs, boolean isKey, String configKey, Ob
this.delegates.put(pattern, (T) delegate);
configureDelegate(configs, isKey, (T) delegate);
}
else if (delegate instanceof Class) {
instantiateAndConfigure(configs, isKey, this.delegates, pattern, (Class<?>) delegate);
else if (delegate instanceof Class<?> clazz) {
instantiateAndConfigure(configs, isKey, this.delegates, pattern, clazz);
}
else if (delegate instanceof String) {
createInstanceAndConfigure(configs, isKey, this.delegates, pattern, (String) delegate);
else if (delegate instanceof String className) {
createInstanceAndConfigure(configs, isKey, this.delegates, pattern, className);
}
else {
throw new IllegalStateException(configKey
Expand All @@ -181,11 +185,11 @@ protected void buildDefault(Map<String, ?> configs, String configKey, boolean is
if (isInstance(delegate)) {
this.defaultDelegate = configureDelegate(configs, isKey, (T) delegate);
}
else if (delegate instanceof Class) {
this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, (Class<?>) delegate);
else if (delegate instanceof Class<?> clazz) {
this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, clazz);
}
else if (delegate instanceof String) {
this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, (String) delegate);
else if (delegate instanceof String className) {
this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, className);
}
else {
throw new IllegalStateException(configKey
Expand Down Expand Up @@ -236,15 +240,15 @@ private T createInstanceAndConfigure(Map<String, ?> configs, boolean isKey,
}

private Pattern obtainPattern(Object key) {
if (key instanceof Pattern) {
return (Pattern) key;
if (key instanceof Pattern pattern) {
return pattern;
}
else if (key instanceof String) {
else if (key instanceof String regex) {
if (this.cased) {
return Pattern.compile(((String) key).trim());
return Pattern.compile(regex.trim());
}
else {
return Pattern.compile(((String) key).trim(), Pattern.CASE_INSENSITIVE);
return Pattern.compile(regex.trim(), Pattern.CASE_INSENSITIVE);
}
}
else {
Expand Down Expand Up @@ -287,7 +291,6 @@ public T removeDelegate(Pattern pattern) {
* @param topic the topic.
* @return the delegate.
*/
@SuppressWarnings(UNCHECKED)
protected T findDelegate(String topic) {
T delegate = null;
for (Entry<Pattern, T> entry : this.delegates.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.serializer;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,12 +38,14 @@
* {@link Serdes}.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.3
*
*/
public class DelegatingDeserializer implements Deserializer<Object> {

private final Map<String, Deserializer<? extends Object>> delegates = new ConcurrentHashMap<>();
private final Map<String, Deserializer<?>> delegates = new ConcurrentHashMap<>();

private final Map<String, Object> autoConfigs = new HashMap<>();

Expand Down Expand Up @@ -81,24 +84,24 @@ public void configure(Map<String, ?> configs, boolean isKey) {
}
if (value instanceof Map) {
((Map<String, Object>) value).forEach((selector, deser) -> {
if (deser instanceof Deserializer) {
this.delegates.put(selector, (Deserializer<?>) deser);
((Deserializer<?>) deser).configure(configs, isKey);
if (deser instanceof Deserializer<?> clazz) {
this.delegates.put(selector, clazz);
clazz.configure(configs, isKey);
}
else if (deser instanceof Class) {
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) deser);
else if (deser instanceof Class<?> clazz) {
instantiateAndConfigure(configs, isKey, this.delegates, selector, clazz);
}
else if (deser instanceof String) {
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser);
else if (deser instanceof String className) {
createInstanceAndConfigure(configs, isKey, this.delegates, selector, className);
}
else {
throw new IllegalStateException(configKey
+ " map entries must be Serializers or class names, not " + value.getClass());
}
});
}
else if (value instanceof String) {
this.delegates.putAll(createDelegates((String) value, configs, isKey));
else if (value instanceof String mappings) {
this.delegates.putAll(createDelegates(mappings, configs, isKey));
}
else {
throw new IllegalStateException(configKey + " must be a map or String, not " + value.getClass());
Expand Down Expand Up @@ -165,6 +168,17 @@ public Object deserialize(String topic, byte[] data) {

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
Deserializer<?> deserializer = getDeserializerByHeaders(headers);
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
}

@Override
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
Deserializer<?> deserializer = getDeserializerByHeaders(headers);
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
}

private Deserializer<?> getDeserializerByHeaders(Headers headers) {
byte[] value = null;
String selectorKey = selectorKey();
Header header = headers.lastHeader(selectorKey);
Expand All @@ -175,16 +189,11 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
throw new IllegalStateException("No '" + selectorKey + "' header present");
}
String selector = new String(value).replaceAll("\"", "");
Deserializer<? extends Object> deserializer = this.delegates.get(selector);
Deserializer<?> deserializer = this.delegates.get(selector);
if (deserializer == null) {
deserializer = trySerdes(selector);
}
if (deserializer == null) {
return data;
}
else {
return deserializer.deserialize(topic, headers, data);
}
return deserializer;
}

private String selectorKey() {
Expand All @@ -197,11 +206,11 @@ private String selectorKey() {
* Package for testing.
*/
@Nullable
Deserializer<? extends Object> trySerdes(String key) {
Deserializer<?> trySerdes(String key) {
try {
Class<?> clazz = ClassUtils.forName(key, ClassUtils.getDefaultClassLoader());
Serde<? extends Object> serdeFrom = Serdes.serdeFrom(clazz);
Deserializer<? extends Object> deserializer = serdeFrom.deserializer();
Serde<?> serdeFrom = Serdes.serdeFrom(clazz);
Deserializer<?> deserializer = serdeFrom.deserializer();
deserializer.configure(this.autoConfigs, this.forKeys);
this.delegates.put(key, deserializer);
return deserializer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Elliot Kennedy
* @author Wang Zhiyang
*/
public class JsonSerializer<T> implements Serializer<T> {

Expand Down Expand Up @@ -156,20 +157,19 @@ public synchronized void configure(Map<String, ?> configs, boolean isKey) {
setUseTypeMapperForKey(isKey);
if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) {
Object config = configs.get(ADD_TYPE_INFO_HEADERS);
if (config instanceof Boolean) {
this.addTypeInfo = (Boolean) config;
if (config instanceof Boolean configBoolean) {
this.addTypeInfo = configBoolean;
}
else if (config instanceof String) {
this.addTypeInfo = Boolean.valueOf((String) config);
else if (config instanceof String configString) {
this.addTypeInfo = Boolean.parseBoolean(configString);
}
else {
throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String");
}
}
if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
&& this.typeMapper instanceof AbstractJavaTypeMapper) {
((AbstractJavaTypeMapper) this.typeMapper)
.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
&& this.typeMapper instanceof AbstractJavaTypeMapper abstractJavaTypeMapper) {
abstractJavaTypeMapper.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
}
this.configured = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.serializer;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
Expand All @@ -24,6 +25,7 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

import org.springframework.util.Assert;

Expand All @@ -35,6 +37,8 @@
*
* @author Alexei Klenin
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.5
*/
public class ParseStringDeserializer<T> implements Deserializer<T> {
Expand Down Expand Up @@ -105,6 +109,23 @@ public T deserialize(String topic, Headers headers, byte[] data) {
return this.parser.apply(data == null ? null : new String(data, this.charset), headers);
}

@Override
public T deserialize(String topic, Headers headers, ByteBuffer data) {
String value = deserialize(data);
return this.parser.apply(value, headers);
}

private String deserialize(ByteBuffer data) {
if (data == null) {
return null;
}

if (data.hasArray()) {
return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), this.charset);
}
return new String(Utils.toArray(data), this.charset);
}

/**
* Set a charset to use when converting byte[] to {@link String}. Default UTF-8.
* @param charset the charset.
Expand Down
Loading
Loading