Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor cleanup in KMLCT #2997

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -125,6 +125,7 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
Expand All @@ -140,6 +141,7 @@
* @author Lukasz Kaminski
* @author Ray Chuan Tay
* @author Daniel Gentes
* @author Soby Chacko
*/
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
Expand Down Expand Up @@ -235,8 +237,7 @@ public void testDelegateType() throws Exception {
container.setBeanName("delegate");
AtomicReference<List<TopicPartitionOffset>> offsets = new AtomicReference<>();
container.setApplicationEventPublisher(e -> {
if (e instanceof ConsumerStoppingEvent) {
ConsumerStoppingEvent event = (ConsumerStoppingEvent) e;
if (e instanceof ConsumerStoppingEvent event) {
offsets.set(event.getPartitions().stream()
.map(p -> new TopicPartitionOffset(p.topic(), p.partition(),
event.getConsumer().position(p, Duration.ofMillis(10_000))))
Expand Down Expand Up @@ -929,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception {
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
records.put(new TopicPartition("foo", 0), Arrays.asList(
records.put(new TopicPartition("foo", 0), List.of(
new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Expand Down Expand Up @@ -1344,7 +1345,6 @@ else if (entry.getValue().offset() == 2) {
logger.info("Stop batch listener manual");
}

@SuppressWarnings("deprecation")
@Test
public void testBatchListenerErrors() throws Exception {
logger.info("Start batch listener errors");
Expand Down Expand Up @@ -1417,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
logger.info("Stop batch listener errors");
}

@SuppressWarnings({ "unchecked", "deprecation" })
@SuppressWarnings({ "unchecked"})
@Test
public void testBatchListenerAckAfterRecoveryMock() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Expand Down Expand Up @@ -1680,7 +1680,7 @@ public void testDefinedPartitions() throws Exception {
@Override
protected KafkaConsumer<Integer, String> createKafkaConsumer(Map<String, Object> configs) {
assertThat(configs).containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
return new KafkaConsumer<Integer, String>(props) {
return new KafkaConsumer<>(props) {

@Override
public ConsumerRecords<Integer, String> poll(Duration timeout) {
Expand Down Expand Up @@ -2281,10 +2281,8 @@ public void testStaticAssign() throws Exception {
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);

DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset[] {
new TopicPartitionOffset(topic22, 0),
new TopicPartitionOffset(topic22, 1)
});
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic22, 0),
new TopicPartitionOffset(topic22, 1));
final CountDownLatch latch = new CountDownLatch(1);
final List<ConsumerRecord<Integer, String>> received = new ArrayList<>();
containerProps.setMessageListener((MessageListener<Integer, String>) record -> {
Expand Down Expand Up @@ -2362,15 +2360,15 @@ public void testBadListenerType() {
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, Foo1> badContainer =
new KafkaMessageListenerContainer<>(cf, containerProps);
assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
assertThatIllegalStateException().isThrownBy(badContainer::start)
.withMessageContaining("implementation must be provided");
badContainer.setupMessageListener((GenericMessageListener<String>) data -> {
});
assertThat(badContainer.getAssignedPartitions()).isNull();
badContainer.pause();
assertThat(badContainer.isContainerPaused()).isFalse();
assertThat(badContainer.metrics()).isEqualTo(Collections.emptyMap());
assertThatIllegalArgumentException().isThrownBy(() -> badContainer.start())
assertThatIllegalArgumentException().isThrownBy(badContainer::start)
.withMessageContaining("Listener must be");
assertThat(badContainer.toString()).contains("none assigned");

Expand All @@ -2387,7 +2385,7 @@ public void testBadAckMode() {
new KafkaMessageListenerContainer<>(cf, containerProps);
badContainer.setupMessageListener((MessageListener<String, String>) m -> {
});
assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
assertThatIllegalStateException().isThrownBy(badContainer::start)
.withMessageContaining("Consumer cannot be configured for auto commit for ackMode");

}
Expand Down Expand Up @@ -2566,14 +2564,16 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
public void onMessage(ConsumerRecord<String, String> data) {
if (data.partition() == 0 && data.offset() == 0) {
TopicPartition topicPartition = new TopicPartition(data.topic(), data.partition());
getSeekCallbackFor(topicPartition).seekToBeginning(records.keySet());
final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor(topicPartition);
assertThat(seekCallbackFor).isNotNull();
seekCallbackFor.seekToBeginning(records.keySet());
Iterator<TopicPartition> iterator = records.keySet().iterator();
getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next()));
getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next()));
getSeekCallbackFor(topicPartition).seekToEnd(records.keySet());
seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next()));
seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next()));
seekCallbackFor.seekToEnd(records.keySet());
iterator = records.keySet().iterator();
getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next()));
getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next()));
seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next()));
seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next()));
}
}

