diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java index 129a14a558..4a1b1fffe9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,28 +18,22 @@ import java.util.Collection; -import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; -import org.springframework.core.log.LogAccessor; - /** * A rebalance listener that provides access to the consumer object. Starting with version * 2.1.5, as a convenience, default no-op implementations are provided for all methods, * allowing the user to implement just those (s)he is interested in. * * @author Gary Russell + * @author Michal Domagala * @since 2.0 * */ public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { - /** - * {@link LogAccessor} for use in default methods. - */ - LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ConsumerAwareRebalanceListener.class)); /** * The same as {@link #onPartitionsRevoked(Collection)} with the additional consumer @@ -48,12 +42,7 @@ public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListene * @param partitions the partitions. */ default void onPartitionsRevokedBeforeCommit(Consumer consumer, Collection partitions) { - try { - onPartitionsRevoked(partitions); - } - catch (Exception e) { // NOSONAR - LOGGER.error(e, "User method threw exception"); - } + onPartitionsRevoked(partitions); } /** @@ -72,12 +61,7 @@ default void onPartitionsRevokedAfterCommit(Consumer consumer, Collection< * @since 2.4 */ default void onPartitionsLost(Consumer consumer, Collection partitions) { - try { - onPartitionsLost(partitions); - } - catch (Exception e) { // NOSONAR - LOGGER.error(e, "User method threw exception"); - } + onPartitionsLost(partitions); } /** @@ -87,12 +71,7 @@ default void onPartitionsLost(Consumer consumer, Collection consumer, Collection partitions) { - try { - onPartitionsAssigned(partitions); - } - catch (Exception e) { // NOSONAR - LOGGER.error(e, "User method threw exception"); - } + onPartitionsAssigned(partitions); } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java index 62c4afb2af..605f5b3322 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ /** * @author Gary Russell + * @author Michal Domagala * @since 2.6.5 * */ @@ -45,20 +46,6 @@ public void onPartitionsAssigned(Collection partitions) { assertThat(called.get()).isTrue(); } - @Test - void nonConsumerAwareTestAssignedThrows() { - AtomicBoolean called = new AtomicBoolean(); - new ConsumerAwareRebalanceListener() { - - @Override - public void onPartitionsAssigned(Collection partitions) { - called.set(true); - throw new RuntimeException(); - } - - }.onPartitionsAssigned(null, null); - assertThat(called.get()).isTrue(); - } @Test void nonConsumerAwareTestRevoked() { @@ -74,20 +61,6 @@ public void onPartitionsRevoked(Collection partitions) { assertThat(called.get()).isTrue(); } - @Test - void nonConsumerAwareTestRevokedThrows() { - AtomicBoolean called = new AtomicBoolean(); - new ConsumerAwareRebalanceListener() { - - @Override - public void onPartitionsRevoked(Collection partitions) { - called.set(true); - throw new RuntimeException(); - } - - }.onPartitionsRevokedBeforeCommit(null, null); - assertThat(called.get()).isTrue(); - } @Test void nonConsumerAwareTestLost() { @@ -103,19 +76,4 @@ public void onPartitionsLost(Collection partitions) { assertThat(called.get()).isTrue(); } - @Test - void nonConsumerAwareTestLostThrows() { - AtomicBoolean called = new AtomicBoolean(); - new ConsumerAwareRebalanceListener() { - - @Override - public void onPartitionsLost(Collection partitions) { - called.set(true); - throw new RuntimeException(); - } - - }.onPartitionsLost(null, null); - assertThat(called.get()).isTrue(); - } - }