From 0f20c3b4f53d118961cf5209beb318062de88c9a Mon Sep 17 00:00:00 2001 From: Grzegorz Poznachowski Date: Tue, 5 Mar 2024 16:50:30 +0100 Subject: [PATCH] GH-3067: Draft of mapping multiple headers with same key with SimpleKafkaHeaderMapper --- .../support/DefaultKafkaHeaderMapper.java | 154 ++++++++++++------ .../support/SimpleKafkaHeaderMapper.java | 54 ++++-- .../DefaultKafkaHeaderMapperTests.java | 28 +++- .../support/SimpleKafkaHeaderMapperTests.java | 35 +++- 4 files changed, 197 insertions(+), 74 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 586af3f0f4..b5bc9eedba 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -19,17 +19,21 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; +import org.assertj.core.util.Streams; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; @@ -48,12 +52,14 @@ * * @author Gary Russell * @author Artem Bilan + * @author Grzegorz Poznachowski * * @since 1.3 - * */ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper { + private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s"; + private static final String JAVA_LANG_STRING = "java.lang.String"; private static final Set TRUSTED_ARRAY_TYPES = Set.of( @@ -96,6 +102,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper { * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in * {@link KafkaHeaders} are never mapped as headers since they represent data in * consumer/producer records. + * * @see #DefaultKafkaHeaderMapper(ObjectMapper) */ public DefaultKafkaHeaderMapper() { @@ -110,6 +117,7 @@ public DefaultKafkaHeaderMapper() { * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in * {@link KafkaHeaders} are never mapped as headers since they represent data in * consumer/producer records. + * * @param objectMapper the object mapper. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -128,6 +136,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { * generally should not map the {@code "id" and "timestamp"} headers. Note: * most of the headers in {@link KafkaHeaders} are ever mapped as headers since they * represent data in consumer/producer records. + * * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -143,8 +152,9 @@ public DefaultKafkaHeaderMapper(String... patterns) { * you generally should not map the {@code "id" and "timestamp"} headers. Note: most * of the headers in {@link KafkaHeaders} are never mapped as headers since they * represent data in consumer/producer records. + * * @param objectMapper the object mapper. - * @param patterns the patterns. + * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { @@ -160,6 +170,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St /** * Create an instance for inbound mapping only with pattern matching. + * * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 @@ -170,8 +181,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt /** * Create an instance for inbound mapping only with pattern matching. + * * @param objectMapper the object mapper. - * @param patterns the patterns to match. + * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 */ @@ -181,6 +193,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o /** * Return the object mapper. + * * @return the mapper. */ protected ObjectMapper getObjectMapper() { @@ -189,6 +202,7 @@ protected ObjectMapper getObjectMapper() { /** * Provide direct access to the trusted packages set for subclasses. + * * @return the trusted packages. * @since 2.2 */ @@ -198,6 +212,7 @@ protected Set getTrustedPackages() { /** * Provide direct access to the toString() classes by subclasses. + * * @return the toString() classes. * @since 2.2 */ @@ -214,6 +229,7 @@ protected boolean isEncodeStrings() { * raw String value is converted to a byte array using the configured charset. Set to * true if a consumer of the outbound record is using Spring for Apache Kafka version * less than 2.3 + * * @param encodeStrings true to encode (default false). * @since 2.3 */ @@ -234,6 +250,7 @@ public void setEncodeStrings(boolean encodeStrings) { * If any of the supplied packages is {@code "*"}, all packages are trusted. * If a class for a non-trusted package is encountered, the header is returned to the * application with value of type {@link NonTrustedHeaderType}. + * * @param packagesToTrust the packages to trust. */ public void addTrustedPackages(String... packagesToTrust) { @@ -253,6 +270,7 @@ public void addTrustedPackages(String... packagesToTrust) { /** * Add class names that the outbound mapper should perform toString() operations on * before mapping. + * * @param classNames the class names. * @since 2.2 */ @@ -264,32 +282,17 @@ public void addToStringClasses(String... classNames) { public void fromHeaders(MessageHeaders headers, Headers target) { final Map jsonHeaders = new HashMap<>(); final ObjectMapper headerObjectMapper = getObjectMapper(); - headers.forEach((key, rawValue) -> { - if (matches(key, rawValue)) { - Object valueToAdd = headerValueToAddOut(key, rawValue); - if (valueToAdd instanceof byte[]) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); + headers.forEach((key, value) -> { + if (matches(key, value)) { + if (value instanceof Collection values) { + int i = 0; + for (Object element : values) { + resolveSingleHeader(key, element, target, jsonHeaders, i); + i++; + } } else { - try { - String className = valueToAdd.getClass().getName(); - boolean encodeToJson = this.encodeStrings; - if (this.toStringClasses.contains(className)) { - valueToAdd = valueToAdd.toString(); - className = JAVA_LANG_STRING; - encodeToJson = true; - } - if (!encodeToJson && valueToAdd instanceof String) { - target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset()))); - } - else { - target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd))); - } - jsonHeaders.put(key, className); - } - catch (Exception e) { - logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); - } + resolveSingleHeader(key, value, target, jsonHeaders); } } }); @@ -303,30 +306,82 @@ public void fromHeaders(MessageHeaders headers, Headers target) { } } - @Override - public void toHeaders(Headers source, final Map headers) { - final Map jsonTypes = decodeJsonTypes(source); - source.forEach(header -> { - String headerName = header.key(); - if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) { - headers.put(headerName, ByteBuffer.wrap(header.value()).getInt()); - } - else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) { - headers.put(headerName, new String(header.value(), getCharset())); - } - else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { - if (jsonTypes.containsKey(headerName)) { - String requestedType = jsonTypes.get(headerName); - populateJsonValueHeader(header, requestedType, headers); + private void resolveSingleHeader(String headerName, Object value, Headers target, Map jsonHeaders) { + resolveSingleHeader(headerName, value, target, jsonHeaders, null); + } + + private void resolveSingleHeader(String headerName, Object value, Headers target, Map jsonHeaders, Integer headerIndex) { + Object valueToAdd = headerValueToAddOut(headerName, value); + if (valueToAdd instanceof byte[] byteArray) { + target.add(new RecordHeader(headerName, byteArray)); + } + else { + try { + String className = valueToAdd.getClass().getName(); + boolean encodeToJson = this.encodeStrings; + if (this.toStringClasses.contains(className)) { + valueToAdd = valueToAdd.toString(); + className = JAVA_LANG_STRING; + encodeToJson = true; + } + if (!encodeToJson && valueToAdd instanceof String stringValue) { + target.add(new RecordHeader(headerName, stringValue.getBytes(getCharset()))); } else { - headers.put(headerName, headerValueToAddIn(header)); + target.add(new RecordHeader(headerName, this.objectMapper.writeValueAsBytes(valueToAdd))); } + jsonHeaders.put(headerIndex == null ? + headerName : + ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, headerIndex), className); } - }); + catch (Exception e) { + logger.error(e, () -> "Could not map " + headerName + " with type " + value.getClass().getName()); + } + } + } + + @Override + public void toHeaders(Headers source, final Map target) { + final Map jsonTypes = decodeJsonTypes(source); + + Streams.stream(source) + .collect(Collectors.groupingBy(Header::key)) + .forEach((headerName, headers) -> { + if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) { + target.put(headerName, ByteBuffer.wrap(headers.get(headers.size() - 1).value()).getInt()); + } + else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) { + target.put(headerName, new String(headers.get(headers.size() - 1).value(), getCharset())); + } + else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { + if (headers.size() == 1) { + if (jsonTypes.containsKey(headerName)) { + String requestedType = jsonTypes.get(headerName); + target.put(headerName, resolveJsonValueHeader(headers.get(0), requestedType)); + } + else { + target.put(headerName, headerValueToAddIn(headers.get(0))); + } + } + else { + List valueList = new ArrayList<>(); + for (int i = 0; i < headers.size(); i++) { + var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, i); + if (jsonTypes.containsKey(jsonTypeIterableHeader)) { + String requestedType = jsonTypes.get(jsonTypeIterableHeader); + valueList.add(resolveJsonValueHeader(headers.get(i), requestedType)); + } + else { + valueList.add(headerValueToAddIn(headers.get(i))); + } + } + target.put(headerName, valueList); + } + } + }); } - private void populateJsonValueHeader(Header header, String requestedType, Map headers) { + private Object resolveJsonValueHeader(Header header, String requestedType) { Class type = Object.class; boolean trusted = false; try { @@ -339,22 +394,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map "Could not load class for header: " + header.key()); } if (String.class.equals(type) && (header.value().length == 0 || header.value()[0] != '"')) { - headers.put(header.key(), new String(header.value(), getCharset())); + return new String(header.value(), getCharset()); } else { if (trusted) { try { - Object value = decodeValue(header, type); - headers.put(header.key(), value); + return decodeValue(header, type); } catch (IOException e) { logger.error(e, () -> "Could not decode json type: " + requestedType + " for key: " + header.key()); - headers.put(header.key(), header.value()); + return header.value(); } } else { - headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType)); + return new NonTrustedHeaderType(header.value(), requestedType); } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index 30effa38e0..14ffee1a61 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,16 @@ package org.springframework.kafka.support; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; +import org.assertj.core.util.Streams; import org.springframework.messaging.MessageHeaders; @@ -35,8 +39,8 @@ * The exceptions are correlation and reply headers for request/reply * * @author Gary Russell + * @author Grzegorz Poznachowski * @since 2.1.3 - * */ public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper { @@ -69,6 +73,7 @@ public SimpleKafkaHeaderMapper() { * generally should not map the {@code "id" and "timestamp"} headers. Note: * most of the headers in {@link KafkaHeaders} are never mapped as headers since they * represent data in consumer/producer records. + * * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -82,6 +87,7 @@ private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) { /** * Create an instance for inbound mapping only with pattern matching. + * * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 @@ -94,27 +100,43 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte public void fromHeaders(MessageHeaders headers, Headers target) { headers.forEach((key, value) -> { if (!NEVER.contains(key)) { - Object valueToAdd = headerValueToAddOut(key, value); - if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); + if (value instanceof Collection values) { + values.forEach(v -> resolveSingleHeader(target, key, v)); + } + else { + resolveSingleHeader(target, key, value); } } }); } + private void resolveSingleHeader(Headers target, String key, Object value) { + Object valueToAdd = headerValueToAddOut(key, value); + if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { + target.add(new RecordHeader(key, (byte[]) valueToAdd)); + } + } + @Override public void toHeaders(Headers source, Map target) { - source.forEach(header -> { - String headerName = header.key(); - if (matchesForInbound(headerName)) { - if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { - target.put(headerName, ByteBuffer.wrap(header.value()).getInt()); - } - else { - target.put(headerName, headerValueToAddIn(header)); - } - } - }); + Streams.stream(source) + .collect(Collectors.groupingBy(Header::key)) + .forEach((headerName, headers) -> { + if (matchesForInbound(headerName)) { + if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { + target.put(headerName, ByteBuffer.wrap(headers.get(headers.size() - 1).value()).getInt()); + } + else { + var values = headers.stream().map(super::headerValueToAddIn).toList(); + if (values.size() == 1) { + target.put(headerName, values.get(0)); + } + else { + target.put(headerName, values); + } + } + } + }); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index daf1aa6c29..1c5972551a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType; @@ -46,11 +48,12 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Grzegorz Poznachowski * * @since 1.3 * */ -public class DefaultKafkaHeaderMapperTests { +class DefaultKafkaHeaderMapperTests { @Test void testTrustedAndNot() { @@ -274,6 +277,27 @@ void testAlwaysStringConvert() { entry("alwaysRaw", "baz".getBytes())); } + @Test + void testIterableHeader() { + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.addTrustedPackages(getClass().getPackage().getName()); + Map headersMap = new HashMap<>(); + headersMap.put("iterableHeader", + List.of(new Foo(), "stringValue", "test".getBytes(StandardCharsets.UTF_8), Instant.now())); + MessageHeaders headers = new MessageHeaders(headersMap); + RecordHeaders recordHeaders = new RecordHeaders(); + mapper.fromHeaders(headers, recordHeaders); + assertThat(recordHeaders) + .filteredOn(header -> header.key().equals("iterableHeader")) + .hasSize(4); + headersMap = new HashMap<>(); + mapper.toHeaders(recordHeaders, headersMap); + assertThat(headersMap.get("iterableHeader")).asInstanceOf(InstanceOfAssertFactories.list(Object.class)) + .hasSize(4) + .contains(new Foo(), "stringValue", "test".getBytes(StandardCharsets.UTF_8)) + .hasAtLeastOneElementOfType(NonTrustedHeaderType.class); + } + @Test void deliveryAttempt() { DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java index 77caa3d81a..280bc24bf3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.common.header.Header; @@ -34,13 +35,15 @@ /** * @author Gary Russell + * @author Grzegorz Poznachowski + * * @since 2.2.5 * */ -public class SimpleKafkaHeaderMapperTests { +class SimpleKafkaHeaderMapperTests { @Test - public void testSpecificStringConvert() { + void testSpecificStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); Map rawMappedHeaders = new HashMap<>(); rawMappedHeaders.put("thisOnesAString", true); @@ -66,7 +69,27 @@ public void testSpecificStringConvert() { } @Test - public void testNotStringConvert() { + void testIterableHeaderConvert() { + SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); + Map rawMappedHeaders = new HashMap<>(); + rawMappedHeaders.put("stringHeader", true); + mapper.setRawMappedHeaders(rawMappedHeaders); + Map headersMap = new HashMap<>(); + headersMap.put("stringHeader", List.of("firstValue", "secondValue")); + MessageHeaders headers = new MessageHeaders(headersMap); + Headers target = new RecordHeaders(); + mapper.fromHeaders(headers, target); + assertThat(target).containsExactlyInAnyOrder( + new RecordHeader("stringHeader", "firstValue".getBytes()), + new RecordHeader("stringHeader", "secondValue".getBytes()) + ); + headersMap.clear(); + mapper.toHeaders(target, headersMap); + assertThat(headersMap).contains(entry("stringHeader", List.of("firstValue", "secondValue"))); + } + + @Test + void testNotStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); Map rawMappedHeaders = new HashMap<>(); rawMappedHeaders.put("thisOnesBytes", false); @@ -89,7 +112,7 @@ public void testNotStringConvert() { } @Test - public void testAlwaysStringConvert() { + void testAlwaysStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); mapper.setMapAllStringsOut(true); Map rawMappedHeaders = new HashMap<>(); @@ -115,7 +138,7 @@ public void testAlwaysStringConvert() { } @Test - public void testDefaultHeaderPatterns() { + void testDefaultHeaderPatterns() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); mapper.setMapAllStringsOut(true); Map headersMap = new HashMap<>();