Expand Down Expand Up @@ -2679,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception {
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) rec -> { });
containerProps.setMessageListener((MessageListener<?, ?>) rec -> { });
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Expand Down Expand Up @@ -2746,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) rec -> { });
containerProps.setMessageListener((MessageListener<?, ?>) rec -> { });
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Expand Down Expand Up @@ -2828,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) rec -> { });
containerProps.setMessageListener((MessageListener<?, ?>) rec -> { });
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Expand Down Expand Up @@ -2956,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception {
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
new DirectFieldAccessor(container).setPropertyValue("listenerConsumer.assignedPartitions",
Arrays.asList(new TopicPartition("foo", 0)));
List.of(new TopicPartition("foo", 0)));
Thread.sleep(500);
long t1 = System.currentTimeMillis();
container.stop();
Expand Down Expand Up @@ -3061,16 +3061,12 @@ public void testAckModeCount() throws Exception {
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(50);
int recordsToUse = which.incrementAndGet();
switch (recordsToUse) {
case 1:
return consumerRecords1;
case 2:
return consumerRecords2;
case 3:
return consumerRecords3;
default:
return emptyRecords;
}
return switch (recordsToUse) {
case 1 -> consumerRecords1;
case 2 -> consumerRecords2;
case 3 -> consumerRecords3;
default -> emptyRecords;
};
});
final CountDownLatch commitLatch = new CountDownLatch(3);
willAnswer(i -> {
Expand Down Expand Up @@ -3108,7 +3104,7 @@ public void testAckModeCount() throws Exception {
container.stop();
}

@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
@SuppressWarnings({ "unchecked", "rawtypes"})
@Test
public void testCommitErrorHandlerCalled() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Expand Down Expand Up @@ -3436,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception {
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setGroupId("grp");
containerProps.setClientId("clientId");
containerProps.setMessageListener((MessageListener) msg -> { });
containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
Properties consumerProps = new Properties();
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Expand Down Expand Up @@ -3468,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
.isNotNull()
.extracting(om -> om.offset())
.extracting(OffsetAndMetadata::offset)
.isEqualTo(2L);
});
}
Expand Down Expand Up @@ -3528,7 +3524,7 @@ else if (call == 1) {
containerProps.setAckMode(ackMode);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) msg -> { });
containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
Properties consumerProps = new Properties();
containerProps.setKafkaConsumerProperties(consumerProps);
KafkaMessageListenerContainer<Integer, String> container =
Expand All @@ -3539,7 +3535,7 @@ else if (call == 1) {
verifier.accept(commits);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@SuppressWarnings({ "unchecked"})
@Test
void testCommitFailsOnRevoke() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Expand Down Expand Up @@ -3672,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException {
cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
given(cf.getConfigurationProperties()).willReturn(cfProps);
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
records.put(new TopicPartition("foo", 0), Arrays.asList(
records.put(new TopicPartition("foo", 0), List.of(
new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
Expand Down Expand Up @@ -3755,7 +3751,7 @@ void stopImmediately() throws InterruptedException {
}

@Test
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked"})
public void testInvokeRecordInterceptorSuccess() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
Expand Down Expand Up @@ -3797,7 +3793,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

@Override
@Nullable
@NonNull
public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {

Expand Down Expand Up @@ -3843,7 +3839,7 @@ private static Stream<Arguments> paramsForRecordAllSkipped() {

@ParameterizedTest(name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}")
@MethodSource("paramsForRecordAllSkipped")
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked"})
public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early) throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
Expand All @@ -3870,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
containerProps.setGroupId("grp");
containerProps.setAckMode(ackMode);

containerProps.setMessageListener((MessageListener) msg -> {
containerProps.setMessageListener((MessageListener<?, ?>) msg -> {
});
containerProps.setClientId("clientId");

Expand Down Expand Up @@ -3913,7 +3909,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>

@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
@ValueSource(booleans = { true, false })
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked"})
public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
Expand All @@ -3940,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.BATCH);

containerProps.setMessageListener((BatchMessageListener) msgs -> {
containerProps.setMessageListener((BatchMessageListener<?, ?>) msgs -> {
});
containerProps.setClientId("clientId");
if (!early) {
Expand Down Expand Up @@ -3976,7 +3972,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
}

@Test
@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings({"unchecked"})
public void testInvokeRecordInterceptorFailure() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
Expand Down Expand Up @@ -4016,7 +4012,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

@Override
@Nullable
@NonNull
public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {

Expand Down