Skip to content

Commit

Permalink
Implementation `Deserializer.deserialize(String topic, Headers header…
Browse files Browse the repository at this point in the history
…s, ByteBuffer data)`

adapt apache kafka feature, reduce `CompletedFetch#parseRecord()` memory copy, details see https://issues.apache.org/jira/browse/KAFKA-14944
  • Loading branch information
Wzy19930507 committed Jan 31, 2024
1 parent 69f201d commit 23bac22
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.serializer;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.regex.Pattern;

Expand All @@ -26,6 +27,8 @@
* A {@link Deserializer} that delegates to other deserializers based on the topic name.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.8
*
*/
Expand Down Expand Up @@ -75,4 +78,9 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
return findDelegate(topic).deserialize(topic, headers, data);
}

@Override
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
return findDelegate(topic).deserialize(topic, headers, data);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.regex.Pattern;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand All @@ -37,6 +38,8 @@
* @param <T> the type.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.8
*
*/
Expand Down Expand Up @@ -110,14 +113,14 @@ protected void configure(Map<String, ?> configs, boolean isKey) {
}
this.forKeys = isKey;
Object insensitive = configs.get(CASE_SENSITIVE);
if (insensitive instanceof String) {
this.cased = Boolean.parseBoolean((String) insensitive);
if (insensitive instanceof String insensitiveString) {
this.cased = Boolean.parseBoolean(insensitiveString);
}
else if (insensitive instanceof Boolean) {
this.cased = (Boolean) insensitive;
else if (insensitive instanceof Boolean insensitiveBoolean) {
this.cased = insensitiveBoolean;
}
String configKey = defaultKey();
if (configKey != null && configs.containsKey(configKey)) {
if (configs.containsKey(configKey)) {
buildDefault(configs, configKey, isKey, configs.get(configKey));
}
configKey = configKey();
Expand All @@ -128,15 +131,16 @@ else if (insensitive instanceof Boolean) {
else if (value instanceof Map) {
processMap(configs, isKey, configKey, (Map<Object, Object>) value);
}
else if (value instanceof String) {
this.delegates.putAll(createDelegates((String) value, configs, isKey));
else if (value instanceof String mappings) {
this.delegates.putAll(createDelegates(mappings, configs, isKey));
}
else {
throw new IllegalStateException(
configKey + " must be a map or String, not " + value.getClass());
}
}

@NonNull
private String defaultKey() {
return this.forKeys ? KEY_SERIALIZATION_TOPIC_DEFAULT : VALUE_SERIALIZATION_TOPIC_DEFAULT;
}
Expand All @@ -163,11 +167,11 @@ protected void build(Map<String, ?> configs, boolean isKey, String configKey, Ob
this.delegates.put(pattern, (T) delegate);
configureDelegate(configs, isKey, (T) delegate);
}
else if (delegate instanceof Class) {
instantiateAndConfigure(configs, isKey, this.delegates, pattern, (Class<?>) delegate);
else if (delegate instanceof Class<?> clazz) {
instantiateAndConfigure(configs, isKey, this.delegates, pattern, clazz);
}
else if (delegate instanceof String) {
createInstanceAndConfigure(configs, isKey, this.delegates, pattern, (String) delegate);
else if (delegate instanceof String className) {
createInstanceAndConfigure(configs, isKey, this.delegates, pattern, className);
}
else {
throw new IllegalStateException(configKey
Expand All @@ -181,11 +185,11 @@ protected void buildDefault(Map<String, ?> configs, String configKey, boolean is
if (isInstance(delegate)) {
this.defaultDelegate = configureDelegate(configs, isKey, (T) delegate);
}
else if (delegate instanceof Class) {
this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, (Class<?>) delegate);
else if (delegate instanceof Class<?> clazz) {
this.defaultDelegate = instantiateAndConfigure(configs, isKey, this.delegates, null, clazz);
}
else if (delegate instanceof String) {
this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, (String) delegate);
else if (delegate instanceof String className) {
this.defaultDelegate = createInstanceAndConfigure(configs, isKey, this.delegates, null, className);
}
else {
throw new IllegalStateException(configKey
Expand Down Expand Up @@ -236,15 +240,15 @@ private T createInstanceAndConfigure(Map<String, ?> configs, boolean isKey,
}

private Pattern obtainPattern(Object key) {
if (key instanceof Pattern) {
return (Pattern) key;
if (key instanceof Pattern pattern) {
return pattern;
}
else if (key instanceof String) {
else if (key instanceof String regex) {
if (this.cased) {
return Pattern.compile(((String) key).trim());
return Pattern.compile(regex.trim());
}
else {
return Pattern.compile(((String) key).trim(), Pattern.CASE_INSENSITIVE);
return Pattern.compile(regex.trim(), Pattern.CASE_INSENSITIVE);
}
}
else {
Expand Down Expand Up @@ -287,7 +291,6 @@ public T removeDelegate(Pattern pattern) {
* @param topic the topic.
* @return the delegate.
*/
@SuppressWarnings(UNCHECKED)
protected T findDelegate(String topic) {
T delegate = null;
for (Entry<Pattern, T> entry : this.delegates.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.serializer;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,12 +38,14 @@
* {@link Serdes}.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.3
*
*/
public class DelegatingDeserializer implements Deserializer<Object> {

private final Map<String, Deserializer<? extends Object>> delegates = new ConcurrentHashMap<>();
private final Map<String, Deserializer<?>> delegates = new ConcurrentHashMap<>();

private final Map<String, Object> autoConfigs = new HashMap<>();

Expand Down Expand Up @@ -81,24 +84,24 @@ public void configure(Map<String, ?> configs, boolean isKey) {
}
if (value instanceof Map) {
((Map<String, Object>) value).forEach((selector, deser) -> {
if (deser instanceof Deserializer) {
this.delegates.put(selector, (Deserializer<?>) deser);
((Deserializer<?>) deser).configure(configs, isKey);
if (deser instanceof Deserializer<?> clazz) {
this.delegates.put(selector, clazz);
clazz.configure(configs, isKey);
}
else if (deser instanceof Class) {
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) deser);
else if (deser instanceof Class<?> clazz) {
instantiateAndConfigure(configs, isKey, this.delegates, selector, clazz);
}
else if (deser instanceof String) {
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser);
else if (deser instanceof String className) {
createInstanceAndConfigure(configs, isKey, this.delegates, selector, className);
}
else {
throw new IllegalStateException(configKey
+ " map entries must be Serializers or class names, not " + value.getClass());
}
});
}
else if (value instanceof String) {
this.delegates.putAll(createDelegates((String) value, configs, isKey));
else if (value instanceof String mappings) {
this.delegates.putAll(createDelegates(mappings, configs, isKey));
}
else {
throw new IllegalStateException(configKey + " must be a map or String, not " + value.getClass());
Expand Down Expand Up @@ -165,6 +168,17 @@ public Object deserialize(String topic, byte[] data) {

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
Deserializer<?> deserializer = getDeserializerByHeaders(headers);
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
}

@Override
public Object deserialize(String topic, Headers headers, ByteBuffer data) {
Deserializer<?> deserializer = getDeserializerByHeaders(headers);
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
}

private Deserializer<?> getDeserializerByHeaders(Headers headers) {
byte[] value = null;
String selectorKey = selectorKey();
Header header = headers.lastHeader(selectorKey);
Expand All @@ -175,16 +189,11 @@ public Object deserialize(String topic, Headers headers, byte[] data) {
throw new IllegalStateException("No '" + selectorKey + "' header present");
}
String selector = new String(value).replaceAll("\"", "");
Deserializer<? extends Object> deserializer = this.delegates.get(selector);
Deserializer<?> deserializer = this.delegates.get(selector);
if (deserializer == null) {
deserializer = trySerdes(selector);
}
if (deserializer == null) {
return data;
}
else {
return deserializer.deserialize(topic, headers, data);
}
return deserializer;
}

private String selectorKey() {
Expand All @@ -197,11 +206,11 @@ private String selectorKey() {
* Package for testing.
*/
@Nullable
Deserializer<? extends Object> trySerdes(String key) {
Deserializer<?> trySerdes(String key) {
try {
Class<?> clazz = ClassUtils.forName(key, ClassUtils.getDefaultClassLoader());
Serde<? extends Object> serdeFrom = Serdes.serdeFrom(clazz);
Deserializer<? extends Object> deserializer = serdeFrom.deserializer();
Serde<?> serdeFrom = Serdes.serdeFrom(clazz);
Deserializer<?> deserializer = serdeFrom.deserializer();
deserializer.configure(this.autoConfigs, this.forKeys);
this.delegates.put(key, deserializer);
return deserializer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Elliot Kennedy
* @author Wang Zhiyang
*/
public class JsonSerializer<T> implements Serializer<T> {

Expand Down Expand Up @@ -156,20 +157,19 @@ public synchronized void configure(Map<String, ?> configs, boolean isKey) {
setUseTypeMapperForKey(isKey);
if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) {
Object config = configs.get(ADD_TYPE_INFO_HEADERS);
if (config instanceof Boolean) {
this.addTypeInfo = (Boolean) config;
if (config instanceof Boolean configBoolean) {
this.addTypeInfo = configBoolean;
}
else if (config instanceof String) {
this.addTypeInfo = Boolean.valueOf((String) config);
else if (config instanceof String configString) {
this.addTypeInfo = Boolean.parseBoolean(configString);
}
else {
throw new IllegalStateException(ADD_TYPE_INFO_HEADERS + " must be Boolean or String");
}
}
if (configs.containsKey(TYPE_MAPPINGS) && !this.typeMapperExplicitlySet
&& this.typeMapper instanceof AbstractJavaTypeMapper) {
((AbstractJavaTypeMapper) this.typeMapper)
.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
&& this.typeMapper instanceof AbstractJavaTypeMapper abstractJavaTypeMapper) {
abstractJavaTypeMapper.setIdClassMapping(createMappings((String) configs.get(TYPE_MAPPINGS)));
}
this.configured = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.serializer;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
Expand All @@ -24,6 +25,7 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;

import org.springframework.util.Assert;

Expand All @@ -35,6 +37,8 @@
*
* @author Alexei Klenin
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.5
*/
public class ParseStringDeserializer<T> implements Deserializer<T> {
Expand Down Expand Up @@ -105,6 +109,23 @@ public T deserialize(String topic, Headers headers, byte[] data) {
return this.parser.apply(data == null ? null : new String(data, this.charset), headers);
}

@Override
public T deserialize(String topic, Headers headers, ByteBuffer data) {
String value = deserialize(data);
return this.parser.apply(value, headers);
}

private String deserialize(ByteBuffer data) {
if (data == null) {
return null;
}

if (data.hasArray()) {
return new String(data.array(), data.position() + data.arrayOffset(), data.remaining(), this.charset);
}
return new String(Utils.toArray(data), this.charset);
}

/**
* Set a charset to use when converting byte[] to {@link String}. Default UTF-8.
* @param charset the charset.
Expand Down
Loading

0 comments on commit 23bac22

Please sign in to comment.