diff --git a/build.gradle b/build.gradle
index d1df1c8481..1bd30204f3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -63,13 +63,13 @@ ext {
 	kotlinCoroutinesVersion = '1.7.3'
 	log4jVersion = '2.22.1'
 	micrometerDocsVersion = '1.0.2'
-	micrometerVersion = '1.13.0-M1'
-	micrometerTracingVersion = '1.3.0-M1'
+	micrometerVersion = '1.13.0-SNAPSHOT'
+	micrometerTracingVersion = '1.3.0-SNAPSHOT'
 	mockitoVersion = '5.8.0'
 	reactorVersion = '2023.0.3'
 	scalaVersion = '2.13'
 	springBootVersion = '3.2.2' // docs module
-	springDataVersion = '2024.0.0-M1'
+	springDataVersion = '2024.0.0-SNAPSHOT'
 	springRetryVersion = '2.0.5'
 	springVersion = '6.1.4'
 	zookeeperVersion = '3.8.3'
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc
index 52f9ec06cc..c83c2bbf0b 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc
@@ -65,8 +65,7 @@ In either case, you should NOT perform any seeks on the consumer because the con
 Starting with version 2.8, the legacy `ErrorHandler` and `BatchErrorHandler` interfaces have been superseded by a new `CommonErrorHandler`.
 These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener.
-`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated.
-The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.
+`CommonErrorHandler` implementations are provided to replace most legacy framework error handler implementations.
 
 See xref:kafka/annotation-error-handling.adoc#migrating-legacy-eh[Migrating Custom Legacy Error Handler Implementations to `CommonErrorHandler`] for information to migrate custom error handlers to `CommonErrorHandler`.
@@ -425,7 +424,7 @@ To replace any `BatchErrorHandler` implementation, you should implement `handleB
 You should also implement `handleOtherException()` - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).
 
 [[after-rollback]]
-== After-rollback Processor
+== After Rollback Processor
 
 When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back.
 By default, any unprocessed records (including the failed record) are re-fetched on the next poll.
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc
index b126974ebe..8ef577d618 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc
@@ -30,7 +30,7 @@ See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.
 
 |[[asyncAcks]]<<asyncAcks,`asyncAcks`>>
-|false
+|`false`
 |Enable out-of-order commits (see xref:kafka/receiving-messages/ooo-commits.adoc[Manually Committing Offsets]); the consumer is paused and commits are deferred until gaps are filled.
 |[[authExceptionRetryInterval]]<<authExceptionRetryInterval,`authExceptionRetryInterval`>>
@@ -38,6 +38,10 @@ See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more infor
 |When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.
 When null, such exceptions are considered fatal and the container will stop.
 
+|[[batchRecoverAfterRollback]]<<batchRecoverAfterRollback,`batchRecoverAfterRollback`>>
+|`false`
+|Set to `true` to enable batch recovery; see xref:kafka/annotation-error-handling.adoc#after-rollback[After Rollback Processor].
+
 |[[clientId]]<<clientId,`clientId`>>
 |(empty string)
 |A prefix for the `client.id` consumer property.
@@ -57,10 +61,6 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
 |`null`
 |When present and `syncCommits` is `false` a callback invoked after the commit completes.
 
-|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
-|`null`
-|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.
-
 |[[commitLogLevel]]<<commitLogLevel,`commitLogLevel`>>
 |DEBUG
 |The logging level for logs pertaining to committing offsets.
@@ -69,15 +69,15 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
 |`null`
 |A rebalance listener; see xref:kafka/receiving-messages/rebalance-listeners.adoc[Rebalancing Listeners].
 
-|[[consumerStartTimout]]<<consumerStartTimout,`consumerStartTimout`>>
+|[[commitRetries]]<<commitRetries,`commitRetries`>>
+|3
+|The number of retries for a `RetriableCommitFailedException` when `syncCommits` is `true`.
+Default 3 (4 attempts in total).
+
+|[[consumerStartTimeout]]<<consumerStartTimeout,`consumerStartTimeout`>>
 |30s
 |The time to wait for the consumer to start before logging an error; this might happen if, say, you use a task executor with insufficient threads.
 
-|[[consumerTaskExecutor]]<<consumerTaskExecutor,`consumerTaskExecutor`>>
-|`SimpleAsyncTaskExecutor`
-|A task executor to run the consumer threads.
-The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container.
-
 |[[deliveryAttemptHeader]]<<deliveryAttemptHeader,`deliveryAttemptHeader`>>
 |`false`
 |See xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header].
@@ -123,9 +123,14 @@ Also see `idleBeforeDataMultiplier`.
 |None
 |Used to override any arbitrary consumer properties configured on the consumer factory.
 
+|[[listenerTaskExecutor]]<<listenerTaskExecutor,`listenerTaskExecutor`>>
+|`SimpleAsyncTaskExecutor`
+|A task executor to run the consumer threads.
+The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container.
+
 |[[logContainerConfig]]<<logContainerConfig,`logContainerConfig`>>
 |`false`
-|Set to true to log at INFO level all container properties.
+|Set to `true` to log all container properties at INFO level.
 
 |[[messageListener]]<<messageListener,`messageListener`>>
 |`null`
@@ -145,7 +150,7 @@ Also see `idleBeforeDataMultiplier`.
 |[[missingTopicsFatal]]<<missingTopicsFatal,`missingTopicsFatal`>>
 |`false`
-|When true prevents the container from starting if the confifgured topic(s) are not present on the broker.
+|When true, prevents the container from starting if the configured topic(s) are not present on the broker.
 
 |[[monitorInterval]]<<monitorInterval,`monitorInterval`>>
 |30s
@@ -157,9 +162,21 @@ See `noPollThreshold` and `pollTimeout`.
 |Multiplied by `pollTimeOut` to determine whether to publish a `NonResponsiveConsumerEvent`.
 See `monitorInterval`.
 
+|[[observationConvention]]<<observationConvention,`observationConvention`>>
+|`null`
+|When set, adds dynamic tags to the timers and traces, based on information in the consumer records.
+
+|[[observationEnabled]]<<observationEnabled,`observationEnabled`>>
+|`false`
+|Set to `true` to enable observation via Micrometer.
+
+|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
+|`null`
+|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.
+
 |[[onlyLogRecordMetadata]]<<onlyLogRecordMetadata,`onlyLogRecordMetadata`>>
 |`false`
-|Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`.
+|Set to `false` to log the complete consumer record (in error, debug logs etc.) instead of just `topic-partition@offset`.
 
 |[[pauseImmediate]]<<pauseImmediate,`pauseImmediate`>>
 |`false`
@@ -256,14 +273,6 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han
 |`ContainerProperties`
 |The container properties instance.
 
-|[[errorHandler]]<<errorHandler,`errorHandler`>>
-|See desc.
-|Deprecated - see `commonErrorHandler`.
-
-|[[genericErrorHandler]]<<genericErrorHandler,`genericErrorHandler`>>
-|See desc.
-|Deprecated - see `commonErrorHandler`.
-
 |[[groupId2]]<<groupId2,`groupId`>>
 |See desc.
 |The `containerProperties.groupId`, if present, otherwise the `group.id` property from the consumer factory.
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc
index b819408bf8..7b2bb36502 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc
@@ -24,7 +24,7 @@ NOTE: With the concurrent container, timers are created for each thread and the
 [[monitoring-kafkatemplate-performance]]
 == Monitoring KafkaTemplate Performance
 
-Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
+Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s+++ for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
 The timers can be disabled by setting the template's `micrometerEnabled` property to `false`.
 
 Two timers are maintained - one for successful calls to the listener and one for failures.
@@ -111,6 +111,6 @@ Starting with version 3.0.6, you can add dynamic tags to the timers and traces,
 To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively.
 The `record` property in both observation contexts contains the `ConsumerRecord` or `ProducerRecord` respectively.
 
-The sender and receiver contexts' `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
+The `remoteServiceName` properties of the sender and receiver contexts are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
 If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into `KafkaTemplate` s and listener containers.
 When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.
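Editor's note for reviewers: the new `observationConvention` container property and the micrometer.adoc passage above describe adding dynamic tags from each record. A minimal sketch of such a convention follows; the `TopicTagConvention` name and tag key are made up for illustration, and it assumes the `supportsContext()` default inherited from `KafkaListenerObservationConvention` and the `getRecord()` accessor implied by the "`record` property" wording above.

[source, java]
----
import io.micrometer.common.KeyValues;

import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;

// Hypothetical convention adding a low-cardinality tag taken from each ConsumerRecord.
public class TopicTagConvention implements KafkaListenerObservationConvention {

	@Override
	public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
		// getRecord() is assumed to expose the ConsumerRecord, per the doc text above
		return KeyValues.of("kafka.source.topic", context.getRecord().topic());
	}

}
----

It would then be registered through the container property documented above (for example, `containerProperties.setObservationConvention(new TopicTagConvention())`), with `observationEnabled` also set to `true`.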
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc
index c06436ba35..58f55aa34d 100644
--- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc
+++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc
@@ -13,11 +13,11 @@ Starting with version 2.1.5, you can call `isPauseRequested()` to see if `pause(
 However, the consumers might not have actually paused yet.
 `isConsumerPaused()` returns true if all `Consumer` instances have actually paused.
 
 In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.
 
 Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed.
-By default, the pause takes effect when all of the records from the previous poll have been processed.
-See <<pauseImmediate>>.
+By default, the pause takes effect when all the records from the previous poll have been processed.
+See xref:kafka/container-props.adoc#pauseImmediate[pauseImmediate].
 
 The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events:
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
index 616db9bea3..0ff38a6d7f 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
@@ -1059,6 +1059,7 @@ public String toString() {
 				+ "\n ackMode=" + this.ackMode
 				+ "\n ackCount=" + this.ackCount
 				+ "\n ackTime=" + this.ackTime
+				+ "\n consumerStartTimeout=" + this.consumerStartTimeout
 				+ "\n messageListener=" + this.messageListener
 				+ (this.listenerTaskExecutor != null
 						? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
@@ -1074,16 +1075,21 @@ public String toString() {
 				+ "\n monitorInterval=" + this.monitorInterval
 				+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
 				+ "\n noPollThreshold=" + this.noPollThreshold
+				+ "\n pauseImmediate=" + this.pauseImmediate
 				+ "\n pollTimeoutWhilePaused=" + this.pollTimeoutWhilePaused
 				+ "\n subBatchPerPartition=" + this.subBatchPerPartition
 				+ "\n assignmentCommitOption=" + this.assignmentCommitOption
 				+ "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader
+				+ "\n batchRecoverAfterRollback=" + this.batchRecoverAfterRollback
 				+ "\n eosMode=" + this.eosMode
 				+ "\n transactionDefinition=" + this.transactionDefinition
 				+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
 				+ "\n stopImmediate=" + this.stopImmediate
 				+ "\n asyncAcks=" + this.asyncAcks
+				+ "\n logContainerConfig=" + this.logContainerConfig
+				+ "\n missingTopicsFatal=" + this.missingTopicsFatal
 				+ "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier
+				+ "\n idleBetweenPolls=" + this.idleBetweenPolls
 				+ "\n micrometerEnabled=" + this.micrometerEnabled
 				+ "\n observationEnabled=" + this.observationEnabled
 				+ (this.observationConvention != null
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 390bbd8a5e..ff26ea06e1 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -3091,25 +3091,20 @@ private void processSeeks() {
 			traceSeek(offset);
 			try {
 				SeekPosition position = offset.getPosition();
+				TopicPartition topicPartition = offset.getTopicPartition();
 				Long whereTo = offset.getOffset();
 				if (position == null) {
 					if (offset.isRelativeToCurrent()) {
-						whereTo += this.consumer.position(offset.getTopicPartition());
+						whereTo += this.consumer.position(topicPartition);
 						whereTo = Math.max(whereTo, 0);
 					}
-					this.consumer.seek(offset.getTopicPartition(), whereTo);
+					this.consumer.seek(topicPartition, whereTo);
 				}
-				else if (position.equals(SeekPosition.BEGINNING)) {
-					this.consumer.seekToBeginning(Collections.singletonList(offset.getTopicPartition()));
-					if (whereTo != null) {
-						this.consumer.seek(offset.getTopicPartition(), whereTo);
-					}
-				}
-				else if (position.equals(SeekPosition.TIMESTAMP)) {
+				else if (SeekPosition.TIMESTAMP.equals(position)) {
 					// possible late addition since the grouped processing above
 					Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
 							.offsetsForTimes(
-									Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
+									Collections.singletonMap(topicPartition, offset.getOffset()));
 					offsetsForTimes.forEach((tp, ot) -> {
 						if (ot != null) {
 							this.consumer.seek(tp, ot.offset());
@@ -3117,10 +3112,15 @@ else if (position.equals(SeekPosition.TIMESTAMP)) {
 					});
 				}
 				else {
-					this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
+					if (SeekPosition.BEGINNING.equals(position)) {
+						this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
+					}
+					else {
+						this.consumer.seekToEnd(Collections.singletonList(topicPartition));
+					}
 					if (whereTo != null) {
-						whereTo += this.consumer.position(offset.getTopicPartition());
-						this.consumer.seek(offset.getTopicPartition(), whereTo);
+						whereTo += this.consumer.position(topicPartition);
+						this.consumer.seek(topicPartition, whereTo);
 					}
 				}
@@ -3354,7 +3354,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
 		@Override
 		public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
 			if (toCurrent) {
-				this.seeks.add(new TopicPartitionOffset(topic, partition, offset, toCurrent));
+				this.seeks.add(new TopicPartitionOffset(topic, partition, offset, true));
 			}
 			else if (offset >= 0) {
 				this.seeks.add(new TopicPartitionOffset(topic, partition, offset, SeekPosition.BEGINNING));
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java
index 2b82126b81..d49323ac41 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2023 the original author or authors.
+ * Copyright 2019-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,7 +30,6 @@
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,6 +77,8 @@
 /**
  * @author Gary Russell
+ * @author Wang Zhiyang
+ *
  * @since 2.2.4
  *
  */
@@ -130,7 +131,7 @@ else if (event instanceof ConsumerFailedToStartEvent) {
 		exec.destroy();
 	}
 
-	@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	void testCorrectContainerForConsumerError() throws InterruptedException {
 		ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
@@ -200,10 +201,8 @@ void delayedIdleEvent() throws InterruptedException {
 				containerProperties);
 		CountDownLatch latch1 = new CountDownLatch(1);
 		CountDownLatch latch2 = new CountDownLatch(2);
-		AtomicReference<Long> eventTime = new AtomicReference<>();
 		container.setApplicationEventPublisher(event -> {
 			if (event instanceof ListenerContainerIdleEvent) {
-				eventTime.set(System.currentTimeMillis());
 				latch1.countDown();
 				latch2.countDown();
 			}
@@ -263,7 +262,7 @@ void testSyncRelativeSeeks() throws InterruptedException {
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		TopicPartition tp2 = new TopicPartition("foo", 2);
 		TopicPartition tp3 = new TopicPartition("foo", 3);
-		List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
+		List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
 		willAnswer(invocation -> {
 			((ConsumerRebalanceListener) invocation.getArgument(1))
 					.onPartitionsAssigned(assignments);
@@ -304,7 +303,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		TopicPartition tp2 = new TopicPartition("foo", 2);
 		TopicPartition tp3 = new TopicPartition("foo", 3);
-		List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
+		List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
 		Map<TopicPartition, List<ConsumerRecord>> recordMap = new HashMap<>();
 		recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
 		recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
@@ -343,7 +342,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
 		verify(consumer).seekToEnd(Collections.singletonList(tp2));
 		verify(consumer).seek(tp2, 70L); // position - 30 (seekToEnd ignored by mock)
 		verify(consumer).seekToBeginning(Collections.singletonList(tp3));
-		verify(consumer).seek(tp3, 30L);
+		verify(consumer).seek(tp3, 130L); // position + 30 (seekToBeginning ignored by mock)
 		container.stop();
 	}
@@ -363,7 +362,7 @@ void testSyncTimestampSeeks() throws InterruptedException {
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		TopicPartition tp2 = new TopicPartition("foo", 2);
 		TopicPartition tp3 = new TopicPartition("foo", 3);
-		List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
+		List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
 		willAnswer(invocation -> {
 			((ConsumerRebalanceListener) invocation.getArgument(1))
 					.onPartitionsAssigned(assignments);
@@ -410,7 +409,7 @@ void testAsyncTimestampSeeks() throws InterruptedException {
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		TopicPartition tp2 = new TopicPartition("foo", 2);
 		TopicPartition tp3 = new TopicPartition("foo", 3);
-		List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
+		List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
 		Map<TopicPartition, List<ConsumerRecord>> recordMap = new HashMap<>();
 		recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
 		recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
@@ -508,7 +507,9 @@ void testBatchInterceptBeforeTx1() throws InterruptedException {
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException {
+	void testIntercept(boolean beforeTx, @Nullable AssignmentCommitOption option, boolean batch)
+			throws InterruptedException {
+
 		ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
 		final Consumer consumer = mock(Consumer.class);
 		TopicPartition tp0 = new TopicPartition("foo", 0);
@@ -523,7 +524,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc
 			Thread.sleep(10);
 			return firstOrSecondPoll.incrementAndGet() < 3 ? records : empty;
 		}).given(consumer).poll(any());
-		List<TopicPartition> assignments = Arrays.asList(tp0);
+		List<TopicPartition> assignments = List.of(tp0);
 		willAnswer(invocation -> {
 			((ConsumerRebalanceListener) invocation.getArgument(1))
 					.onPartitionsAssigned(assignments);
@@ -676,7 +677,7 @@ void testInterceptInTxNonKafkaTM() throws InterruptedException {
 			Thread.sleep(10);
 			return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
 		}).given(consumer).poll(any());
-		List<TopicPartition> assignments = Arrays.asList(tp0);
+		List<TopicPartition> assignments = List.of(tp0);
 		willAnswer(invocation -> {
 			((ConsumerRebalanceListener) invocation.getArgument(1))
 					.onPartitionsAssigned(assignments);
@@ -771,7 +772,7 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {
 			return records;
 		}).given(consumer).poll(any());
 		TopicPartition tp0 = new TopicPartition("foo", 0);
-		List<TopicPartition> assignments = Arrays.asList(tp0);
+		List<TopicPartition> assignments = List.of(tp0);
 		willAnswer(invocation -> {
 			((ConsumerRebalanceListener) invocation.getArgument(1))
 					.onPartitionsAssigned(assignments);
@@ -814,7 +815,7 @@ private void testInitialCommitIBasedOnCommitted(boolean committed) throws Interr
 			return records;
 		}).given(consumer).poll(any());
 		TopicPartition tp0 = new TopicPartition("foo", 0);
-		List<TopicPartition> assignments = Arrays.asList(tp0);
+		List<TopicPartition> assignments = List.of(tp0);
 		willAnswer(invocation -> {
 			((ConsumerRebalanceListener) invocation.getArgument(1))
 					.onPartitionsAssigned(assignments);
@@ -865,7 +866,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
 			return null;
 		}).given(consumer).pause(any());
 		TopicPartition tp0 = new TopicPartition("foo", 0);
-		List<TopicPartition> assignments = Arrays.asList(tp0);
+		List<TopicPartition> assignments = List.of(tp0);
 		AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
 		willAnswer(invocation -> {
 			rebal.set(invocation.getArgument(1));
@@ -911,14 +912,14 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssi
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		TopicPartition tp2 = new TopicPartition("foo", 2);
 		TopicPartition tp3 = new TopicPartition("foo", 3);
-		List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
+		List<TopicPartition> allAssignments = List.of(tp0, tp1, tp2, tp3);
 		Map<TopicPartition, List<ConsumerRecord>> allRecordMap = new HashMap<>();
 		allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
 		allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
 		allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
 		allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
 		ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
-		List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1, tp3);
+		List<TopicPartition> afterRevokeAssignments = List.of(tp1, tp3);
 		Map<TopicPartition, List<ConsumerRecord>> afterRevokeRecordMap = new HashMap<>();
 		afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
 		afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
@@ -979,10 +980,11 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record,
 			Thread.sleep(50);
 			pollLatch.countDown();
 			switch (pollPhase.getAndIncrement()) {
-				case 0:
+				case 0 -> {
 					rebal.get().onPartitionsAssigned(allAssignments);
 					return allRecords;
-				case 1:
+				}
+				case 1 -> {
 					rebal.get().onPartitionsRevoked(allAssignments);
 					rebal.get().onPartitionsAssigned(afterRevokeAssignments);
 					rebalLatch.countDown();
@@ -991,11 +993,13 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record,
 						return ConsumerRecords.empty();
 					}
 					return afterRevokeRecords;
-				default:
+				}
+				default -> {
 					if (paused.get()) {
 						return ConsumerRecords.empty();
 					}
 					return afterRevokeRecords;
+				}
 			}
 		}).given(consumer).poll(any());
 		container.start();
@@ -1023,7 +1027,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		TopicPartition tp2 = new TopicPartition("foo", 2);
 		TopicPartition tp3 = new TopicPartition("foo", 3);
-		List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
+		List<TopicPartition> allAssignments = List.of(tp0, tp1, tp2, tp3);
 		Map<TopicPartition, List<ConsumerRecord>> allRecordMap = new LinkedHashMap<>();
 		ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar");
 		ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar");
@@ -1032,7 +1036,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
 		allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
 		allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
 		ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
-		List<TopicPartition> revokedAssignments = Arrays.asList(tp0, tp2);
+		List<TopicPartition> revokedAssignments = List.of(tp0, tp2);
 		AtomicInteger pollPhase = new AtomicInteger();
 
 		Consumer consumer = mock(Consumer.class);
@@ -1044,9 +1048,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
 			return null;
 		}).given(consumer).subscribe(any(Collection.class), any());
 		CountDownLatch pauseLatch = new CountDownLatch(1);
-		AtomicBoolean paused = new AtomicBoolean();
 		willAnswer(inv -> {
-			paused.set(true);
 			pauseLatch.countDown();
 			return null;
 		}).given(consumer).pause(any());
@@ -1087,17 +1089,20 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record,
 			Thread.sleep(50);
 			pollLatch.countDown();
 			switch (pollPhase.getAndIncrement()) {
-				case 0:
+				case 0 -> {
 					rebal.get().onPartitionsAssigned(allAssignments);
 					return allRecords;
-				case 1:
+				}
+				case 1 -> {
 					rebal.get().onPartitionsRevoked(revokedAssignments);
 					rebal.get().onPartitionsAssigned(Collections.emptyList());
 					rebalLatch.countDown();
 					continueLatch.await(10, TimeUnit.SECONDS);
 					return ConsumerRecords.empty();
-				default:
+				}
+				default -> {
 					return ConsumerRecords.empty();
+				}
 			}
 		}).given(consumer).poll(any());
 		container.start();
@@ -1128,14 +1133,14 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record,
 	void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
 		TopicPartition tp0 = new TopicPartition("foo", 0);
 		TopicPartition tp1 = new TopicPartition("foo", 1);
-		List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
+		List<TopicPartition> allAssignments = List.of(tp0, tp1);
 		Map<TopicPartition, List<ConsumerRecord>> allRecordMap = new HashMap<>();
 		allRecordMap.put(tp0, List.of(new ConsumerRecord("foo", 0, 0, null, "bar"),
 				new ConsumerRecord("foo", 0, 1, null, "bar")));
 		allRecordMap.put(tp1, List.of(new ConsumerRecord("foo", 1, 0, null, "bar"),
 				new ConsumerRecord("foo", 1, 1, null, "bar")));
 		ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
-		List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
+		List<TopicPartition> afterRevokeAssignments = List.of(tp1);
 		AtomicInteger pollPhase = new AtomicInteger();
 
 		Consumer consumer = mock(Consumer.class);
@@ -1147,9 +1152,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
 			return null;
 		}).given(consumer).subscribe(any(Collection.class), any());
 		CountDownLatch pauseLatch = new CountDownLatch(1);
-		AtomicBoolean paused = new AtomicBoolean();
 		willAnswer(inv -> {
-			paused.set(true);
 			pauseLatch.countDown();
 			return null;
 		}).given(consumer).pause(any());
@@ -1171,17 +1174,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
 			Thread.sleep(50);
 			pollLatch.countDown();
 			switch (pollPhase.getAndIncrement()) {
-				case 0:
+				case 0 -> {
 					rebal.get().onPartitionsAssigned(allAssignments);
 					return allRecords;
-				case 1:
+				}
+				case 1 -> {
 					rebal.get().onPartitionsRevoked(allAssignments);
 					rebal.get().onPartitionsAssigned(afterRevokeAssignments);
 					rebalLatch.countDown();
 					continueLatch.await(10, TimeUnit.SECONDS);
 					return ConsumerRecords.empty();
-				default:
+				}
+				default -> {
 					return ConsumerRecords.empty();
+				}
 			}
 		}).given(consumer).poll(any());
 		container.start();
@@ -1206,14 +1212,13 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
 	void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException {
 		TopicPartition tp0 = new TopicPartition("foo", 0);
 		TopicPartition tp1 = new TopicPartition("foo", 1);
-		List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
+		List<TopicPartition> allAssignments = List.of(tp0, tp1);
 		Map<TopicPartition, List<ConsumerRecord>> allRecordMap = new HashMap<>();
 		allRecordMap.put(tp0, List.of(new ConsumerRecord("foo", 0, 0, null, "bar"),
 				new ConsumerRecord("foo", 0, 1, null, "bar")));
 		allRecordMap.put(tp1, List.of(new ConsumerRecord("foo", 1, 0, null, "bar"),
 				new ConsumerRecord("foo", 1, 1, null, "bar")));
 		ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
-		List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
 		AtomicInteger pollPhase = new AtomicInteger();
 
 		Consumer consumer = mock(Consumer.class);
@@ -1225,9 +1230,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
 			return null;
 		}).given(consumer).subscribe(any(Collection.class), any());
 		CountDownLatch pauseLatch = new CountDownLatch(1);
-		AtomicBoolean paused = new AtomicBoolean();
 		willAnswer(inv -> {
-			paused.set(true);
 			pauseLatch.countDown();
 			return null;
 		}).given(consumer).pause(any());
@@ -1249,17 +1252,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
 			Thread.sleep(50);
 			pollLatch.countDown();
 			switch (pollPhase.getAndIncrement()) {
-				case 0:
+				case 0 -> {
 					rebal.get().onPartitionsAssigned(allAssignments);
 					return allRecords;
-				case 1:
+				}
+				case 1 -> {
 					rebal.get().onPartitionsRevoked(List.of(tp0));
 					rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));
 					rebalLatch.countDown();
 					continueLatch.await(10, TimeUnit.SECONDS);
 					return ConsumerRecords.empty();
-				default:
+				}
+				default -> {
 					return ConsumerRecords.empty();
+				}
 			}
 		}).given(consumer).poll(any());
 		container.start();
@@ -1285,7 +1291,7 @@ private AcknowledgingMessageListener ackOffset1() {
 
 			@Override
 			public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) {
-				if (rec.offset() == 1) {
+				if (rec.offset() == 1 && ack != null) {
 					ack.acknowledge();
 				}
 			}
@@ -1299,7 +1305,7 @@ public void onMessage(Object data) {
 
 	public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {
 
-		private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
+		private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
 
 		CountDownLatch latch = new CountDownLatch(1);
@@ -1335,7 +1341,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
 
 	public static class TestMessageListener2 implements MessageListener<String, String>, ConsumerSeekAware {
 
-		private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
+		private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
 
 		CountDownLatch latch = new CountDownLatch(1);
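Editor's note for reviewers: the `processSeeks()` refactor above consolidates the `BEGINNING`/`END` branches, so a non-null offset supplied with either position is now applied relative to where `seekToBeginning()`/`seekToEnd()` landed; that is why `testAsyncRelativeSeeks()` now expects `seek(tp3, 130L)` (mocked position 100 plus 30). A minimal sketch of a listener driving these code paths through `seekRelative()` follows; the class name and partition-selection logic are illustrative only, and it assumes `AbstractConsumerSeekAware.onPartitionsAssigned()` may be overridden (calling `super` first to keep its callback bookkeeping). Actual listening methods are omitted.

[source, java]
----
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.AbstractConsumerSeekAware;

// Hypothetical listener exercising the two seekRelative() code paths shown in the diff above.
public class RelativeSeekingListener extends AbstractConsumerSeekAware {

	@Override
	public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
		super.onPartitionsAssigned(assignments, callback); // keep the base class's callback registration
		assignments.keySet().forEach(tp -> {
			if (tp.partition() % 2 == 0) {
				// toCurrent = true: seek 30 records back from the current position
				callback.seekRelative(tp.topic(), tp.partition(), -30, true);
			}
			else {
				// toCurrent = false with a non-negative offset: SeekPosition.BEGINNING,
				// i.e. 30 records past the beginning of the partition
				callback.seekRelative(tp.topic(), tp.partition(), 30, false);
			}
		});
	}

}
----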