Skip to content

Commit

Permalink
cleanup ConcurrentMessageListenerContainerMockTests
Browse files Browse the repository at this point in the history
* change `Arrays.asList` to `List.of`
  • Loading branch information
Wzy19930507 committed Feb 17, 2024
1 parent 22764cb commit da4dd42
Showing 1 changed file with 48 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -130,7 +129,7 @@ else if (event instanceof ConsumerFailedToStartEvent) {
exec.destroy();
}

@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void testCorrectContainerForConsumerError() throws InterruptedException {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
Expand Down Expand Up @@ -200,10 +199,8 @@ void delayedIdleEvent() throws InterruptedException {
containerProperties);
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(2);
AtomicReference<Long> eventTime = new AtomicReference<>();
container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent) {
eventTime.set(System.currentTimeMillis());
latch1.countDown();
latch2.countDown();
}
Expand Down Expand Up @@ -263,7 +260,7 @@ void testSyncRelativeSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -304,7 +301,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
Expand Down Expand Up @@ -363,7 +360,7 @@ void testSyncTimestampSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -410,7 +407,7 @@ void testAsyncTimestampSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
Expand Down Expand Up @@ -508,7 +505,9 @@ void testBatchInterceptBeforeTx1() throws InterruptedException {
}

@SuppressWarnings({ "rawtypes", "unchecked" })
void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException {
void testIntercept(boolean beforeTx, @Nullable AssignmentCommitOption option, boolean batch)
throws InterruptedException {

ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = mock(Consumer.class);
TopicPartition tp0 = new TopicPartition("foo", 0);
Expand All @@ -523,7 +522,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc
Thread.sleep(10);
return firstOrSecondPoll.incrementAndGet() < 3 ? records : empty;
}).given(consumer).poll(any());
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -676,7 +675,7 @@ void testInterceptInTxNonKafkaTM() throws InterruptedException {
Thread.sleep(10);
return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
}).given(consumer).poll(any());
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -771,7 +770,7 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {
return records;
}).given(consumer).poll(any());
TopicPartition tp0 = new TopicPartition("foo", 0);
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -814,7 +813,7 @@ private void testInitialCommitIBasedOnCommitted(boolean committed) throws Interr
return records;
}).given(consumer).poll(any());
TopicPartition tp0 = new TopicPartition("foo", 0);
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -865,7 +864,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
return null;
}).given(consumer).pause(any());
TopicPartition tp0 = new TopicPartition("foo", 0);
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
willAnswer(invocation -> {
rebal.set(invocation.getArgument(1));
Expand Down Expand Up @@ -911,14 +910,14 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssi
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> allAssignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1, tp3);
List<TopicPartition> afterRevokeAssignments = List.of(tp1, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> afterRevokeRecordMap = new HashMap<>();
afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
Expand Down Expand Up @@ -979,10 +978,11 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(allAssignments);
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
rebalLatch.countDown();
Expand All @@ -991,11 +991,13 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
return ConsumerRecords.empty();
}
return afterRevokeRecords;
default:
}
default -> {
if (paused.get()) {
return ConsumerRecords.empty();
}
return afterRevokeRecords;
}
}
}).given(consumer).poll(any());
container.start();
Expand Down Expand Up @@ -1023,7 +1025,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> allAssignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new LinkedHashMap<>();
ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar");
ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar");
Expand All @@ -1032,7 +1034,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> revokedAssignments = Arrays.asList(tp0, tp2);
List<TopicPartition> revokedAssignments = List.of(tp0, tp2);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
Expand All @@ -1044,9 +1046,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
Expand Down Expand Up @@ -1087,17 +1087,20 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(revokedAssignments);
rebal.get().onPartitionsAssigned(Collections.emptyList());
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
return ConsumerRecords.empty();
default:
}
default -> {
return ConsumerRecords.empty();
}
}
}).given(consumer).poll(any());
container.start();
Expand Down Expand Up @@ -1128,14 +1131,14 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
List<TopicPartition> allAssignments = List.of(tp0, tp1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0,
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
allRecordMap.put(tp1,
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
List<TopicPartition> afterRevokeAssignments = List.of(tp1);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
Expand All @@ -1147,9 +1150,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
Expand All @@ -1171,17 +1172,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(allAssignments);
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
return ConsumerRecords.empty();
default:
}
default -> {
return ConsumerRecords.empty();
}
}
}).given(consumer).poll(any());
container.start();
Expand All @@ -1206,14 +1210,13 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException {
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
List<TopicPartition> allAssignments = List.of(tp0, tp1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0,
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
allRecordMap.put(tp1,
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
Expand All @@ -1225,9 +1228,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
Expand All @@ -1249,17 +1250,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(List.of(tp0));
rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
return ConsumerRecords.empty();
default:
}
default -> {
return ConsumerRecords.empty();
}
}
}).given(consumer).poll(any());
container.start();
Expand All @@ -1285,7 +1289,7 @@ private AcknowledgingMessageListener ackOffset1() {

@Override
public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) {
if (rec.offset() == 1) {
if (rec.offset() == 1 && ack != null) {
ack.acknowledge();
}
}
Expand All @@ -1299,7 +1303,7 @@ public void onMessage(Object data) {

public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {

private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

CountDownLatch latch = new CountDownLatch(1);

Expand Down Expand Up @@ -1335,7 +1339,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC

public static class TestMessageListener2 implements MessageListener<String, String>, ConsumerSeekAware {

private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

CountDownLatch latch = new CountDownLatch(1);

Expand Down

0 comments on commit da4dd42

Please sign in to comment.