diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 25bf914bd3..1ee37d6e01 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -26,10 +27,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; +import org.assertj.core.util.Streams; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; @@ -49,12 +52,14 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Grzegorz Poznachowski * * @since 1.3 - * */ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper { + private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s"; + private static final String JAVA_LANG_STRING = "java.lang.String"; private static final Set TRUSTED_ARRAY_TYPES = Set.of( @@ -97,6 +102,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper { * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in * {@link KafkaHeaders} are never mapped as headers since they represent data in * consumer/producer records. + * * @see #DefaultKafkaHeaderMapper(ObjectMapper) */ public DefaultKafkaHeaderMapper() { @@ -111,6 +117,7 @@ public DefaultKafkaHeaderMapper() { * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in * {@link KafkaHeaders} are never mapped as headers since they represent data in * consumer/producer records. + * * @param objectMapper the object mapper. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -129,6 +136,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { * generally should not map the {@code "id" and "timestamp"} headers. Note: * most of the headers in {@link KafkaHeaders} are ever mapped as headers since they * represent data in consumer/producer records. + * * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -144,8 +152,9 @@ public DefaultKafkaHeaderMapper(String... patterns) { * you generally should not map the {@code "id" and "timestamp"} headers. Note: most * of the headers in {@link KafkaHeaders} are never mapped as headers since they * represent data in consumer/producer records. + * * @param objectMapper the object mapper. - * @param patterns the patterns. + * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { @@ -161,6 +170,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St /** * Create an instance for inbound mapping only with pattern matching. + * * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 @@ -171,8 +181,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt /** * Create an instance for inbound mapping only with pattern matching. + * * @param objectMapper the object mapper. - * @param patterns the patterns to match. + * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 */ @@ -182,6 +193,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o /** * Return the object mapper. + * * @return the mapper. */ protected ObjectMapper getObjectMapper() { @@ -190,6 +202,7 @@ protected ObjectMapper getObjectMapper() { /** * Provide direct access to the trusted packages set for subclasses. + * * @return the trusted packages. * @since 2.2 */ @@ -199,6 +212,7 @@ protected Set getTrustedPackages() { /** * Provide direct access to the toString() classes by subclasses. + * * @return the toString() classes. * @since 2.2 */ @@ -215,6 +229,7 @@ protected boolean isEncodeStrings() { * raw String value is converted to a byte array using the configured charset. Set to * true if a consumer of the outbound record is using Spring for Apache Kafka version * less than 2.3 + * * @param encodeStrings true to encode (default false). * @since 2.3 */ @@ -235,6 +250,7 @@ public void setEncodeStrings(boolean encodeStrings) { * If any of the supplied packages is {@code "*"}, all packages are trusted. * If a class for a non-trusted package is encountered, the header is returned to the * application with value of type {@link NonTrustedHeaderType}. + * * @param packagesToTrust the packages to trust. */ public void addTrustedPackages(String... packagesToTrust) { @@ -254,6 +270,7 @@ public void addTrustedPackages(String... packagesToTrust) { /** * Add class names that the outbound mapper should perform toString() operations on * before mapping. + * * @param classNames the class names. * @since 2.2 */ @@ -265,32 +282,15 @@ public void addToStringClasses(String... classNames) { public void fromHeaders(MessageHeaders headers, Headers target) { final Map jsonHeaders = new HashMap<>(); final ObjectMapper headerObjectMapper = getObjectMapper(); - headers.forEach((key, rawValue) -> { - if (matches(key, rawValue)) { - Object valueToAdd = headerValueToAddOut(key, rawValue); - if (valueToAdd instanceof byte[]) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); + headers.forEach((key, value) -> { + if (matches(key, value)) { + if (value instanceof List values) { + for (int i = 0; i < values.size(); i++) { + resolveHeader(key, values.get(i), target, jsonHeaders, i); + } } else { - try { - String className = valueToAdd.getClass().getName(); - boolean encodeToJson = this.encodeStrings; - if (this.toStringClasses.contains(className)) { - valueToAdd = valueToAdd.toString(); - className = JAVA_LANG_STRING; - encodeToJson = true; - } - if (!encodeToJson && valueToAdd instanceof String) { - target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset()))); - } - else { - target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd))); - } - jsonHeaders.put(key, className); - } - catch (Exception e) { - logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); - } + resolveHeader(key, value, target, jsonHeaders, null); } } }); @@ -304,34 +304,84 @@ public void fromHeaders(MessageHeaders headers, Headers target) { } } - @Override - public void toHeaders(Headers source, final Map headers) { - final Map jsonTypes = decodeJsonTypes(source); - source.forEach(header -> { - String headerName = header.key(); - if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) { - headers.put(headerName, ByteBuffer.wrap(header.value()).getInt()); - } - else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) { - headers.put(headerName, new String(header.value(), getCharset())); - } - else if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) || - headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) { - headers.put(headerName, header); - } - else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { - if (jsonTypes.containsKey(headerName)) { - String requestedType = jsonTypes.get(headerName); - populateJsonValueHeader(header, requestedType, headers); + private void resolveHeader(String headerName, Object value, Headers target, Map jsonHeaders, Integer headerIndex) { + Object valueToAdd = headerValueToAddOut(headerName, value); + if (valueToAdd instanceof byte[] byteArray) { + target.add(new RecordHeader(headerName, byteArray)); + } + else { + try { + String className = valueToAdd.getClass().getName(); + boolean encodeToJson = this.encodeStrings; + if (this.toStringClasses.contains(className)) { + valueToAdd = valueToAdd.toString(); + className = JAVA_LANG_STRING; + encodeToJson = true; + } + if (!encodeToJson && valueToAdd instanceof String stringValue) { + target.add(new RecordHeader(headerName, stringValue.getBytes(getCharset()))); } else { - headers.put(headerName, headerValueToAddIn(header)); + target.add(new RecordHeader(headerName, this.objectMapper.writeValueAsBytes(valueToAdd))); } + jsonHeaders.put(headerIndex == null ? + headerName : + ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, headerIndex), className); } - }); + catch (Exception e) { + logger.error(e, () -> "Could not map " + headerName + " with type " + value.getClass().getName()); + } + } + } + + @Override + public void toHeaders(Headers source, final Map target) { + final Map jsonTypes = decodeJsonTypes(source); + + Streams.stream(source) + .collect(Collectors.groupingBy(Header::key)) + .forEach((headerName, headers) -> { + Header lastHeader = headers.get(headers.size() - 1); + if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) || + headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) { + target.put(headerName, lastHeader); + } + else if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT) && matchesForInbound(headerName)) { + target.put(headerName, ByteBuffer.wrap(lastHeader.value()).getInt()); + } + else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) { + target.put(headerName, new String(lastHeader.value(), getCharset())); + } + else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { + if (headers.size() == 1) { + if (jsonTypes.containsKey(headerName)) { + String requestedType = jsonTypes.get(headerName); + target.put(headerName, resolveJsonValueHeader(headers.get(0), requestedType)); + } + else { + target.put(headerName, headerValueToAddIn(headers.get(0))); + } + } + else { + List valueList = new ArrayList<>(); + for (int i = 0; i < headers.size(); i++) { + var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN.formatted(headerName, i); + if (jsonTypes.containsKey(jsonTypeIterableHeader)) { + String requestedType = jsonTypes.get(jsonTypeIterableHeader); + valueList.add(resolveJsonValueHeader(headers.get(i), requestedType)); + } + else { + valueList.add(headerValueToAddIn(headers.get(i))); + } + } + Collections.reverse(valueList); + target.put(headerName, valueList); + } + } + }); } - private void populateJsonValueHeader(Header header, String requestedType, Map headers) { + private Object resolveJsonValueHeader(Header header, String requestedType) { Class type = Object.class; boolean trusted = false; try { @@ -344,22 +394,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map "Could not load class for header: " + header.key()); } if (String.class.equals(type) && (header.value().length == 0 || header.value()[0] != '"')) { - headers.put(header.key(), new String(header.value(), getCharset())); + return new String(header.value(), getCharset()); } else { if (trusted) { try { - Object value = decodeValue(header, type); - headers.put(header.key(), value); + return decodeValue(header, type); } catch (IOException e) { logger.error(e, () -> "Could not decode json type: " + requestedType + " for key: " + header.key()); - headers.put(header.key(), header.value()); + return header.value(); } } else { - headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType)); + return new NonTrustedHeaderType(header.value(), requestedType); } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaMessageHeaderAccessor.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaMessageHeaderAccessor.java index 96975cbdd7..3c2c08b7d3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaMessageHeaderAccessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaMessageHeaderAccessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.kafka.support; import java.nio.ByteBuffer; +import java.util.List; import org.springframework.kafka.retrytopic.RetryTopicHeaders; import org.springframework.lang.Nullable; @@ -64,10 +65,27 @@ public int getNonBlockingRetryDeliveryAttempt() { } private int fromBytes(String headerName) { - byte[] header = getHeader(headerName, byte[].class); + byte[] header = getFirstHeaderIfIterable(headerName, byte[].class); return header == null ? 1 : ByteBuffer.wrap(header).getInt(); } + @SuppressWarnings("unchecked") + @Nullable + public T getFirstHeaderIfIterable(String key, Class type) { + Object value = getHeader(key); + if (value == null) { + return null; + } + if (value instanceof List iterable) { + value = iterable.get(0); + } + if (!type.isAssignableFrom(value.getClass())) { + throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type + + "] but actual type is [" + value.getClass() + "]"); + } + return (T) value; + } + /** * Get a header value with a specific type. * @param the type. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index 30effa38e0..0946e59a01 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,17 @@ package org.springframework.kafka.support; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; +import org.assertj.core.util.Streams; import org.springframework.messaging.MessageHeaders; @@ -35,8 +40,8 @@ * The exceptions are correlation and reply headers for request/reply * * @author Gary Russell + * @author Grzegorz Poznachowski * @since 2.1.3 - * */ public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper { @@ -69,6 +74,7 @@ public SimpleKafkaHeaderMapper() { * generally should not map the {@code "id" and "timestamp"} headers. Note: * most of the headers in {@link KafkaHeaders} are never mapped as headers since they * represent data in consumer/producer records. + * * @param patterns the patterns. * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ @@ -82,6 +88,7 @@ private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) { /** * Create an instance for inbound mapping only with pattern matching. + * * @param patterns the patterns to match. * @return the header mapper. * @since 2.8.8 @@ -94,27 +101,48 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte public void fromHeaders(MessageHeaders headers, Headers target) { headers.forEach((key, value) -> { if (!NEVER.contains(key)) { - Object valueToAdd = headerValueToAddOut(key, value); - if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); + if (value instanceof Collection values) { + values.forEach(v -> resolveSingleHeader(target, key, v)); + } + else { + resolveSingleHeader(target, key, value); } } }); } + private void resolveSingleHeader(Headers target, String key, Object value) { + Object valueToAdd = headerValueToAddOut(key, value); + if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { + target.add(new RecordHeader(key, (byte[]) valueToAdd)); + } + } + @Override public void toHeaders(Headers source, Map target) { - source.forEach(header -> { - String headerName = header.key(); - if (matchesForInbound(headerName)) { - if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { - target.put(headerName, ByteBuffer.wrap(header.value()).getInt()); - } - else { - target.put(headerName, headerValueToAddIn(header)); - } - } - }); + Streams.stream(source) + .collect(Collectors.groupingBy(Header::key)) + .forEach((headerName, headers) -> { + if (matchesForInbound(headerName)) { + if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { + target.put(headerName, ByteBuffer.wrap(headers.get(headers.size() - 1).value()).getInt()); + } + else { + var values = headers.stream() + .map(super::headerValueToAddIn) + .collect(Collectors.collectingAndThen(Collectors.toList(), list -> { + Collections.reverse(list); + return list; + })); + if (values.size() == 1) { + target.put(headerName, values.get(0)); + } + else { + target.put(headerName, values); + } + } + } + }); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index adb56002fe..7c4266a6d1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -22,6 +22,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -52,11 +53,12 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Grzegorz Poznachowski * * @since 1.3 * */ -public class DefaultKafkaHeaderMapperTests { +class DefaultKafkaHeaderMapperTests { @Test void testTrustedAndNot() { @@ -177,7 +179,7 @@ void testMimeTypeInHeaders() { Object fooHeader = receivedHeaders.get("foo"); assertThat(fooHeader).isInstanceOf(List.class); assertThat(fooHeader).asInstanceOf(InstanceOfAssertFactories.LIST) - .containsExactly("application/json", "text/plain"); + .containsExactlyInAnyOrder("application/json", "text/plain"); } @Test @@ -281,6 +283,27 @@ void testAlwaysStringConvert() { entry("alwaysRaw", "baz".getBytes())); } + @Test + void testIterableHeader() { + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.addTrustedPackages(getClass().getPackage().getName()); + Map headersMap = new HashMap<>(); + headersMap.put("iterableHeader", + List.of(new Foo(), "stringValue", "test".getBytes(StandardCharsets.UTF_8), Instant.now())); + MessageHeaders headers = new MessageHeaders(headersMap); + RecordHeaders recordHeaders = new RecordHeaders(); + mapper.fromHeaders(headers, recordHeaders); + assertThat(recordHeaders) + .filteredOn(header -> header.key().equals("iterableHeader")) + .hasSize(4); + headersMap = new HashMap<>(); + mapper.toHeaders(recordHeaders, headersMap); + assertThat(headersMap.get("iterableHeader")).asInstanceOf(InstanceOfAssertFactories.list(Object.class)) + .hasSize(4) + .hasAtLeastOneElementOfType(NonTrustedHeaderType.class) + .containsSequence("test".getBytes(StandardCharsets.UTF_8), "stringValue", new Foo()); + } + @Test void deliveryAttempt() { DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); @@ -406,10 +429,10 @@ public static class Bar { private String field; - public Bar() { + Bar() { } - public Bar(String field) { + Bar(String field) { this.field = field; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java index 77caa3d81a..e562f7c6d7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.common.header.Header; @@ -34,13 +35,15 @@ /** * @author Gary Russell + * @author Grzegorz Poznachowski + * * @since 2.2.5 * */ -public class SimpleKafkaHeaderMapperTests { +class SimpleKafkaHeaderMapperTests { @Test - public void testSpecificStringConvert() { + void testSpecificStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); Map rawMappedHeaders = new HashMap<>(); rawMappedHeaders.put("thisOnesAString", true); @@ -66,7 +69,27 @@ public void testSpecificStringConvert() { } @Test - public void testNotStringConvert() { + void testIterableHeaderConvert() { + SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); + Map rawMappedHeaders = new HashMap<>(); + rawMappedHeaders.put("stringHeader", true); + mapper.setRawMappedHeaders(rawMappedHeaders); + Map headersMap = new HashMap<>(); + headersMap.put("stringHeader", List.of("firstValue", "secondValue")); + MessageHeaders headers = new MessageHeaders(headersMap); + Headers target = new RecordHeaders(); + mapper.fromHeaders(headers, target); + assertThat(target).containsExactly( + new RecordHeader("stringHeader", "firstValue".getBytes()), + new RecordHeader("stringHeader", "secondValue".getBytes()) + ); + headersMap.clear(); + mapper.toHeaders(target, headersMap); + assertThat(headersMap).contains(entry("stringHeader", List.of("secondValue", "firstValue"))); + } + + @Test + void testNotStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); Map rawMappedHeaders = new HashMap<>(); rawMappedHeaders.put("thisOnesBytes", false); @@ -89,7 +112,7 @@ public void testNotStringConvert() { } @Test - public void testAlwaysStringConvert() { + void testAlwaysStringConvert() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); mapper.setMapAllStringsOut(true); Map rawMappedHeaders = new HashMap<>(); @@ -115,7 +138,7 @@ public void testAlwaysStringConvert() { } @Test - public void testDefaultHeaderPatterns() { + void testDefaultHeaderPatterns() { SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); mapper.setMapAllStringsOut(true); Map headersMap = new HashMap<>();