Skip to content

Commit

Permalink
spring-projectsGH-3067: Draft of mapping multiple headers with same k…
Browse files Browse the repository at this point in the history
…ey with SimpleKafkaHeaderMapper
  • Loading branch information
poznachowski committed Mar 5, 2024
1 parent 265e55f commit f625dad
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,16 @@
package org.springframework.kafka.support;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.assertj.core.util.Streams;

import org.springframework.messaging.MessageHeaders;

Expand All @@ -36,7 +40,6 @@
*
* @author Gary Russell
* @since 2.1.3
*
*/
public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper {

Expand Down Expand Up @@ -69,6 +72,7 @@ public SimpleKafkaHeaderMapper() {
* generally should not map the {@code "id" and "timestamp"} headers. Note:
* most of the headers in {@link KafkaHeaders} are never mapped as headers since they
* represent data in consumer/producer records.
*
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
Expand All @@ -82,6 +86,7 @@ private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {

/**
* Create an instance for inbound mapping only with pattern matching.
*
* @param patterns the patterns to match.
* @return the header mapper.
* @since 2.8.8
Expand All @@ -94,27 +99,40 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte
public void fromHeaders(MessageHeaders headers, Headers target) {
headers.forEach((key, value) -> {
if (!NEVER.contains(key)) {
Object valueToAdd = headerValueToAddOut(key, value);
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
if (value instanceof Collection<?> values) {
values.forEach(v -> mapIfMatched(target, key, v));
} else {
mapIfMatched(target, key, value);
}
}
});
}

private void mapIfMatched(Headers target, String key, Object value) {
Object valueToAdd = headerValueToAddOut(key, value);
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
}
}

@Override
public void toHeaders(Headers source, Map<String, Object> target) {
source.forEach(header -> {
String headerName = header.key();
if (matchesForInbound(headerName)) {
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(headerName, headerValueToAddIn(header));
}
}
});
Streams.stream(source)
.collect(Collectors.groupingBy(Header::key))
.forEach((headerName, headers) -> {
if (matchesForInbound(headerName)) {
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
target.put(headerName, ByteBuffer.wrap(headers.get(headers.size() - 1).value()).getInt());
} else {
var values = headers.stream().map(super::headerValueToAddIn).toList();
if (values.size() == 1) {
target.put(headerName, values.get(0));
} else {
target.put(headerName, values);
}
}
}
});
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.header.Header;
Expand Down Expand Up @@ -65,6 +66,26 @@ public void testSpecificStringConvert() {
entry("neverConverted", "baz".getBytes()));
}

@Test
void testIterableHeaderConvert() {
SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("stringHeader", true);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("stringHeader", List.of("firstValue", "secondValue"));
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("stringHeader", "firstValue".getBytes()),
new RecordHeader("stringHeader", "secondValue".getBytes())
);
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(entry("stringHeader", List.of("firstValue", "secondValue")));
}

@Test
public void testNotStringConvert() {
SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper();
Expand Down

0 comments on commit f625dad

Please sign in to comment.