From cf63cab77f8a8b4ad124867e3d9dc5ebca24f1ec Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 17 Jan 2024 14:06:13 -0500 Subject: [PATCH] Some clean up in the EnableKafkaIntegrationTests --- .../EnableKafkaIntegrationTests.java | 134 +++++++----------- 1 file changed, 55 insertions(+), 79 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 29ae413970..dd757944f6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,6 @@ import java.util.stream.Collectors; import org.aopalliance.intercept.MethodInterceptor; -import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.admin.NewTopic; @@ -180,7 +179,7 @@ */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3", "annotated3x", +@EmbeddedKafka(topics = {"annotated1", "annotated2", "annotated3", "annotated3x", "annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated8reply", "annotated9", "annotated10", "annotated11", "annotated12", "annotated13", "annotated14", "annotated15", "annotated16", "annotated17", @@ -190,7 +189,7 @@ "annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33", "annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle", "annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42", - "annotated43", "annotated43reply" }) + "annotated43", "annotated43reply"}) @TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10") public class EnableKafkaIntegrationTests { @@ -273,7 +272,6 @@ public void testAnonymous() { container.stop(); } - @SuppressWarnings("deprecation") @Test public void manyTests() throws Exception { this.recordFilter.called = false; @@ -299,7 +297,7 @@ public void manyTests() throws Exception { .tag("result", "success") .timer() .count()) - .isGreaterThan(0L); + .isGreaterThan(0L); assertThat(this.meterRegistry.get("spring.kafka.listener") .tag("name", "bar-0") @@ -307,7 +305,7 @@ public void manyTests() throws Exception { .tag("result", "success") .timer() .count()) - .isGreaterThan(0L); + .isGreaterThan(0L); template.send("annotated3", 0, "foo"); assertThat(this.listener.latch3.await(60, TimeUnit.SECONDS)).isTrue(); @@ -323,7 +321,7 @@ public void manyTests() throws Exception { assertThat(this.listener.capturedRecord.value()).isEqualTo("foo"); assertThat(this.listener.ack).isNotNull(); assertThat(this.listener.eventLatch.await(60, TimeUnit.SECONDS)).isTrue(); - assertThat(this.listener.event.getListenerId().startsWith("qux-")); + assertThat(this.listener.event.getListenerId()).startsWith("qux-"); MessageListenerContainer manualContainer = this.registry.getListenerContainer("qux"); assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener")) .isInstanceOf(FilteringMessageListenerAdapter.class); @@ -339,7 +337,7 @@ public void manyTests() throws Exception { assertThat( KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer, "fetcher.fetchConfig.maxPollRecords", Integer.class)) - .isEqualTo(100); + .isEqualTo(100); assertThat(this.quxGroup).hasSize(1); assertThat(this.quxGroup.get(0)).isSameAs(manualContainer); List containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class); @@ -350,7 +348,7 @@ public void manyTests() throws Exception { template.send("annotated4", 0, "foo"); assertThat(this.listener.noLongerIdleEventLatch.await(60, TimeUnit.SECONDS)).isTrue(); - assertThat(this.listener.noLongerIdleEvent.getListenerId().startsWith("qux-")); + assertThat(this.listener.noLongerIdleEvent.getListenerId()).startsWith("qux-"); template.send("annotated5", 0, 0, "foo"); template.send("annotated5", 1, 0, "bar"); @@ -369,7 +367,7 @@ public void manyTests() throws Exception { TopicPartitionOffset[].class)[3]; assertThat(offset.isRelativeToCurrent()).isTrue(); assertThat(KafkaTestUtils.getPropertyValue(fizContainer, - "listenerConsumer.consumer.groupId", Optional.class).get()) + "listenerConsumer.consumer.groupId", Optional.class).get()) .isEqualTo("fiz"); assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId")) .isEqualTo("clientIdViaAnnotation-0"); @@ -424,8 +422,8 @@ public void testAutoStartup() { assertThat(listenerContainer.isRunning()).isTrue(); KafkaMessageListenerContainer kafkaMessageListenerContainer = ((ConcurrentMessageListenerContainer) listenerContainer) - .getContainers() - .get(0); + .getContainers() + .get(0); assertThat(kafkaMessageListenerContainer .getContainerProperties().getSyncCommitTimeout()) .isEqualTo(Duration.ofSeconds(59)); @@ -454,7 +452,7 @@ public void testInterface() throws Exception { this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated43reply"); template.send("annotated43", 0, "foo"); ConsumerRecord reply = KafkaTestUtils.getSingleRecord(consumer, "annotated43reply"); - assertThat(reply).extracting(rec -> rec.value()).isEqualTo("FOO"); + assertThat(reply).extracting(ConsumerRecord::value).isEqualTo("FOO"); consumer.close(); } @@ -531,7 +529,7 @@ public void testJson() throws Exception { MessageListenerContainer buzContainer = (MessageListenerContainer) KafkaTestUtils .getPropertyValue(buzConcurrentContainer, "containers", List.class).get(0); assertThat(KafkaTestUtils.getPropertyValue(buzContainer, - "listenerConsumer.consumer.groupId", Optional.class).get()) + "listenerConsumer.consumer.groupId", Optional.class).get()) .isEqualTo("buz.explicitGroupId"); assertThat(KafkaTestUtils.getPropertyValue(buzContainer, "listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords")) @@ -581,6 +579,7 @@ public void testNulls() throws Exception { .timer(); } catch (MeterNotFoundException ex) { + // Ignore for the next attempt } assertThat(timer) .describedAs("Timer not found in " + ((SimpleMeterRegistry) this.meterRegistry).getMetersAsString()) @@ -599,6 +598,7 @@ public void testNulls() throws Exception { .timer(); } catch (MeterNotFoundException ex) { + // Ignore for the next attempt } assertThat(timer) .describedAs("Timer not found in " + ((SimpleMeterRegistry) this.meterRegistry).getMetersAsString()) @@ -949,14 +949,14 @@ public void testKeyConversion() throws Exception { .tag("spring.id", "bytesStringConsumerFactory.tag-0") .functionCounter() .count()) - .isGreaterThan(0); + .isGreaterThan(0); assertThat(this.meterRegistry.get("kafka.producer.incoming.byte.total") .tag("producerTag", "bytesString") .tag("spring.id", "bytesStringProducerFactory.bsPF-1") .functionCounter() .count()) - .isGreaterThan(0); + .isGreaterThan(0); } catch (Exception e) { logger.error(this.meterRegistry.getMeters() @@ -998,7 +998,7 @@ public void testSeekToLastOnIdle() throws InterruptedException { assertThat(KafkaTestUtils.getPropertyValue(this.seekOnIdleListener, "callbacks", Map.class)).hasSize(0); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testReplyingBatchListenerReturnCollection() { Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); @@ -1100,7 +1100,7 @@ public PlatformTransactionManager transactionManager() { @Bean public KafkaListenerContainerFactory> - kafkaListenerContainerFactory() { + kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1130,9 +1130,8 @@ public void handleRemaining(Exception thrownException, List } @Bean - @SuppressWarnings("deprecation") public KafkaListenerContainerFactory> - factoryWithBadConverter() { + factoryWithBadConverter() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1163,7 +1162,8 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Override public ProducerRecord fromMessage(Message message, String defaultTopic) { - throw new UnsupportedOperationException(); } + throw new UnsupportedOperationException(); + } }); return factory; @@ -1171,7 +1171,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Bean public KafkaListenerContainerFactory> - withNoReplyTemplateContainerFactory() { + withNoReplyTemplateContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1275,7 +1275,7 @@ public KafkaListenerContainerFactory batchJsonReplyFactory() { return factory; } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) @Bean public KafkaListenerContainerFactory batchSpyFactory() { ConcurrentKafkaListenerContainerFactory factory = @@ -1329,9 +1329,8 @@ public KafkaListenerContainerFactory batchManualFactory2() { } @Bean - @SuppressWarnings("deprecation") public KafkaListenerContainerFactory> - kafkaManualAckListenerContainerFactory() { + kafkaManualAckListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1347,7 +1346,7 @@ public KafkaListenerContainerFactory batchManualFactory2() { @Bean public KafkaListenerContainerFactory> - kafkaAutoStartFalseListenerContainerFactory() { + kafkaAutoStartFalseListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1362,7 +1361,7 @@ public KafkaListenerContainerFactory batchManualFactory2() { @Bean public KafkaListenerContainerFactory> - kafkaRebalanceListenerContainerFactory() { + kafkaRebalanceListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1374,9 +1373,8 @@ public KafkaListenerContainerFactory batchManualFactory2() { } @Bean - @SuppressWarnings("deprecation") public KafkaListenerContainerFactory> - recordAckListenerContainerFactory() { + recordAckListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -1412,9 +1410,7 @@ private ConsumerFactory configuredConsumerFactory(String client @Bean public Map consumerConfigs() { - Map consumerProps = - KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka); - return consumerProps; + return KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka); } @Bean @@ -1484,7 +1480,7 @@ public ProducerFactory bytesStringProducerFactory() { configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); configs.put(ProducerConfig.CLIENT_ID_CONFIG, "bsPF"); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs); - pf.addListener(new MicrometerProducerListener(meterRegistry(), + pf.addListener(new MicrometerProducerListener<>(meterRegistry(), Collections.singletonList(new ImmutableTag("producerTag", "bytesString")))); return pf; } @@ -1772,7 +1768,6 @@ protected boolean supports(Class clazz) { } @Override - @Nullable protected Object convertFromInternal(Message message, Class targetClass, @Nullable Object conversionHint) { @@ -1964,8 +1959,8 @@ static class Listener implements ConsumerSeekAware { @KafkaListener(id = "manualStart", topics = "manualStart", containerFactory = "kafkaAutoStartFalseListenerContainerFactory", - properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000", - ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000" }) + properties = {ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000", + ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000"}) public void manualStart(String foo) { } @@ -1989,7 +1984,7 @@ public void listen1Batch(List foo) { PrintWriter printWriter = new PrintWriter(stringWriter, true); new RuntimeException().printStackTrace(printWriter); this.batchOverrideStackTrace = stringWriter.getBuffer().toString(); - foo.forEach(s -> this.latch1Batch.countDown()); + foo.forEach(s -> this.latch1Batch.countDown()); } @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", filter = "#{@lambdaAll}", @@ -2058,7 +2053,7 @@ public void eventHandler(ListenerContainerNoLongerIdleEvent event) { } @KafkaListener(id = "fiz", topicPartitions = { - @TopicPartition(topic = "annotated5", partitions = { "#{'${foo:0,1}'.split(',')}" }), + @TopicPartition(topic = "annotated5", partitions = {"#{'${foo:0,1}'.split(',')}"}), @TopicPartition(topic = "annotated6", partitions = "0", partitionOffsets = @PartitionOffset(partition = "${xxx:1}", initialOffset = "${yyy:0}", relativeToCurrent = "${zzz:true}")) @@ -2125,6 +2120,7 @@ public void listen11(List list, @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, @Header(KafkaHeaders.OFFSET) List offsets, @Header(KafkaHeaders.LISTENER_INFO) String info) { + this.payload = list; this.keys = keys; this.partitions = partitions; @@ -2228,7 +2224,7 @@ public Collection replyingBatchListenerCollection(List in) { .collect(Collectors.toList()); } - @KafkaListener(id = "batchAckListener", topics = { "annotated26", "annotated27" }, + @KafkaListener(id = "batchAckListener", topics = {"annotated26", "annotated27"}, containerFactory = "batchFactory") public void batchAckListener(@SuppressWarnings("unused") List in, @Header(KafkaHeaders.RECEIVED_PARTITION) List partitionsHeader, @@ -2243,17 +2239,17 @@ public void batchAckListener(@SuppressWarnings("unused") List in, new org.apache.kafka.common.TopicPartition(inTopic, partitionsHeader.get(i)))); if (committed.values().iterator().next() != null) { if ("annotated26".equals(inTopic) && committed - .values() - .iterator() - .next() - .offset() == 1) { + .values() + .iterator() + .next() + .offset() == 1) { this.latch18.countDown(); } else if ("annotated27".equals(inTopic) && committed - .values() - .iterator() - .next() - .offset() == 3) { + .values() + .iterator() + .next() + .offset() == 3) { this.latch18.countDown(); } } @@ -2323,13 +2319,10 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro if ("multiListenerSendTo".equals(beanName)) { ProxyFactory proxyFactory = new ProxyFactory(bean); proxyFactory.setProxyTargetClass(true); - proxyFactory.addAdvice(new MethodInterceptor() { - @Override - public Object invoke(MethodInvocation invocation) throws Throwable { - logger.info(String.format("Proxy listener for %s.$s", - invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName())); - return invocation.proceed(); - } + proxyFactory.addAdvice((MethodInterceptor) invocation -> { + logger.info(String.format("Proxy listener for %s.%s", + invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName())); + return invocation.proceed(); }); return proxyFactory.getProxy(); } @@ -2380,13 +2373,13 @@ public void onIdleContainer(Map as public void rewindAllOneRecord() { getSeekCallbacks() - .forEach((tp, callback) -> - callback.seekRelative(tp.topic(), tp.partition(), -1, true)); + .forEach((tp, callback) -> + callback.seekRelative(tp.topic(), tp.partition(), -1, true)); } public void rewindOnePartitionOneRecord(String topic, int partition) { getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition)) - .seekRelative(topic, partition, -1, true); + .seekRelative(topic, partition, -1, true); } @Override @@ -2394,7 +2387,7 @@ public synchronized void onPartitionsAssigned(Map 0) { + if (!assignments.isEmpty()) { this.consumerThreads.add(Thread.currentThread()); notifyAll(); } @@ -2635,7 +2628,7 @@ public void setBar(String bar) { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((this.bar == null) ? 0 : this.bar.hashCode()); + result = prime * result + this.bar.hashCode(); return result; } @@ -2651,12 +2644,7 @@ public boolean equals(Object obj) { return false; } Foo other = (Foo) obj; - if (this.bar == null) { - if (other.bar != null) { - return false; - } - } - else if (!this.bar.equals(other.bar)) { + if (!this.bar.equals(other.bar)) { return false; } return true; @@ -2728,7 +2716,6 @@ public List> filterBatch( return records; } - } public static class FooConverter implements Converter { @@ -2797,18 +2784,7 @@ interface ProjectionSample { } - static class CustomMethodArgument { - - final String body; - - final String topic; - - CustomMethodArgument(String body, String topic) { - this.body = body; - this.topic = topic; - } - + record CustomMethodArgument(String body, String topic) { } - }