From 33332425ff9f36dc32f1c36cd6c7fca54407b225 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 18 Jan 2024 17:37:07 -0500 Subject: [PATCH] Minor cleanup in KMLCT Cleanup redundant suppress warnings, other Java 17 improvements etc. in KafkaMessageListenerContainerTests --- .../KafkaMessageListenerContainerTests.java | 92 +++++++++---------- 1 file changed, 44 insertions(+), 48 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 80e7439a33..73f9afd235 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -125,6 +125,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.transaction.PlatformTransactionManager; @@ -140,6 +141,7 @@ * @author Lukasz Kaminski * @author Ray Chuan Tay * @author Daniel Gentes + * @author Soby Chacko */ @EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2, KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4, @@ -235,8 +237,7 @@ public void testDelegateType() throws Exception { container.setBeanName("delegate"); AtomicReference> offsets = new AtomicReference<>(); container.setApplicationEventPublisher(e -> { - if (e instanceof ConsumerStoppingEvent) { - ConsumerStoppingEvent event = (ConsumerStoppingEvent) e; + if (e instanceof ConsumerStoppingEvent event) { offsets.set(event.getPartitions().stream() .map(p -> new TopicPartitionOffset(p.topic(), p.partition(), event.getConsumer().position(p, Duration.ofMillis(10_000)))) @@ -929,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception { Consumer consumer = mock(Consumer.class); given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer); final Map>> records = new HashMap<>(); - records.put(new TopicPartition("foo", 0), Arrays.asList( + records.put(new TopicPartition("foo", 0), List.of( new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))); ConsumerRecords consumerRecords = new ConsumerRecords<>(records); given(consumer.poll(any(Duration.class))).willAnswer(i -> { @@ -1344,7 +1345,6 @@ else if (entry.getValue().offset() == 2) { logger.info("Stop batch listener manual"); } - @SuppressWarnings("deprecation") @Test public void testBatchListenerErrors() throws Exception { logger.info("Start batch listener errors"); @@ -1417,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords data, C logger.info("Stop batch listener errors"); } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked"}) @Test public void testBatchListenerAckAfterRecoveryMock() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); @@ -1680,7 +1680,7 @@ public void testDefinedPartitions() throws Exception { @Override protected KafkaConsumer createKafkaConsumer(Map configs) { assertThat(configs).containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); - return new KafkaConsumer(props) { + return new KafkaConsumer<>(props) { @Override public ConsumerRecords poll(Duration timeout) { @@ -2281,10 +2281,8 @@ public void testStaticAssign() throws Exception { Map props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); - ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset[] { - new TopicPartitionOffset(topic22, 0), - new TopicPartitionOffset(topic22, 1) - }); + ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic22, 0), + new TopicPartitionOffset(topic22, 1)); final CountDownLatch latch = new CountDownLatch(1); final List> received = new ArrayList<>(); containerProps.setMessageListener((MessageListener) record -> { @@ -2362,7 +2360,7 @@ public void testBadListenerType() { containerProps.setMissingTopicsFatal(false); KafkaMessageListenerContainer badContainer = new KafkaMessageListenerContainer<>(cf, containerProps); - assertThatIllegalStateException().isThrownBy(() -> badContainer.start()) + assertThatIllegalStateException().isThrownBy(badContainer::start) .withMessageContaining("implementation must be provided"); badContainer.setupMessageListener((GenericMessageListener) data -> { }); @@ -2370,7 +2368,7 @@ public void testBadListenerType() { badContainer.pause(); assertThat(badContainer.isContainerPaused()).isFalse(); assertThat(badContainer.metrics()).isEqualTo(Collections.emptyMap()); - assertThatIllegalArgumentException().isThrownBy(() -> badContainer.start()) + assertThatIllegalArgumentException().isThrownBy(badContainer::start) .withMessageContaining("Listener must be"); assertThat(badContainer.toString()).contains("none assigned"); @@ -2387,7 +2385,7 @@ public void testBadAckMode() { new KafkaMessageListenerContainer<>(cf, containerProps); badContainer.setupMessageListener((MessageListener) m -> { }); - assertThatIllegalStateException().isThrownBy(() -> badContainer.start()) + assertThatIllegalStateException().isThrownBy(badContainer::start) .withMessageContaining("Consumer cannot be configured for auto commit for ackMode"); } @@ -2566,14 +2564,16 @@ public void onPartitionsAssigned(Map assignments, Consumer public void onMessage(ConsumerRecord data) { if (data.partition() == 0 && data.offset() == 0) { TopicPartition topicPartition = new TopicPartition(data.topic(), data.partition()); - getSeekCallbackFor(topicPartition).seekToBeginning(records.keySet()); + final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor(topicPartition); + assertThat(seekCallbackFor).isNotNull(); + seekCallbackFor.seekToBeginning(records.keySet()); Iterator iterator = records.keySet().iterator(); - getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next())); - getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next())); - getSeekCallbackFor(topicPartition).seekToEnd(records.keySet()); + seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next())); + seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next())); + seekCallbackFor.seekToEnd(records.keySet()); iterator = records.keySet().iterator(); - getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next())); - getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next())); + seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next())); + seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next())); } } @@ -2679,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception { containerProps.setAckMode(AckMode.RECORD); containerProps.setClientId("clientId"); containerProps.setIdleEventInterval(100L); - containerProps.setMessageListener((MessageListener) rec -> { }); + containerProps.setMessageListener((MessageListener) rec -> { }); containerProps.setMissingTopicsFatal(false); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); @@ -2746,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception { containerProps.setAckMode(AckMode.RECORD); containerProps.setClientId("clientId"); containerProps.setIdleEventInterval(100L); - containerProps.setMessageListener((MessageListener) rec -> { }); + containerProps.setMessageListener((MessageListener) rec -> { }); containerProps.setMissingTopicsFatal(false); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); @@ -2828,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception { containerProps.setAckMode(AckMode.RECORD); containerProps.setClientId("clientId"); containerProps.setIdleEventInterval(100L); - containerProps.setMessageListener((MessageListener) rec -> { }); + containerProps.setMessageListener((MessageListener) rec -> { }); containerProps.setMissingTopicsFatal(false); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); @@ -2956,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception { container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); new DirectFieldAccessor(container).setPropertyValue("listenerConsumer.assignedPartitions", - Arrays.asList(new TopicPartition("foo", 0))); + List.of(new TopicPartition("foo", 0))); Thread.sleep(500); long t1 = System.currentTimeMillis(); container.stop(); @@ -3061,16 +3061,12 @@ public void testAckModeCount() throws Exception { given(consumer.poll(any(Duration.class))).willAnswer(i -> { Thread.sleep(50); int recordsToUse = which.incrementAndGet(); - switch (recordsToUse) { - case 1: - return consumerRecords1; - case 2: - return consumerRecords2; - case 3: - return consumerRecords3; - default: - return emptyRecords; - } + return switch (recordsToUse) { + case 1 -> consumerRecords1; + case 2 -> consumerRecords2; + case 3 -> consumerRecords3; + default -> emptyRecords; + }; }); final CountDownLatch commitLatch = new CountDownLatch(3); willAnswer(i -> { @@ -3108,7 +3104,7 @@ public void testAckModeCount() throws Exception { container.stop(); } - @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" }) + @SuppressWarnings({ "unchecked", "rawtypes"}) @Test public void testCommitErrorHandlerCalled() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); @@ -3436,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception { ContainerProperties containerProps = new ContainerProperties("foo"); containerProps.setGroupId("grp"); containerProps.setClientId("clientId"); - containerProps.setMessageListener((MessageListener) msg -> { }); + containerProps.setMessageListener((MessageListener) msg -> { }); Properties consumerProps = new Properties(); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); @@ -3468,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception { assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked assertThat(commits.get(5).get(new TopicPartition("foo", 1))) .isNotNull() - .extracting(om -> om.offset()) + .extracting(OffsetAndMetadata::offset) .isEqualTo(2L); }); } @@ -3528,7 +3524,7 @@ else if (call == 1) { containerProps.setAckMode(ackMode); containerProps.setClientId("clientId"); containerProps.setIdleEventInterval(100L); - containerProps.setMessageListener((MessageListener) msg -> { }); + containerProps.setMessageListener((MessageListener) msg -> { }); Properties consumerProps = new Properties(); containerProps.setKafkaConsumerProperties(consumerProps); KafkaMessageListenerContainer container = @@ -3539,7 +3535,7 @@ else if (call == 1) { verifier.accept(commits); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({ "unchecked"}) @Test void testCommitFailsOnRevoke() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); @@ -3672,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException { cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins given(cf.getConfigurationProperties()).willReturn(cfProps); final Map>> records = new HashMap<>(); - records.put(new TopicPartition("foo", 0), Arrays.asList( + records.put(new TopicPartition("foo", 0), List.of( new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))); ConsumerRecords consumerRecords = new ConsumerRecords<>(records); ConsumerRecords emptyRecords = new ConsumerRecords<>(Collections.emptyMap()); @@ -3755,7 +3751,7 @@ void stopImmediately() throws InterruptedException { } @Test - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings({"unchecked"}) public void testInvokeRecordInterceptorSuccess() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -3797,7 +3793,7 @@ public void onMessage(ConsumerRecord data) { RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { @Override - @Nullable + @NonNull public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) { @@ -3843,7 +3839,7 @@ private static Stream paramsForRecordAllSkipped() { @ParameterizedTest(name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}") @MethodSource("paramsForRecordAllSkipped") - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings({"unchecked"}) public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early) throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -3870,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early containerProps.setGroupId("grp"); containerProps.setAckMode(ackMode); - containerProps.setMessageListener((MessageListener) msg -> { + containerProps.setMessageListener((MessageListener) msg -> { }); containerProps.setClientId("clientId"); @@ -3913,7 +3909,7 @@ public ConsumerRecord intercept(ConsumerRecord @ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}") @ValueSource(booleans = { true, false }) - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings({"unchecked"}) public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -3940,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception containerProps.setGroupId("grp"); containerProps.setAckMode(AckMode.BATCH); - containerProps.setMessageListener((BatchMessageListener) msgs -> { + containerProps.setMessageListener((BatchMessageListener) msgs -> { }); containerProps.setClientId("clientId"); if (!early) { @@ -3976,7 +3972,7 @@ public ConsumerRecords intercept(ConsumerRecords cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -4016,7 +4012,7 @@ public void onMessage(ConsumerRecord data) { RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { @Override - @Nullable + @NonNull public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) {