diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
index a746d2aabb..b9719e050c 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ExceptionClassifier.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021-2023 the original author or authors.
+ * Copyright 2021-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -121,7 +121,7 @@ protected BinaryExceptionClassifier getClassifier() {
	 * All others will be retried.
	 * When calling this method, the defaults will not be applied.
	 * @param classifications the classifications.
-	 * @param defaultValue whether or not to retry non-matching exceptions.
+	 * @param defaultValue whether to retry non-matching exceptions.
	 * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
	 * @see #addNotRetryableExceptions(Class...)
	 */
@@ -219,6 +219,7 @@ public Boolean removeClassification(Class<? extends Exception> exceptionType) {
	 * @author Gary Russell
	 *
	 */
+	@SuppressWarnings("serial")
	private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier {

		ExtendedBinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue) {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
index e39c5adccd..73f9afd235 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2023 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -125,6 +125,7 @@
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.ContainerTestUtils;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.lang.NonNull;
 import org.springframework.lang.Nullable;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.transaction.PlatformTransactionManager;
@@ -140,6 +141,7 @@
  * @author Lukasz Kaminski
  * @author Ray Chuan Tay
  * @author Daniel Gentes
+ * @author Soby Chacko
  */
 @EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
		KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
@@ -928,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception {
		Consumer<Integer, String> consumer = mock(Consumer.class);
		given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
		final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
-		records.put(new TopicPartition("foo", 0), Arrays.asList(
+		records.put(new TopicPartition("foo", 0), List.of(
				new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
		ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
		given(consumer.poll(any(Duration.class))).willAnswer(i -> {
@@ -1343,7 +1345,6 @@ else if (entry.getValue().offset() == 2) {
		logger.info("Stop batch listener manual");
	}

-	@SuppressWarnings("deprecation")
	@Test
	public void testBatchListenerErrors() throws Exception {
		logger.info("Start batch listener errors");
@@ -1416,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
		logger.info("Stop batch listener errors");
	}

-	@SuppressWarnings({ "unchecked", "deprecation" })
+	@SuppressWarnings({ "unchecked"})
	@Test
	public void testBatchListenerAckAfterRecoveryMock() throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
@@ -1679,7 +1680,7 @@ public void testDefinedPartitions() throws Exception {

			@Override
			protected KafkaConsumer<Integer, String> createKafkaConsumer(Map<String, Object> configs) {
				assertThat(configs).containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
-				return new KafkaConsumer(props) {
+				return new KafkaConsumer<>(props) {

					@Override
					public ConsumerRecords<Integer, String> poll(Duration timeout) {
@@ -2280,10 +2281,8 @@ public void testStaticAssign() throws Exception {
		Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);

		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
-		ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset[] {
-				new TopicPartitionOffset(topic22, 0),
-				new TopicPartitionOffset(topic22, 1)
-		});
+		ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic22, 0),
+				new TopicPartitionOffset(topic22, 1));
		final CountDownLatch latch = new CountDownLatch(1);
		final List<ConsumerRecord<Integer, String>> received = new ArrayList<>();
		containerProps.setMessageListener((MessageListener<Integer, String>) record -> {
@@ -2361,7 +2360,7 @@ public void testBadListenerType() {
		containerProps.setMissingTopicsFatal(false);
		KafkaMessageListenerContainer badContainer =
				new KafkaMessageListenerContainer<>(cf, containerProps);
-		assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
+		assertThatIllegalStateException().isThrownBy(badContainer::start)
				.withMessageContaining("implementation must be provided");
		badContainer.setupMessageListener((GenericMessageListener) data -> {
		});
@@ -2369,7 +2368,7 @@ public void testBadListenerType() {
		badContainer.pause();
		assertThat(badContainer.isContainerPaused()).isFalse();
		assertThat(badContainer.metrics()).isEqualTo(Collections.emptyMap());
-		assertThatIllegalArgumentException().isThrownBy(() -> badContainer.start())
+		assertThatIllegalArgumentException().isThrownBy(badContainer::start)
				.withMessageContaining("Listener must be");
		assertThat(badContainer.toString()).contains("none assigned");
@@ -2386,7 +2385,7 @@ public void testBadAckMode() {
				new KafkaMessageListenerContainer<>(cf, containerProps);
		badContainer.setupMessageListener((MessageListener) m -> {
		});
-		assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
+		assertThatIllegalStateException().isThrownBy(badContainer::start)
				.withMessageContaining("Consumer cannot be configured for auto commit for ackMode");
	}
@@ -2565,14 +2564,16 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
			public void onMessage(ConsumerRecord<Integer, String> data) {
				if (data.partition() == 0 && data.offset() == 0) {
					TopicPartition topicPartition = new TopicPartition(data.topic(), data.partition());
-					getSeekCallbackFor(topicPartition).seekToBeginning(records.keySet());
+					final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor(topicPartition);
+					assertThat(seekCallbackFor).isNotNull();
+					seekCallbackFor.seekToBeginning(records.keySet());
					Iterator<TopicPartition> iterator = records.keySet().iterator();
-					getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next()));
-					getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next()));
-					getSeekCallbackFor(topicPartition).seekToEnd(records.keySet());
+					seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToEnd(records.keySet());
					iterator = records.keySet().iterator();
-					getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next()));
-					getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next()));
				}
			}
@@ -2678,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception {
		containerProps.setAckMode(AckMode.RECORD);
		containerProps.setClientId("clientId");
		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) rec -> { });
+		containerProps.setMessageListener((MessageListener<Integer, String>) rec -> { });
		containerProps.setMissingTopicsFatal(false);
		KafkaMessageListenerContainer<Integer, String> container =
				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2745,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
		containerProps.setAckMode(AckMode.RECORD);
		containerProps.setClientId("clientId");
		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) rec -> { });
+		containerProps.setMessageListener((MessageListener<Integer, String>) rec -> { });
		containerProps.setMissingTopicsFatal(false);
		KafkaMessageListenerContainer<Integer, String> container =
				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2827,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
		containerProps.setAckMode(AckMode.RECORD);
		containerProps.setClientId("clientId");
		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) rec -> { });
+		containerProps.setMessageListener((MessageListener<Integer, String>) rec -> { });
		containerProps.setMissingTopicsFatal(false);
		KafkaMessageListenerContainer<Integer, String> container =
				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2955,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception {
		container.start();
		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
		new DirectFieldAccessor(container).setPropertyValue("listenerConsumer.assignedPartitions",
-				Arrays.asList(new TopicPartition("foo", 0)));
+				List.of(new TopicPartition("foo", 0)));
		Thread.sleep(500);
		long t1 = System.currentTimeMillis();
		container.stop();
@@ -3060,16 +3061,12 @@ public void testAckModeCount() throws Exception {
		given(consumer.poll(any(Duration.class))).willAnswer(i -> {
			Thread.sleep(50);
			int recordsToUse = which.incrementAndGet();
-			switch (recordsToUse) {
-				case 1:
-					return consumerRecords1;
-				case 2:
-					return consumerRecords2;
-				case 3:
-					return consumerRecords3;
-				default:
-					return emptyRecords;
-			}
+			return switch (recordsToUse) {
+				case 1 -> consumerRecords1;
+				case 2 -> consumerRecords2;
+				case 3 -> consumerRecords3;
+				default -> emptyRecords;
+			};
		});
		final CountDownLatch commitLatch = new CountDownLatch(3);
		willAnswer(i -> {
@@ -3107,7 +3104,7 @@ public void testAckModeCount() throws Exception {
		container.stop();
	}

-	@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
+	@SuppressWarnings({ "unchecked", "rawtypes"})
	@Test
	public void testCommitErrorHandlerCalled() throws Exception {
		ConsumerFactory cf = mock(ConsumerFactory.class);
@@ -3435,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception {
		ContainerProperties containerProps = new ContainerProperties("foo");
		containerProps.setGroupId("grp");
		containerProps.setClientId("clientId");
-		containerProps.setMessageListener((MessageListener) msg -> { });
+		containerProps.setMessageListener((MessageListener<Integer, String>) msg -> { });
		Properties consumerProps = new Properties();
		KafkaMessageListenerContainer<Integer, String> container =
				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -3467,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
			assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
			assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
					.isNotNull()
-					.extracting(om -> om.offset())
+					.extracting(OffsetAndMetadata::offset)
					.isEqualTo(2L);
		});
	}
@@ -3527,7 +3524,7 @@ else if (call == 1) {
		containerProps.setAckMode(ackMode);
		containerProps.setClientId("clientId");
		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) msg -> { });
+		containerProps.setMessageListener((MessageListener<Integer, String>) msg -> { });
		Properties consumerProps = new Properties();
		containerProps.setKafkaConsumerProperties(consumerProps);
		KafkaMessageListenerContainer<Integer, String> container =
@@ -3538,7 +3535,7 @@ else if (call == 1) {
		verifier.accept(commits);
	}

-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@SuppressWarnings({ "unchecked"})
	@Test
	void testCommitFailsOnRevoke() throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
@@ -3671,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException {
		cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
		given(cf.getConfigurationProperties()).willReturn(cfProps);
		final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
-		records.put(new TopicPartition("foo", 0), Arrays.asList(
+		records.put(new TopicPartition("foo", 0), List.of(
				new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
		ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
		ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
@@ -3754,7 +3751,7 @@ void stopImmediately() throws InterruptedException {
	}

	@Test
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
	public void testInvokeRecordInterceptorSuccess() throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -3796,7 +3793,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
		RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

			@Override
-			@Nullable
+			@NonNull
			public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
					Consumer<Integer, String> consumer) {
@@ -3842,7 +3839,7 @@ private static Stream<Arguments> paramsForRecordAllSkipped() {

	@ParameterizedTest(name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}")
	@MethodSource("paramsForRecordAllSkipped")
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
	public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early) throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -3869,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
		containerProps.setGroupId("grp");
		containerProps.setAckMode(ackMode);

-		containerProps.setMessageListener((MessageListener) msg -> {
+		containerProps.setMessageListener((MessageListener<Integer, String>) msg -> {
		});
		containerProps.setClientId("clientId");
@@ -3912,7 +3909,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>

	@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
	@ValueSource(booleans = { true, false })
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
	public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -3939,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
		containerProps.setGroupId("grp");
		containerProps.setAckMode(AckMode.BATCH);

-		containerProps.setMessageListener((BatchMessageListener) msgs -> {
+		containerProps.setMessageListener((BatchMessageListener<Integer, String>) msgs -> {
		});
		containerProps.setClientId("clientId");
		if (!early) {
@@ -3975,7 +3972,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, String>

	@Test
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
	public void testInvokeRecordInterceptorFailure() throws Exception {
		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -4015,7 +4012,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
		RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

			@Override
-			@Nullable
+			@NonNull
			public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
					Consumer<Integer, String> consumer) {