diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index f627d2be43..2b136863f1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -783,7 +783,7 @@ public void setConsumerStartTimeout(Duration consumerStartTimeout) { * @since 2.3.2 */ public boolean isSubBatchPerPartition() { - return this.subBatchPerPartition == null ? false : this.subBatchPerPartition; + return this.subBatchPerPartition != null && this.subBatchPerPartition; } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 79d02a1183..ed3a7d2db7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,8 @@ * @author Gary Russell * @author Andrii Pelesh * @author Antonio Tomac + * @author Wang Zhiyang + * * @since 2.8 * */ @@ -245,7 +247,7 @@ public static Exception findRootCause(Exception exception) { * @since 3.0.10 */ public static boolean checkDeserializer(ConsumerFactory consumerFactory, - Properties consumerOverrides, boolean isValue, ClassLoader classLoader) { + Properties consumerOverrides, boolean isValue, @Nullable ClassLoader classLoader) { Object deser = findDeserializerClass(consumerFactory, consumerOverrides, isValue); Class deserializer = null; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index d02faa72db..5a409342e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -173,8 +173,6 @@ public class KafkaMessageListenerContainer // NOSONAR line count private static final String RAWTYPES = "rawtypes"; - private static final int DEFAULT_ACK_TIME = 5000; - private static final Map CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues(); private final AbstractMessageListenerContainer thisOrParentContainer; @@ -277,13 +275,8 @@ public Collection getAssignedPartitions() { else if (partitionsListenerConsumer.assignedPartitions != null) { return Collections.unmodifiableCollection(partitionsListenerConsumer.assignedPartitions); } - else { - return null; - } - } - else { - return null; } + return null; } @Override @@ -293,9 +286,7 @@ public Map> getAssignmentsByClientId() { if (partitionsListenerConsumer != null) { return Collections.singletonMap(partitionsListenerConsumer.getClientId(), getAssignedPartitions()); } - else { - return null; - } + return null; } @Override @@ -363,7 +354,6 @@ protected void doStart() { checkTopics(); } ContainerProperties containerProperties = getContainerProperties(); - checkAckMode(containerProperties); Object messageListener = containerProperties.getMessageListener(); AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor(); @@ -402,19 +392,6 @@ protected void doStart() { } } - private void checkAckMode(ContainerProperties containerProperties) { - if (!this.consumerFactory.isAutoCommit()) { - AckMode ackMode = containerProperties.getAckMode(); - if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) { - Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0"); - } - if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) - && containerProperties.getAckTime() == 0) { - containerProperties.setAckTime(DEFAULT_ACK_TIME); - } - } - } - private ListenerType determineListenerType(GenericMessageListener listener) { Object delegating = listener; while (delegating instanceof DelegatingMessageListener dml) { @@ -638,7 +615,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Consumer consumer; - private final Map> offsets = new LinkedHashMap<>(); + private final Map offsets = new LinkedHashMap<>(); private final Collection assignedPartitions = new LinkedHashSet<>(); @@ -668,12 +645,16 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean autoCommit; + private final AckMode ackMode; + private final boolean isManualAck; private final boolean isCountAck; private final boolean isTimeOnlyAck; + private final boolean isTimeAck; + private final boolean isManualImmediateAck; private final boolean isAnyManualAck; @@ -756,7 +737,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final AtomicBoolean polling = new AtomicBoolean(); - private final boolean subBatchPerPartition; + private final boolean subBatchPerPartition = this.containerProperties.isSubBatchPerPartition(); private final Duration authExceptionRetryInterval = this.containerProperties.getAuthExceptionRetryInterval(); @@ -836,8 +817,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private Producer producer; - private boolean commitRecovered; - private boolean wasIdle; private boolean batchFailed; @@ -864,23 +843,19 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.asyncReplies = listener instanceof AsyncRepliesAware hmd && hmd.isAsyncReplies() || this.containerProperties.isAsyncAcks(); - AckMode ackMode = determineAckMode(); - this.isManualAck = ackMode.equals(AckMode.MANUAL); - this.isCountAck = ackMode.equals(AckMode.COUNT) - || ackMode.equals(AckMode.COUNT_TIME); - this.isTimeOnlyAck = ackMode.equals(AckMode.TIME); - this.isManualImmediateAck = - ackMode.equals(AckMode.MANUAL_IMMEDIATE); + this.ackMode = determineAckMode(); + this.isCountAck = AckMode.COUNT.equals(this.ackMode) + || AckMode.COUNT_TIME.equals(this.ackMode); + this.isTimeOnlyAck = AckMode.TIME.equals(this.ackMode); + this.isTimeAck = this.isTimeOnlyAck + || AckMode.COUNT_TIME.equals(this.ackMode); + this.isManualAck = AckMode.MANUAL.equals(this.ackMode); + this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode); this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck; - this.isRecordAck = ackMode.equals(AckMode.RECORD); - this.offsetsInThisBatch = - this.isAnyManualAck && this.asyncReplies - ? new ConcurrentHashMap<>() - : null; - this.deferredOffsets = - this.isAnyManualAck && this.asyncReplies - ? new ConcurrentHashMap<>() - : null; + this.isRecordAck = this.ackMode.equals(AckMode.RECORD); + boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies; + this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null; + this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null; this.observationRegistry = observationRegistry; Properties consumerProperties = propertiesFromConsumerPropertyOverrides(); @@ -966,7 +941,6 @@ else if (listener instanceof MessageListener) { this.maxPollInterval = obtainMaxPollInterval(consumerProperties); this.micrometerHolder = obtainMicrometerHolder(); this.deliveryAttemptAware = setupDeliveryAttemptAware(); - this.subBatchPerPartition = setupSubBatchPerPartition(); this.lastReceivePartition = new HashMap<>(); this.lastAlertPartition = new HashMap<>(); this.wasIdlePartition = new HashMap<>(); @@ -1077,14 +1051,6 @@ private void checkGroupInstance(Properties properties, ConsumerFactory con } } - private boolean setupSubBatchPerPartition() { - Boolean subBatching = this.containerProperties.getSubBatchPerPartition(); - if (subBatching != null) { - return subBatching; - } - return false; - } - @Nullable private DeliveryAttemptAware setupDeliveryAttemptAware() { DeliveryAttemptAware aware = null; @@ -1199,8 +1165,7 @@ else if (autoCommitOverride != null) { isAutoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit(); } Assert.state(!this.isAnyManualAck || !isAutoCommit, - () -> "Consumer cannot be configured for auto commit for ackMode " - + this.containerProperties.getAckMode()); + () -> "Consumer cannot be configured for auto commit for ackMode " + this.ackMode); return isAutoCommit; } @@ -1590,12 +1555,7 @@ private void fixTxOffsetsIfNeeded() { if (!toFix.isEmpty()) { this.logger.debug(() -> "Fixing TX offsets: " + toFix); if (this.kafkaTxManager == null) { - if (this.syncCommits) { - commitSync(toFix); - } - else { - commitAsync(toFix); - } + commitOffsets(toFix); } else { this.transactionTemplate.executeWithoutResult(status -> { @@ -2076,7 +2036,7 @@ private synchronized void ackInOrder(ConsumerRecord cRecord) { offs.remove(0); ConsumerRecord recordToAck = cRecord; if (!deferred.isEmpty()) { - Collections.sort(deferred, (a, b) -> Long.compare(a.offset(), b.offset())); + deferred.sort((a, b) -> Long.compare(a.offset(), b.offset())); while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1) { recordToAck = deferred.remove(0); offs.remove(0); @@ -2103,19 +2063,8 @@ else if (cRecord.offset() < offs.get(0)) { } private void ackImmediate(ConsumerRecord cRecord) { - Map commits = Collections.singletonMap( - new TopicPartition(cRecord.topic(), cRecord.partition()), - createOffsetAndMetadata(cRecord.offset() + 1)); - this.commitLogger.log(() -> COMMITTING + commits); - if (this.producer != null) { - doSendOffsets(this.producer, commits); - } - else if (this.syncCommits) { - commitSync(commits); - } - else { - commitAsync(commits); - } + Map commits = buildSingleCommits(cRecord); + commitOffsetsInTransactions(commits); } private void ackImmediate(ConsumerRecords records) { @@ -2124,25 +2073,7 @@ private void ackImmediate(ConsumerRecords records) { commits.put(part, createOffsetAndMetadata(records.records(part) .get(records.records(part).size() - 1).offset() + 1)); } - this.commitLogger.log(() -> COMMITTING + commits); - if (this.producer != null) { - doSendOffsets(this.producer, commits); - } - else if (this.syncCommits) { - commitSync(commits); - } - else { - commitAsync(commits); - } - } - - private void commitAsync(Map commits) { - this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> { - this.commitCallback.onComplete(offsetsAttempted, exception); - if (exception == null && this.fixTxOffsets) { - this.lastCommits.putAll(commits); - } - }); + commitOffsetsInTransactions(commits); } private void invokeListener(final ConsumerRecords records) { @@ -2173,7 +2104,6 @@ private void invokeBatchListener(final ConsumerRecords recordsArg) { } } - @SuppressWarnings(RAWTYPES) private void invokeBatchListenerInTx(final ConsumerRecords records, @Nullable final List> recordList) { @@ -2310,9 +2240,7 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords || this.producer != null) { if (this.remainingRecords != null) { ConsumerRecord firstUncommitted = this.remainingRecords.iterator().next(); - Iterator> it = records.iterator(); - while (it.hasNext()) { - ConsumerRecord next = it.next(); + for (ConsumerRecord next : records) { if (!next.equals(firstUncommitted)) { this.acks.add(next); } @@ -2798,21 +2726,15 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord cRecord) { if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null) || this.producer != null) { - if (this.isManualAck) { - this.commitRecovered = true; - } if (this.remainingRecords == null || !cRecord.equals(this.remainingRecords.iterator().next())) { if (this.offsetsInThisBatch != null) { // NOSONAR (sync) ackInOrder(cRecord); } else { - ackCurrent(cRecord); + ackCurrent(cRecord, this.isManualAck); } } - if (this.isManualAck) { - this.commitRecovered = false; - } } } @@ -2919,11 +2841,11 @@ private void invokeErrorHandler(final ConsumerRecord cRecord, Map>> records = new LinkedHashMap<>(); if (!handled) { records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()), - tp -> new ArrayList>()).add(cRecord); + tp -> new ArrayList<>()).add(cRecord); while (iterator.hasNext()) { ConsumerRecord next = iterator.next(); records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()), - tp -> new ArrayList>()).add(next); + tp -> new ArrayList<>()).add(next); } } if (!records.isEmpty()) { @@ -2994,31 +2916,22 @@ public void checkDeser(final ConsumerRecord cRecord, String headerName) { } public void ackCurrent(final ConsumerRecord cRecord) { + ackCurrent(cRecord, false); + } - if (this.isRecordAck) { - Map offsetsToCommit = - Collections.singletonMap(new TopicPartition(cRecord.topic(), cRecord.partition()), - createOffsetAndMetadata(cRecord.offset() + 1)); - if (this.producer == null) { - this.commitLogger.log(() -> COMMITTING + offsetsToCommit); - if (this.syncCommits) { - commitSync(offsetsToCommit); - } - else { - commitAsync(offsetsToCommit); - } - } - else { - this.acks.add(cRecord); - } + public void ackCurrent(final ConsumerRecord cRecord, boolean commitRecovered) { + if (this.isRecordAck && this.producer == null) { + Map offsetsToCommit = buildSingleCommits(cRecord); + this.commitLogger.log(() -> COMMITTING + offsetsToCommit); + commitOffsets(offsetsToCommit); } - else if (this.producer != null - || ((!this.isAnyManualAck || this.commitRecovered) && !this.autoCommit)) { + else if (this.producer != null) { this.acks.add(cRecord); - } - if (this.producer != null) { sendOffsetsToTransaction(); } + else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) { + this.acks.add(cRecord); + } } private void sendOffsetsToTransaction() { @@ -3029,6 +2942,9 @@ private void sendOffsetsToTransaction() { } private void doSendOffsets(Producer prod, Map commits) { + if (CollectionUtils.isEmpty(commits)) { + return; + } prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata()); if (this.fixTxOffsets) { this.lastCommits.putAll(commits); @@ -3038,45 +2954,44 @@ private void doSendOffsets(Producer prod, Map= this.containerProperties.getAckCount(); - if ((!this.isTimeOnlyAck && !this.isCountAck) || countExceeded) { - if (this.isCountAck) { - this.logger.debug(() -> "Committing in " + ackMode.name() + " because count " - + this.count - + " exceeds configured limit of " + this.containerProperties.getAckCount()); - } - commitIfNecessary(); - this.count = 0; - } - else { - timedAcks(ackMode); - } + if (this.isCountAck) { + countAcks(); + } + else if (this.isTimeAck) { + timedAcks(); + } + else if (!this.isManualImmediateAck) { + commitIfNecessary(); + this.count = 0; } } - private void timedAcks(AckMode ackMode) { - long now; - now = System.currentTimeMillis(); - boolean elapsed = now - this.last > this.containerProperties.getAckTime(); - if (AckMode.TIME.equals(ackMode) && elapsed) { - this.logger.debug(() -> "Committing in AckMode.TIME " + - "because time elapsed exceeds configured limit of " + - this.containerProperties.getAckTime()); + private void countAcks() { + boolean countExceeded = this.isCountAck && this.count >= this.containerProperties.getAckCount(); + if (countExceeded) { + this.logger.debug(() -> "Committing in " + this.ackMode.name() + " because count " + + this.count + + " exceeds configured limit of " + this.containerProperties.getAckCount()); commitIfNecessary(); - this.last = now; + this.count = 0; + if (AckMode.COUNT_TIME.equals(this.ackMode)) { + this.last = System.currentTimeMillis(); + } } - else if (AckMode.COUNT_TIME.equals(ackMode) && elapsed) { - this.logger.debug(() -> "Committing in AckMode.COUNT_TIME " + + } + + private void timedAcks() { + long now = System.currentTimeMillis(); + boolean elapsed = this.isTimeAck && now - this.last > this.containerProperties.getAckTime(); + if (elapsed) { + this.logger.debug(() -> "Committing in " + this.ackMode.name() + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); commitIfNecessary(); this.last = now; - this.count = 0; + if (AckMode.COUNT_TIME.equals(this.ackMode)) { + this.count = 0; + } } } @@ -3240,17 +3155,9 @@ private void logReset(TopicPartition topicPartition, long newOffset) { this.logger.debug(() -> "Reset " + topicPartition + " to offset " + newOffset); } - private void updatePendingOffsets() { - ConsumerRecord cRecord = this.acks.poll(); - while (cRecord != null) { - addOffset(cRecord); - cRecord = this.acks.poll(); - } - } - private void addOffset(ConsumerRecord cRecord) { - this.offsets.computeIfAbsent(cRecord.topic(), v -> new ConcurrentHashMap<>()) - .compute(cRecord.partition(), (k, v) -> v == null ? cRecord.offset() : Math.max(v, cRecord.offset())); + this.offsets.compute(new TopicPartition(cRecord.topic(), cRecord.partition()), + (k, v) -> v == null ? cRecord.offset() : Math.max(v, cRecord.offset())); } private void commitIfNecessary() { @@ -3259,12 +3166,7 @@ private void commitIfNecessary() { if (!commits.isEmpty()) { this.commitLogger.log(() -> COMMITTING + commits); try { - if (this.syncCommits) { - commitSync(commits); - } - else { - commitAsync(commits); - } + commitOffsets(commits); } catch (@SuppressWarnings(UNUSED) WakeupException e) { // ignore - not polling @@ -3273,6 +3175,37 @@ private void commitIfNecessary() { } } + private void commitOffsetsInTransactions(Map commits) { + this.commitLogger.log(() -> COMMITTING + commits); + if (this.producer != null) { + doSendOffsets(this.producer, commits); + } + else { + commitOffsets(commits); + } + } + + private void commitOffsets(Map commits) { + if (CollectionUtils.isEmpty(commits)) { + return; + } + if (this.syncCommits) { + commitSync(commits); + } + else { + commitAsync(commits); + } + } + + private void commitAsync(Map commits) { + this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> { + this.commitCallback.onComplete(offsetsAttempted, exception); + if (exception == null && this.fixTxOffsets) { + this.lastCommits.putAll(commits); + } + }); + } + private void commitSync(Map commits) { doCommitSync(commits, 0); } @@ -3296,14 +3229,17 @@ private void doCommitSync(Map commits, int re } } + Map buildSingleCommits(ConsumerRecord cRecord) { + return Collections.singletonMap( + new TopicPartition(cRecord.topic(), cRecord.partition()), + createOffsetAndMetadata(cRecord.offset() + 1)); + } + private Map buildCommits() { Map commits = new LinkedHashMap<>(); - for (Entry> entry : this.offsets.entrySet()) { - for (Entry offset : entry.getValue().entrySet()) { - commits.put(new TopicPartition(entry.getKey(), offset.getKey()), - createOffsetAndMetadata(offset.getValue() + 1)); - } - } + this.offsets.forEach((topicPartition, offset) -> { + commits.put(topicPartition, createOffsetAndMetadata(offset + 1)); + }); this.offsets.clear(); return commits; } @@ -3454,10 +3390,10 @@ public void acknowledge() { if (!this.acked) { Map> offs = ListenerConsumer.this.offsetsInThisBatch; Map>> deferred = ListenerConsumer.this.deferredOffsets; - for (ConsumerRecord cRecord : getHighestOffsetRecords(this.records)) { + for (TopicPartition topicPartition : this.records.partitions()) { if (offs != null) { - offs.remove(new TopicPartition(cRecord.topic(), cRecord.partition())); - deferred.remove(new TopicPartition(cRecord.topic(), cRecord.partition())); + offs.remove(topicPartition); + deferred.remove(topicPartition); } } processAcks(this.records); @@ -3641,8 +3577,7 @@ public void onPartitionsAssigned(Collection partitions) { private void repauseIfNeeded(Collection partitions) { boolean pending = false; synchronized (ListenerConsumer.this) { - Map> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch; - if (!ObjectUtils.isEmpty(pendingOffsets)) { + if (!ObjectUtils.isEmpty(ListenerConsumer.this.offsetsInThisBatch)) { pending = true; } } @@ -3720,7 +3655,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { .getProducerFactory()); if (holder != null) { doSendOffsets(holder.getProducer(), - Collections.singletonMap(partition, offsetAndMetadata)); + Collections.singletonMap(partition, offsetAndMetadata)); } }