From a57c319f78438ace4b0be3e89ed3398638e47abd Mon Sep 17 00:00:00 2001 From: Nathan Xu Date: Fri, 15 Dec 2023 12:20:00 -0500 Subject: [PATCH] GH-2940: Improvements in DefaultKafkaHeaderMapper Fixes: #2940 Minor improvements and code cleanup in DefaultKafkaHeaderMapper. --- .../support/DefaultKafkaHeaderMapper.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index dc137b8526..7cb5b297ee 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -31,13 +31,13 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; -import org.springframework.lang.Nullable; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.MimeType; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -63,26 +63,23 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper { private static final String JAVA_LANG_STRING = "java.lang.String"; - private static final Set TRUSTED_ARRAY_TYPES = - new HashSet<>(Arrays.asList( + private static final Set TRUSTED_ARRAY_TYPES = Set.of( "[B", "[I", "[J", "[F", "[D", "[C" - )); + ); - private static final List DEFAULT_TRUSTED_PACKAGES = - Arrays.asList( + private static final List DEFAULT_TRUSTED_PACKAGES = List.of( "java.lang", "java.net", "java.util", "org.springframework.util" ); - private static final List DEFAULT_TO_STRING_CLASSES = - Arrays.asList( + private static final List DEFAULT_TO_STRING_CLASSES = List.of( "org.springframework.util.MimeType", "org.springframework.http.MediaType" ); @@ -142,7 +139,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ public DefaultKafkaHeaderMapper(String... patterns) { - this(new ObjectMapper(), patterns); + this(JacksonUtils.enhancedObjectMapper(), patterns); } /** @@ -222,7 +219,7 @@ protected boolean isEncodeStrings() { } /** - * Set to true to encode String-valued headers as JSON ("..."), by default just the + * Set to true to encode String-valued headers as JSON string ("..."), by default just the * raw String value is converted to a byte array using the configured charset. Set to * true if a consumer of the outbound record is using Spring for Apache Kafka version * less than 2.3 @@ -234,8 +231,15 @@ public void setEncodeStrings(boolean encodeStrings) { } /** - * Add packages to the trusted packages list (default {@code java.util, java.lang}) used + * Add packages to the trusted packages list used * when constructing objects from JSON. + * By default, the following packages are trusted: + *
    + *
  • java.lang
  • + *
  • java.net
  • + *
  • java.util
  • + *
  • org.springframework.util
  • + *
* If any of the supplied packages is {@code "*"}, all packages are trusted. * If a class for a non-trusted package is encountered, the header is returned to the * application with value of type {@link NonTrustedHeaderType}. @@ -286,7 +290,6 @@ public void fromHeaders(MessageHeaders headers, Headers target) { } if (!encodeToJson && valueToAdd instanceof String) { target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset()))); - className = JAVA_LANG_STRING; } else { target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd))); @@ -294,12 +297,12 @@ public void fromHeaders(MessageHeaders headers, Headers target) { jsonHeaders.put(key, className); } catch (Exception e) { - logger.debug(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); + logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); } } } }); - if (jsonHeaders.size() > 0) { + if (!jsonHeaders.isEmpty()) { try { target.add(new RecordHeader(JSON_TYPES, headerObjectMapper.writeValueAsBytes(jsonHeaders))); } @@ -321,7 +324,7 @@ else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(head headers.put(headerName, new String(header.value(), getCharset())); } else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { - if (jsonTypes != null && jsonTypes.containsKey(headerName)) { + if (jsonTypes.containsKey(headerName)) { String requestedType = jsonTypes.get(headerName); populateJsonValueHeader(header, requestedType, headers); } @@ -355,8 +358,7 @@ private void populateJsonValueHeader(Header header, String requestedType, Map - "Could not decode json type: " + new String(header.value()) + " for key: " - + header.key()); + "Could not decode json type: " + requestedType + " for key: " + header.key()); headers.put(header.key(), header.value()); } } @@ -385,18 +387,16 @@ private Object decodeValue(Header h, Class type) throws IOException, LinkageE return value; } - @SuppressWarnings("unchecked") - @Nullable private Map decodeJsonTypes(Headers source) { - Map types = null; + Map types = Collections.emptyMap(); Header jsonTypes = source.lastHeader(JSON_TYPES); if (jsonTypes != null) { ObjectMapper headerObjectMapper = getObjectMapper(); try { - types = headerObjectMapper.readValue(jsonTypes.value(), Map.class); + types = headerObjectMapper.readValue(jsonTypes.value(), new TypeReference<>() { }); } catch (IOException e) { - logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value())); + logger.error(e, () -> "Could not decode json types: " + new String(jsonTypes.value(), StandardCharsets.UTF_8)); } } return types;