Skip to content

Commit

Permalink
Enhancements in Kafka binder FunctionBatchingConversionTests
Browse files Browse the repository at this point in the history
  • Loading branch information
sobychacko committed Sep 18, 2024
1 parent fac9eb1 commit 058fc66
Showing 1 changed file with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -33,6 +34,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.Assert;

Expand All @@ -56,40 +58,47 @@ public class FunctionBatchingConversionTests {

static CountDownLatch latch = new CountDownLatch(3);

static List<Person> persons = new ArrayList<>();
static List<Map<String, Object>> batchConvertedHeaders = new ArrayList<>();

@Test
void conversionFailuresRemoveTheHeadersProperly() throws Exception {
streamBridge.send("cfrthp-topic", "hello".getBytes(StandardCharsets.UTF_8));
streamBridge.send("cfrthp-topic", "hello".getBytes(StandardCharsets.UTF_8));
streamBridge.send("cfrthp-topic", "{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8));
streamBridge.send("cfrthp-topic", "{\"name\":\"Julian\"}".getBytes(StandardCharsets.UTF_8));
streamBridge.send("cfrthp-topic", "{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8));
streamBridge.send("cfrthp-topic", MessageBuilder.withPayload("hello".getBytes(StandardCharsets.UTF_8))
.setHeader("index", 0).build());
streamBridge.send("cfrthp-topic", MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes(StandardCharsets.UTF_8))
.setHeader("index", 1).build());
streamBridge.send("cfrthp-topic", MessageBuilder.withPayload("{\"name\":\"Julian\"}".getBytes(StandardCharsets.UTF_8))
.setHeader("index", 2).build());
streamBridge.send("cfrthp-topic", MessageBuilder.withPayload("hello".getBytes(StandardCharsets.UTF_8))
.setHeader("index", 3).build());
streamBridge.send("cfrthp-topic", MessageBuilder.withPayload("{\"name\":\"Bubbles\"}".getBytes(StandardCharsets.UTF_8))
.setHeader("index", 4).build());

Assert.isTrue(latch.await(10, TimeUnit.SECONDS), "Failed to receive message");

assertThat(persons.size()).isEqualTo(3);
assertThat(persons.get(0).toString().contains("Ricky")).isTrue();
assertThat(persons.get(1).toString().contains("Julian")).isTrue();
assertThat(persons.get(2).toString().contains("Bubbles")).isTrue();
assertThat(batchConvertedHeaders.size()).isEqualTo(3);

assertThat(batchConvertedHeaders.get(0).get("index")).isEqualTo(1);
assertThat(batchConvertedHeaders.get(1).get("index")).isEqualTo(2);
assertThat(batchConvertedHeaders.get(2).get("index")).isEqualTo(4);
}

@EnableAutoConfiguration
@Configuration
public static class Config {

@Bean
@SuppressWarnings("unchecked")
Consumer<Message<List<Person>>> batchConsumer() {
return message -> {
if (!message.getPayload().isEmpty()) {
List<Map<String, Object>> o = (List<Map<String, Object>>) message.getHeaders().get("kafka_batchConvertedHeaders");
batchConvertedHeaders.addAll(o);
message.getPayload().forEach(c -> {
persons.add(c);
latch.countDown();
});
}
};
}

}

record Person(String name) {
Expand Down

0 comments on commit 058fc66

Please sign in to comment.