diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 23644c9e84..db02a527ad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -228,7 +228,7 @@ public enum EOSMode { /** * The time (ms) after which outstanding offsets should be committed when * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be - * larger than + * larger than zero. */ private long ackTime = DEFAULT_ACK_TIME; @@ -388,7 +388,7 @@ public void setAckCount(int count) { /** * Set the time (ms) after which outstanding offsets should be committed when * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be - * larger than + * larger than zero. * @param ackTime the time */ public void setAckTime(long ackTime) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 4419d5d90c..7e6c93408e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -286,7 +286,7 @@ else if (partitionsListenerConsumer.assignedPartitions != null) { @Nullable public Map> getAssignmentsByClientId() { ListenerConsumer partitionsListenerConsumer = this.listenerConsumer; - if (this.listenerConsumer != null) { + if (partitionsListenerConsumer != null) { return Collections.singletonMap(partitionsListenerConsumer.getClientId(), getAssignedPartitions()); } else { @@ -324,7 +324,7 @@ public void resume() { super.resume(); KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; if (consumer != null) { - this.listenerConsumer.wakeIfNecessary(); + consumer.wakeIfNecessary(); } } @@ -333,7 +333,7 @@ public void resumePartition(TopicPartition topicPartition) { super.resumePartition(topicPartition); KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; if (consumer != null) { - this.listenerConsumer.wakeIfNecessary(); + consumer.wakeIfNecessary(); } } @@ -926,7 +926,6 @@ else if (listener instanceof MessageListener) { if (this.containerProperties.isLogContainerConfig()) { this.logger.info(toString()); } - Map props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties(); ApplicationContext applicationContext = getApplicationContext(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, @@ -942,7 +941,7 @@ else if (listener instanceof MessageListener) { : applicationContext.getClassLoader()); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { - // update the property so we can use it directly from code elsewhere + // update the property, so we can use it directly from code elsewhere this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout); if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) { KafkaMessageListenerContainer.this.thisOrParentContainer @@ -1019,15 +1018,7 @@ private void obtainClusterId() { @Nullable private ThreadStateProcessor setUpPollProcessor(boolean batch) { - if (batch) { - if (this.commonBatchInterceptor != null) { - return this.commonBatchInterceptor; - } - } - else if (this.commonRecordInterceptor != null) { - return this.commonRecordInterceptor; - } - return null; + return batch ? this.commonBatchInterceptor : this.commonRecordInterceptor; } @Nullable @@ -3050,14 +3041,14 @@ private void timedAcks(AckMode ackMode) { long now; now = System.currentTimeMillis(); boolean elapsed = now - this.last > this.containerProperties.getAckTime(); - if (ackMode.equals(AckMode.TIME) && elapsed) { + if (AckMode.TIME.equals(ackMode) && elapsed) { this.logger.debug(() -> "Committing in AckMode.TIME " + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); commitIfNecessary(); this.last = now; } - else if (ackMode.equals(AckMode.COUNT_TIME) && elapsed) { + else if (AckMode.COUNT_TIME.equals(ackMode) && elapsed) { this.logger.debug(() -> "Committing in AckMode.COUNT_TIME " + "because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime()); @@ -3433,8 +3424,8 @@ public void acknowledge() { acknowledge(this.partial + 1); return; } - Map> offs = ListenerConsumer.this.offsetsInThisBatch; if (!this.acked) { + Map> offs = ListenerConsumer.this.offsetsInThisBatch; Map>> deferred = ListenerConsumer.this.deferredOffsets; for (ConsumerRecord cRecord : getHighestOffsetRecords(this.records)) { if (offs != null) { @@ -3871,7 +3862,7 @@ private class StopCallback implements BiConsumer { private final Runnable callback; - StopCallback(Runnable callback) { + StopCallback(@Nullable Runnable callback) { this.callback = callback; } @@ -3879,16 +3870,13 @@ private class StopCallback implements BiConsumer { public void accept(Object result, @Nullable Throwable throwable) { if (throwable != null) { KafkaMessageListenerContainer.this.logger.error(throwable, "Error while stopping the container"); - if (this.callback != null) { - this.callback.run(); - } } else { KafkaMessageListenerContainer.this.logger .debug(() -> KafkaMessageListenerContainer.this + " stopped normally"); - if (this.callback != null) { - this.callback.run(); - } + } + if (this.callback != null) { + this.callback.run(); } }