Skip to content

Commit

Permalink
GH-2387: Add Missing Pause/Resume Events
Browse files Browse the repository at this point in the history
Resolves #2387

- `FallbackBatchErrorHandler`
- `Acknowledgment.nack()`

**2.9.x only - I will back port/forward port**
  • Loading branch information
garyrussell committed Sep 6, 2022
1 parent 950f8ca commit f1253d4
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@ public class ConsumerPausedEvent extends KafkaEvent {

private final Collection<TopicPartition> partitions;

private final String reason;

/**
* Construct an instance with the provided source and partitions.
* @param source the container instance that generated the event.
Expand All @@ -43,6 +45,23 @@ public class ConsumerPausedEvent extends KafkaEvent {
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions) {
super(source, container);
this.partitions = partitions;
this.reason = null;
}

/**
* Construct an instance with the provided source and partitions.
* @param source the container instance that generated the event.
* @param container the container or the parent container if the container is a child.
* @param partitions the partitions.
* @param reason the reason for the pause.
* @since 2.8.9
*/
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions,
String reason) {

super(source, container);
this.partitions = partitions;
this.reason = reason;
}

/**
Expand All @@ -55,7 +74,7 @@ public Collection<TopicPartition> getPartitions() {

@Override
public String toString() {
return "ConsumerPausedEvent [partitions=" + this.partitions + "]";
return "ConsumerPausedEvent [reason=" + this.reason + ", partitions=" + this.partitions + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,11 @@ default void setAckAfterHandle(boolean ack) {
* Called when partitions are assigned.
* @param consumer the consumer.
* @param partitions the newly assigned partitions.
* @since 2.8.8
* @param publishPause called to publish a consumer paused event.
* @since 2.8.9
*/
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions, publishPause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
}

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

if (this.batchErrorHandler instanceof FallbackBatchErrorHandler) {
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
((FallbackBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions,
publishPause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.Set;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
Expand Down Expand Up @@ -63,7 +65,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
BackOffExecution execution = backOff.start();
long nextBackOff = execution.nextBackOff();
String failed = null;
consumer.pause(consumer.assignment());
Set<TopicPartition> assignment = consumer.assignment();
consumer.pause(assignment);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
}
try {
while (nextBackOff != BackOffExecution.STOP) {
consumer.poll(Duration.ZERO);
Expand Down Expand Up @@ -100,7 +106,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
}
}
finally {
consumer.resume(consumer.assignment());
Set<TopicPartition> assignment2 = consumer.assignment();
consumer.resume(assignment2);
if (container instanceof KafkaMessageListenerContainer) {
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
}
}

public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
Runnable publishPause) {

if (this.retrying.get()) {
consumer.pause(consumer.assignment());
publishPause.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,15 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
}
}

private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
Collections.unmodifiableCollection(partitions)));
Collections.unmodifiableCollection(partitions), reason));
}
}

private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,
Expand Down Expand Up @@ -1710,7 +1710,9 @@ private void doPauseConsumerIfNecessary() {
this.consumerPaused = true;
this.pauseForPending = false;
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
publishConsumerPausedEvent(assigned);
publishConsumerPausedEvent(assigned, this.pausedForAsyncAcks
? "Incomplete out of order acks"
: "User requested");
}
}
}
Expand All @@ -1721,6 +1723,7 @@ private void resumeConsumerIfNeccessary() {
this.nackWakeTimeMillis = 0;
this.consumer.resume(this.pausedForNack);
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
publishConsumerResumedEvent(this.pausedForNack);
this.pausedForNack.clear();
}
}
Expand Down Expand Up @@ -2653,6 +2656,7 @@ private void pauseForNackSleep() {
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
try {
this.consumer.pause(this.pausedForNack);
publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received");
}
catch (IllegalStateException ex) {
// this should never happen; defensive, just in case...
Expand Down Expand Up @@ -3479,7 +3483,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
if (ListenerConsumer.this.commonErrorHandler != null) {
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
partitions);
partitions, () -> publishConsumerPausedEvent(partitions,
"Paused by error handler after rebalance"));
}
}

Expand All @@ -3490,7 +3495,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
+ "consumer paused again, so the initial poll() will never return any records");
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
publishConsumerPausedEvent(partitions);
publishConsumerPausedEvent(partitions, "Re-paused after rebalance");
}
Collection<TopicPartition> toRepause = new LinkedList<>();
partitions.forEach(tp -> {
Expand All @@ -3501,7 +3506,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
ListenerConsumer.this.consumer.pause(toRepause);
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
publishConsumerPausedEvent(toRepause);
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
}
this.revoked.removeAll(toRepause);
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,25 +186,31 @@ void rePauseOnRebalance() {
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
given(consumer.assignment()).willReturn(map.keySet());
AtomicBoolean pubPauseCalled = new AtomicBoolean();
willAnswer(inv -> {
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)),
() -> pubPauseCalled.set(true));
return records;
}).given(consumer).poll(any());
MessageListenerContainer container = mock(MessageListenerContainer.class);
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
eh.handle(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
throw new RuntimeException();
});
assertThat(this.invoked).isEqualTo(1);
assertThat(recovered).hasSize(2);
InOrder inOrder = inOrder(consumer);
InOrder inOrder = inOrder(consumer, container);
inOrder.verify(consumer).pause(any());
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
inOrder.verify(consumer).poll(any());
inOrder.verify(consumer).pause(any());
inOrder.verify(consumer).resume(any());
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
verify(consumer, times(3)).assignment();
verifyNoMoreInteractions(consumer);
assertThat(pubPauseCalled.get()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,11 +51,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.utils.KafkaTestUtils;
Expand Down Expand Up @@ -93,6 +96,8 @@ public void dontResumeAlreadyPaused() throws Exception {
assertThat(this.config.resumedForNack).hasSize(1);
assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1));
assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1));
assertThat(this.config.pauseEvents).hasSize(1);
assertThat(this.config.resumeEvents).hasSize(1);
}

@Configuration
Expand All @@ -113,6 +118,10 @@ public static class Config {

final Set<TopicPartition> resumedForNack = new HashSet<>();

final List<ConsumerPausedEvent> pauseEvents = new ArrayList<>();

final List<ConsumerResumedEvent> resumeEvents = new ArrayList<>();

volatile int count;

volatile long replayTime;
Expand Down Expand Up @@ -232,6 +241,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
return factory;
}

@EventListener
public void paused(ConsumerPausedEvent event) {
this.pauseEvents.add(event);
}

@EventListener
public void resumed(ConsumerResumedEvent event) {
this.resumeEvents.add(event);
}

}

}

0 comments on commit f1253d4

Please sign in to comment.