diff --git a/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java b/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java index b14152440e..d2a9bf7646 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,8 @@ public class ConsumerPausedEvent extends KafkaEvent { private final Collection partitions; + private final String reason; + /** * Construct an instance with the provided source and partitions. * @param source the container instance that generated the event. @@ -43,6 +45,23 @@ public class ConsumerPausedEvent extends KafkaEvent { public ConsumerPausedEvent(Object source, Object container, Collection partitions) { super(source, container); this.partitions = partitions; + this.reason = null; + } + + /** + * Construct an instance with the provided source and partitions. + * @param source the container instance that generated the event. + * @param container the container or the parent container if the container is a child. + * @param partitions the partitions. + * @param reason the reason for the pause. + * @since 2.8.9 + */ + public ConsumerPausedEvent(Object source, Object container, Collection partitions, + String reason) { + + super(source, container); + this.partitions = partitions; + this.reason = reason; } /** @@ -55,7 +74,7 @@ public Collection getPartitions() { @Override public String toString() { - return "ConsumerPausedEvent [partitions=" + this.partitions + "]"; + return "ConsumerPausedEvent [reason=" + this.reason + ", partitions=" + this.partitions + "]"; } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java index d87c9118c8..8c53143040 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java @@ -218,9 +218,11 @@ default void setAckAfterHandle(boolean ack) { * Called when partitions are assigned. * @param consumer the consumer. * @param partitions the newly assigned partitions. - * @since 2.8.8 + * @param publishPause called to publish a consumer paused event. + * @since 2.8.9 */ - default void onPartitionsAssigned(Consumer consumer, Collection partitions) { + default void onPartitionsAssigned(Consumer consumer, Collection partitions, + Runnable publishPause) { } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java index 4d39ae9f49..6ffb038bf5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java @@ -202,8 +202,10 @@ public void handleOtherException(Exception thrownException, Consumer consu } @Override - public void onPartitionsAssigned(Consumer consumer, Collection partitions) { - getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions); + public void onPartitionsAssigned(Consumer consumer, Collection partitions, + Runnable publishPause) { + + getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions, publishPause); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java index 712ce71a10..b7f0bc8e48 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java @@ -163,9 +163,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords data, C } @Override - public void onPartitionsAssigned(Consumer consumer, Collection partitions) { + public void onPartitionsAssigned(Consumer consumer, Collection partitions, + Runnable publishPause) { + if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) { - ((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions); + ((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions, + publishPause); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index a243b6b83b..b4014013b9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -17,10 +17,12 @@ package org.springframework.kafka.listener; import java.time.Duration; +import java.util.Set; import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.KafkaException; @@ -63,7 +65,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r BackOffExecution execution = backOff.start(); long nextBackOff = execution.nextBackOff(); String failed = null; - consumer.pause(consumer.assignment()); + Set assignment = consumer.assignment(); + consumer.pause(assignment); + if (container instanceof KafkaMessageListenerContainer) { + ((KafkaMessageListenerContainer) container).publishConsumerPausedEvent(assignment, "For batch retry"); + } try { while (nextBackOff != BackOffExecution.STOP) { consumer.poll(Duration.ZERO); @@ -100,7 +106,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r } } finally { - consumer.resume(consumer.assignment()); + Set assignment2 = consumer.assignment(); + consumer.resume(assignment2); + if (container instanceof KafkaMessageListenerContainer) { + ((KafkaMessageListenerContainer) container).publishConsumerResumedEvent(assignment2); + } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java index b64addc0b3..fda027394d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java @@ -113,9 +113,12 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords re } } - public void onPartitionsAssigned(Consumer consumer, Collection partitions) { + public void onPartitionsAssigned(Consumer consumer, Collection partitions, + Runnable publishPause) { + if (this.retrying.get()) { consumer.pause(consumer.assignment()); + publishPause.run(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index dd9960bc4d..51eb634387 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -452,15 +452,15 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer< } } - private void publishConsumerPausedEvent(Collection partitions) { + void publishConsumerPausedEvent(Collection partitions, String reason) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer, - Collections.unmodifiableCollection(partitions))); + Collections.unmodifiableCollection(partitions), reason)); } } - private void publishConsumerResumedEvent(Collection partitions) { + void publishConsumerResumedEvent(Collection partitions) { ApplicationEventPublisher publisher = getApplicationEventPublisher(); if (publisher != null) { publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer, @@ -1710,7 +1710,9 @@ private void doPauseConsumerIfNecessary() { this.consumerPaused = true; this.pauseForPending = false; this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused()); - publishConsumerPausedEvent(assigned); + publishConsumerPausedEvent(assigned, this.pausedForAsyncAcks + ? "Incomplete out of order acks" + : "User requested"); } } } @@ -1721,6 +1723,7 @@ private void resumeConsumerIfNeccessary() { this.nackWakeTimeMillis = 0; this.consumer.resume(this.pausedForNack); this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack); + publishConsumerResumedEvent(this.pausedForNack); this.pausedForNack.clear(); } } @@ -2653,6 +2656,7 @@ private void pauseForNackSleep() { this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack); try { this.consumer.pause(this.pausedForNack); + publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received"); } catch (IllegalStateException ex) { // this should never happen; defensive, just in case... @@ -3479,7 +3483,8 @@ public void onPartitionsAssigned(Collection partitions) { } if (ListenerConsumer.this.commonErrorHandler != null) { ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer, - partitions); + partitions, () -> publishConsumerPausedEvent(partitions, + "Paused by error handler after rebalance")); } } @@ -3490,7 +3495,7 @@ private void repauseIfNeeded(Collection partitions) { ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; " + "consumer paused again, so the initial poll() will never return any records"); ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions); - publishConsumerPausedEvent(partitions); + publishConsumerPausedEvent(partitions, "Re-paused after rebalance"); } Collection toRepause = new LinkedList<>(); partitions.forEach(tp -> { @@ -3501,7 +3506,7 @@ private void repauseIfNeeded(Collection partitions) { if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) { ListenerConsumer.this.consumer.pause(toRepause); ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause); - publishConsumerPausedEvent(toRepause); + publishConsumerPausedEvent(toRepause, "Re-paused after rebalance"); } this.revoked.removeAll(toRepause); ListenerConsumer.this.pausedPartitions.removeAll(this.revoked); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java index 6649cf4a87..63f46c0b56 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java @@ -186,11 +186,14 @@ void rePauseOnRebalance() { Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar"))); ConsumerRecords records = new ConsumerRecords<>(map); Consumer consumer = mock(Consumer.class); + given(consumer.assignment()).willReturn(map.keySet()); + AtomicBoolean pubPauseCalled = new AtomicBoolean(); willAnswer(inv -> { - eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1))); + eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)), + () -> pubPauseCalled.set(true)); return records; }).given(consumer).poll(any()); - MessageListenerContainer container = mock(MessageListenerContainer.class); + KafkaMessageListenerContainer container = mock(KafkaMessageListenerContainer.class); given(container.isRunning()).willReturn(true); eh.handle(new RuntimeException(), records, consumer, container, () -> { this.invoked++; @@ -198,13 +201,16 @@ void rePauseOnRebalance() { }); assertThat(this.invoked).isEqualTo(1); assertThat(recovered).hasSize(2); - InOrder inOrder = inOrder(consumer); + InOrder inOrder = inOrder(consumer, container); inOrder.verify(consumer).pause(any()); + inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry"); inOrder.verify(consumer).poll(any()); inOrder.verify(consumer).pause(any()); inOrder.verify(consumer).resume(any()); + inOrder.verify(container).publishConsumerResumedEvent(map.keySet()); verify(consumer, times(3)).assignment(); verifyNoMoreInteractions(consumer); + assertThat(pubPauseCalled.get()).isTrue(); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java index 91bf9d8dee..4df3e49156 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,11 +51,14 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.event.ConsumerPausedEvent; +import org.springframework.kafka.event.ConsumerResumedEvent; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -93,6 +96,8 @@ public void dontResumeAlreadyPaused() throws Exception { assertThat(this.config.resumedForNack).hasSize(1); assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1)); assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1)); + assertThat(this.config.pauseEvents).hasSize(1); + assertThat(this.config.resumeEvents).hasSize(1); } @Configuration @@ -113,6 +118,10 @@ public static class Config { final Set resumedForNack = new HashSet<>(); + final List pauseEvents = new ArrayList<>(); + + final List resumeEvents = new ArrayList<>(); + volatile int count; volatile long replayTime; @@ -232,6 +241,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { return factory; } + @EventListener + public void paused(ConsumerPausedEvent event) { + this.pauseEvents.add(event); + } + + @EventListener + public void resumed(ConsumerResumedEvent event) { + this.resumeEvents.add(event); + } + } }