From 4eca9795342bc482e39214c1271a718ac24611a6 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sun, 28 Apr 2024 08:11:52 +0900 Subject: [PATCH] refactor --- ...allelConsumerOptionsProviderCondition.java | 14 ++ .../kafka/config/ParallelConsumerConfig.java | 152 ++++++++++------ .../config/ParallelConsumerConfiguration.java | 25 ++- .../kafka/config/ParallelConsumerContext.java | 13 +- .../kafka/core/ParallelConsumerFactory.java | 86 +++++---- .../ParallelConsumerCallback.java | 22 ++- .../ParallelConsumerOptionsProvider.java | 168 ++++++++++++++++++ .../Poll.java} | 16 +- .../PollAndProduce.java} | 16 +- .../PollAndProduceMany.java} | 15 +- .../PollAndProduceManyResult.java} | 23 +-- .../PollAndProduceResult.java} | 23 +-- 12 files changed, 406 insertions(+), 167 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java rename spring-kafka/src/main/java/org/springframework/kafka/core/{ => parallelconsumer}/ParallelConsumerCallback.java (76%) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollCallback.java => parallelconsumer/Poll.java} (80%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceResultCallback.java => parallelconsumer/PollAndProduce.java} (72%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceManyResultCallback.java => parallelconsumer/PollAndProduceMany.java} (73%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceCallback.java => parallelconsumer/PollAndProduceManyResult.java} (51%) rename spring-kafka/src/main/java/org/springframework/kafka/core/{PollAndProduceManyCallback.java => parallelconsumer/PollAndProduceResult.java} (50%) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java new file mode 100644 index 0000000000..a383383755 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java @@ -0,0 +1,14 @@ +package org.springframework.kafka.config; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; + +public class OnMissingParallelConsumerOptionsProviderCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return context.getBeanFactory().getBean(ParallelConsumerOptionsProvider.class) == null; + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java index 0ad99e6bb2..8cb5557f35 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java @@ -16,18 +16,15 @@ package org.springframework.kafka.config; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; -import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import org.apache.kafka.clients.producer.Producer; -import org.springframework.util.StringUtils; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; import org.springframework.kafka.annotation.EnableParallelConsumer; + // It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties. /** @@ -37,40 +34,16 @@ * @since 3.3 */ -public class ParallelConsumerConfig { +public class ParallelConsumerConfig { public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig"; - private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY"; - private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING"; - private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT"; - private static final String COMMIT_LOCK_ACQUISITION_TIMEOUT = "COMMIT_LOCK_ACQUISITION_TIMEOUT"; - private static final String COMMIT_INTERVAL = "COMMIT_INTERVAL"; - private final Map properties = new HashMap<>(); - - public ParallelConsumerConfig() { - - final String maxConcurrency = System.getenv(PARALLEL_CONSUMER_MAX_CONCURRENCY); - final String ordering = System.getenv(PARALLEL_CONSUMER_ORDERING); - final String allowEagerProcessingDuringTransactionCommit = System.getenv(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT); - final String commitLockAcquisitionTimeout = System.getenv(COMMIT_LOCK_ACQUISITION_TIMEOUT); - final String commitInterval = System.getenv(COMMIT_INTERVAL); - - this.properties.put(PARALLEL_CONSUMER_MAX_CONCURRENCY, maxConcurrency); - this.properties.put(PARALLEL_CONSUMER_ORDERING, ordering); - this.properties.put(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT, allowEagerProcessingDuringTransactionCommit); - this.properties.put(COMMIT_LOCK_ACQUISITION_TIMEOUT, commitLockAcquisitionTimeout); - this.properties.put(COMMIT_INTERVAL, commitInterval); - } + private final ParallelConsumerOptionsProvider provider; - private ProcessingOrder toOrder(String order) { - return switch (order) { - case "partition" -> ProcessingOrder.PARTITION; - case "unordered" -> ProcessingOrder.UNORDERED; - default -> ProcessingOrder.KEY; // Confluent Consumer Default Policy - }; + public ParallelConsumerConfig(ParallelConsumerOptionsProvider provider) { + this.provider = provider; } - public ParallelConsumerOptions toConsumerOptions( + public ParallelConsumerOptions toConsumerOptions( ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, Consumer consumer, Producer producer) { @@ -79,43 +52,110 @@ public ParallelConsumerOptions toConsumerOptions( return toConsumerOptions(builder, consumer); } - public ParallelConsumerOptions toConsumerOptions( + public ParallelConsumerOptions toConsumerOptions( ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, Consumer consumer) { - builder.consumer(consumer); + return buildRemainOptions(builder); + } + + private ParallelConsumerOptions buildRemainOptions(ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder) { + if (this.provider.managedExecutorService() != null){ + builder.managedExecutorService(this.provider.managedExecutorService()); + } - final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY); - final String orderingString = this.properties.get(PARALLEL_CONSUMER_ORDERING); - final String allowEagerProcessingDuringTransactionCommitString = this.properties.get(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT); - final String commitLockAcquisitionTimeoutString = this.properties.get(COMMIT_LOCK_ACQUISITION_TIMEOUT); - final String commitIntervalString = this.properties.get(COMMIT_INTERVAL); + if (this.provider.managedThreadFactory() != null){ + builder.managedThreadFactory(this.provider.managedThreadFactory()); + } + + if (this.provider.meterRegistry() != null){ + builder.meterRegistry(this.provider.meterRegistry()); + } + + if (this.provider.pcInstanceTag() != null){ + builder.pcInstanceTag(this.provider.pcInstanceTag()); + } + + if (this.provider.metricsTags() != null){ + builder.metricsTags(this.provider.metricsTags()); + } + + if (this.provider.allowEagerProcessingDuringTransactionCommit() != null){ + builder.allowEagerProcessingDuringTransactionCommit(this.provider.allowEagerProcessingDuringTransactionCommit()); + } + + if (this.provider.commitLockAcquisitionTimeout() != null){ + builder.commitLockAcquisitionTimeout(this.provider.commitLockAcquisitionTimeout()); + } + + if (this.provider.produceLockAcquisitionTimeout() != null){ + builder.produceLockAcquisitionTimeout(this.provider.produceLockAcquisitionTimeout()); + } + + if (this.provider.commitInterval() != null){ + builder.commitInterval(this.provider.commitInterval()); + } - if (StringUtils.hasText(maxConcurrencyString)) { - final Integer maxConcurrency = Integer.valueOf(maxConcurrencyString); - builder.maxConcurrency(maxConcurrency); + if (this.provider.ordering() != null){ + builder.ordering(this.provider.ordering()); } - if (StringUtils.hasText(orderingString)) { - final ProcessingOrder processingOrder = toOrder(orderingString); - builder.ordering(processingOrder); + if (this.provider.commitMode() != null){ + builder.commitMode(this.provider.commitMode()); } - if (StringUtils.hasText(allowEagerProcessingDuringTransactionCommitString)) { - final Boolean allowEagerProcessingDuringTransactionCommit = Boolean.valueOf(allowEagerProcessingDuringTransactionCommitString); - builder.allowEagerProcessingDuringTransactionCommit(allowEagerProcessingDuringTransactionCommit); + if (this.provider.maxConcurrency() != null){ + builder.maxConcurrency(this.provider.maxConcurrency()); } - if (StringUtils.hasText(commitLockAcquisitionTimeoutString)) { - final Long commitLockAcquisitionTimeout = Long.valueOf(commitLockAcquisitionTimeoutString); - builder.commitLockAcquisitionTimeout(Duration.ofSeconds(commitLockAcquisitionTimeout)); + if (this.provider.invalidOffsetMetadataPolicy() != null){ + builder.invalidOffsetMetadataPolicy(this.provider.invalidOffsetMetadataPolicy()); } - if (StringUtils.hasText(commitIntervalString)) { - final Long commitInterval = Long.valueOf(commitIntervalString); - builder.commitInterval(Duration.ofMillis(commitInterval)); + if (this.provider.retryDelayProvider() != null){ + builder.retryDelayProvider(this.provider.retryDelayProvider()); + } + + if (this.provider.sendTimeout() != null){ + builder.sendTimeout(this.provider.sendTimeout()); + } + + if (this.provider.offsetCommitTimeout() != null){ + builder.offsetCommitTimeout(this.provider.offsetCommitTimeout()); + } + + if (this.provider.batchSize() != null){ + builder.batchSize(this.provider.batchSize()); + } + + if (this.provider.thresholdForTimeSpendInQueueWarning() != null){ + builder.thresholdForTimeSpendInQueueWarning(this.provider.thresholdForTimeSpendInQueueWarning()); + } + + if (this.provider.maxFailureHistory() != null){ + builder.maxFailureHistory(this.provider.maxFailureHistory()); + } + + if (this.provider.shutdownTimeout() != null){ + builder.shutdownTimeout(this.provider.shutdownTimeout()); + } + + if (this.provider.drainTimeout() != null){ + builder.drainTimeout(this.provider.drainTimeout()); + } + + if (this.provider.messageBufferSize() != null){ + builder.messageBufferSize(this.provider.messageBufferSize()); + } + + if (this.provider.initialLoadFactor() != null){ + builder.initialLoadFactor(this.provider.initialLoadFactor()); + } + if (this.provider.maximumLoadFactor() != null){ + builder.maximumLoadFactor(this.provider.maximumLoadFactor()); } return builder.build(); } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java index ebf1c011d4..da5e556826 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java @@ -16,14 +16,14 @@ package org.springframework.kafka.config; -import javax.annotation.Nullable; - import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; import org.springframework.kafka.core.ParallelConsumerFactory; import org.springframework.kafka.annotation.EnableParallelConsumer; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; /** * If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes. @@ -37,22 +37,29 @@ public class ParallelConsumerConfiguration { + @Bean + @Conditional(OnMissingParallelConsumerOptionsProviderCondition.class) + public ParallelConsumerOptionsProvider parallelConsumerOptionsProvider() { + return new ParallelConsumerOptionsProvider() {}; + } + @Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME) - public ParallelConsumerConfig parallelConsumerConfig() { - return new ParallelConsumerConfig(); + public ParallelConsumerConfig parallelConsumerConfig(ParallelConsumerOptionsProvider parallelConsumerOptionsProvider) { + return new ParallelConsumerConfig(parallelConsumerOptionsProvider); } @Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME) - public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, ParallelConsumerCallback parallelConsumerCallback) { - return new ParallelConsumerContext(parallelConsumerConfig, - parallelConsumerCallback); + return new ParallelConsumerContext(parallelConsumerConfig, + parallelConsumerCallback); } @Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME) public ParallelConsumerFactory parallelConsumerFactory(DefaultKafkaConsumerFactory consumerFactory, DefaultKafkaProducerFactory producerFactory, ParallelConsumerContext parallelConsumerContext) { - return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory); + return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory); } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java index 2c01b2786c..4bb2579a13 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java @@ -16,17 +16,13 @@ package org.springframework.kafka.config; -import java.time.Duration; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; -import org.springframework.kafka.core.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; -import io.confluent.parallelconsumer.JStreamParallelStreamProcessor; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; import io.confluent.parallelconsumer.ParallelStreamProcessor; -import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * This class is for aggregating all related with ParallelConsumer. @@ -41,7 +37,7 @@ public class ParallelConsumerContext { private final ParallelConsumerConfig parallelConsumerConfig; private final ParallelConsumerCallback parallelConsumerCallback; - public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, ParallelConsumerCallback callback) { this.parallelConsumerConfig = parallelConsumerConfig; this.parallelConsumerCallback = callback; @@ -61,9 +57,4 @@ public ParallelConsumerOptions getParallelConsumerOptions(Consumer c return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer); } - public void stop(ParallelStreamProcessor parallelStreamProcessor) { - parallelStreamProcessor.close(); - - } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java index 4cea9234c9..43b77c1df1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java @@ -16,29 +16,32 @@ package org.springframework.kafka.core; +import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.springframework.context.SmartLifecycle; import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback; +import org.springframework.kafka.core.parallelconsumer.PollAndProduce; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceMany; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceManyResult; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceResult; +import org.springframework.kafka.core.parallelconsumer.Poll; -import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * ParallelConsumerFactory will be started and closed by Spring LifeCycle. * This class is quite simple, because ParallelConsumer requires delegating the situation to itself. * @author Sanghyeok An - * @since 3.2.0 + * @since 3.3 */ public class ParallelConsumerFactory implements SmartLifecycle { @@ -48,7 +51,6 @@ public class ParallelConsumerFactory implements SmartLifecycle { private final DefaultKafkaConsumerFactory defaultKafkaConsumerFactory; private final DefaultKafkaProducerFactory defaultKafkaProducerFactory; private final ParallelConsumerContext parallelConsumerContext; - private final ParallelStreamProcessor parallelConsumer; private final ParallelConsumerOptions parallelConsumerOptions; private boolean running; @@ -70,8 +72,8 @@ public ParallelConsumerFactory(ParallelConsumerContext parallelConsumerCon private ParallelConsumerOptions parallelConsumerOptions(Consumer consumer, Producer producer) { final ParallelConsumerCallback callback = parallelConsumerContext.parallelConsumerCallback(); - if (callback instanceof PollAndProduceManyCallback || - callback instanceof PollAndProduceCallback) { + if (callback instanceof PollAndProduceMany || + callback instanceof PollAndProduce) { return parallelConsumerContext.getParallelConsumerOptions(consumer, producer); } else { return parallelConsumerContext.getParallelConsumerOptions(consumer); @@ -85,35 +87,40 @@ public void start() { final ParallelConsumerCallback callback0 = parallelConsumerContext.parallelConsumerCallback(); if (callback0 instanceof ResultConsumerCallback) { - if (callback0 instanceof PollAndProduceManyResultCallback) { - final PollAndProduceManyResultCallback callback = - (PollAndProduceManyResultCallback) callback0; + if (callback0 instanceof PollAndProduceManyResult) { + final PollAndProduceManyResult callback = + (PollAndProduceManyResult) callback0; this.parallelConsumer.pollAndProduceMany(callback::accept, callback::resultConsumer); - } else if (callback0 instanceof PollAndProduceCallback) { - final PollAndProduceResultCallback callback = - (PollAndProduceResultCallback) callback0; + } + else if (callback0 instanceof PollAndProduce) { + final PollAndProduceResult callback = + (PollAndProduceResult) callback0; this.parallelConsumer.pollAndProduce(callback::accept, callback::resultConsumer); - } else { + } + else { throw new UnsupportedOperationException(); } } else { - if (callback0 instanceof PollAndProduceManyCallback) { - final PollAndProduceManyCallback callback = - (PollAndProduceManyCallback) callback0; + if (callback0 instanceof PollAndProduceMany) { + final PollAndProduceMany callback = + (PollAndProduceMany) callback0; this.parallelConsumer.pollAndProduceMany(callback::accept); - } else if (callback0 instanceof PollAndProduceCallback) { - final PollAndProduceCallback callback = - (PollAndProduceCallback) callback0; + } + else if (callback0 instanceof PollAndProduce) { + final PollAndProduce callback = + (PollAndProduce) callback0; this.parallelConsumer.pollAndProduce(callback::accept); - } else if (callback0 instanceof PollCallback) { - final PollCallback callback = (PollCallback) callback0; + } + else if (callback0 instanceof Poll) { + final Poll callback = (Poll) callback0; this.parallelConsumer.poll(callback::accept); - } else { + } + else { throw new UnsupportedOperationException(); } } @@ -122,7 +129,12 @@ public void start() { @Override public void stop() { - this.parallelConsumerContext.stop(this.parallelConsumer); + final ParallelConsumerCallback callback = + this.parallelConsumerContext.parallelConsumerCallback(); + final DrainingMode drainingMode = callback.drainingMode(); + final Duration duration = callback.drainTimeOut(); + + this.parallelConsumer.close(duration, drainingMode); this.running = false; } @@ -139,24 +151,28 @@ private void subscribe() { if (topics != null && !topics.isEmpty()) { subscribe(topics, rebalanceListener); - } else { + } + else { subscribe(callback.getSubscribeTopicsPattern(), rebalanceListener); } } - private void subscribe(Collection topics, ConsumerRebalanceListener callback){ - if (callback == null) { + private void subscribe(Collection topics, ConsumerRebalanceListener listenerCallback){ + if (listenerCallback == null) { this.parallelConsumer.subscribe(topics); - } else { - this.parallelConsumer.subscribe(topics, callback); + } + else { + this.parallelConsumer.subscribe(topics, listenerCallback); } } - private void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - if (callback == null) { + private void subscribe(Pattern pattern, ConsumerRebalanceListener listenerCallback) { + if (listenerCallback == null) { this.parallelConsumer.subscribe(pattern); - } else { - this.parallelConsumer.subscribe(pattern, callback); + } + else { + this.parallelConsumer.subscribe(pattern, listenerCallback); } } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java similarity index 76% rename from spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java index 6c3072a15f..29a995580f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerCallback.java @@ -14,23 +14,25 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; +import java.time.Duration; import java.util.List; import java.util.regex.Pattern; -import io.confluent.parallelconsumer.PollContext; - import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; /** * User should create ConcreteClass of this and register it as Spring Bean. * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. - * @author ... - * @since 3.2.0 + * + * @author Sanghyeok An + * @since 3.3 */ public interface ParallelConsumerCallback { @@ -45,4 +47,12 @@ default Pattern getSubscribeTopicsPattern(){ default ConsumerRebalanceListener getRebalanceListener(){ return null; } + default DrainingMode drainingMode() { + return DrainingMode.DONT_DRAIN; + } + + default Duration drainTimeOut() { + return Duration.ofMillis(0); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java new file mode 100644 index 0000000000..09bfb13029 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java @@ -0,0 +1,168 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.time.Duration; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; +import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.RecordContext; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; + +/** + * + * @author Sanghyeok An + * @since 3.3 + */ + +public interface ParallelConsumerOptionsProvider { + + default void hello() { + ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + } + + @Nullable + default String managedExecutorService() { + return null; + } + + @Nullable + default String managedThreadFactory() { + return null; + } + + @Nullable + default MeterRegistry meterRegistry() { + return null; + } + + @Nullable + default String pcInstanceTag() { + return null; + } + + @Nullable + default Iterable metricsTags() { + return null; + } + + @Nullable + default Boolean allowEagerProcessingDuringTransactionCommit() { + return null; + } + + @Nullable + default Duration commitLockAcquisitionTimeout() { + return null; + } + + @Nullable + default Duration produceLockAcquisitionTimeout() { + return null; + } + + @Nullable + default Duration commitInterval() { + return null; + } + + @Nullable + default ProcessingOrder ordering() { + return null; + } + + @Nullable + default CommitMode commitMode() { + return null; + } + + @Nullable + default Integer maxConcurrency() { + return null; + } + + @Nullable + default InvalidOffsetMetadataHandlingPolicy invalidOffsetMetadataPolicy() { + return null; + } + + @Nullable + default Function, Duration> retryDelayProvider() { + return null; + } + + @Nullable + default Duration sendTimeout() { + return null; + } + + @Nullable + default Duration offsetCommitTimeout() { + return null; + } + + @Nullable + default Integer batchSize() { + return null; + } + + @Nullable + default Duration thresholdForTimeSpendInQueueWarning () { + return null; + } + + @Nullable + default Integer maxFailureHistory() { + return null; + } + + @Nullable + default Duration shutdownTimeout() { + return null; + } + + @Nullable + default Duration drainTimeout() { + return null; + } + + @Nullable + default Integer messageBufferSize() { + return null; + } + + @Nullable + default Integer initialLoadFactor() { + return null; + } + + @Nullable + default Integer maximumLoadFactor() { + return null; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java similarity index 80% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java index 5b7e765dc9..bfa3f7dbd3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java @@ -14,14 +14,10 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; -import java.util.List; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; import io.confluent.parallelconsumer.PollContext; @@ -29,11 +25,12 @@ * User should create ConcreteClass of this and register it as Spring Bean. * Concrete class of ParallelConsumerCallback will be registered {@link ParallelConsumerContext}, * and then it will be used in {@link ParallelConsumerFactory} when ParallelConsumerFactory start. - * @author ... - * @since 3.2.0 + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollCallback extends ParallelConsumerCallback { +public interface Poll extends ParallelConsumerCallback { /** * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. @@ -42,4 +39,5 @@ public interface PollCallback extends ParallelConsumerCallback { * @return void. */ void accept(PollContext context); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java similarity index 72% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java index 57f1f68732..1b00ed251d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceResultCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java @@ -14,18 +14,24 @@ * limitations under the License. */ -package org.springframework.kafka.core; - -import java.util.function.Consumer; +package org.springframework.kafka.core.parallelconsumer; import org.apache.kafka.clients.producer.ProducerRecord; -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; import io.confluent.parallelconsumer.PollContext; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceResultCallback extends PollAndProduceCallback, ResultConsumerCallback { +public interface PollAndProduce extends ParallelConsumerCallback { + + /** + * ... + */ + ProducerRecord accept(PollContext context); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java similarity index 73% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java index 885991cf3c..2d46391c39 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyResultCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java @@ -14,19 +14,26 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; import java.util.List; -import java.util.function.Consumer; import org.apache.kafka.clients.producer.ProducerRecord; -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; import io.confluent.parallelconsumer.PollContext; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceManyResultCallback extends PollAndProduceManyCallback, ResultConsumerCallback { +public interface PollAndProduceMany extends ParallelConsumerCallback { + + /** + * ... + */ + List> accept(PollContext context); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java similarity index 51% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java index 3d6d27f253..b9be8d9db0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java @@ -14,27 +14,18 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; -import java.util.List; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.kafka.config.ParallelConsumerContext; - -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; -import io.confluent.parallelconsumer.PollContext; +import org.springframework.kafka.core.ResultConsumerCallback; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceCallback extends ParallelConsumerCallback { +public interface PollAndProduceManyResult extends PollAndProduceMany, + ResultConsumerCallback { - /** - * ... - */ - ProducerRecord accept(PollContext context); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java similarity index 50% rename from spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java rename to spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java index 8868f9ffe5..fca8fe9d5a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/PollAndProduceManyCallback.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java @@ -14,27 +14,18 @@ * limitations under the License. */ -package org.springframework.kafka.core; +package org.springframework.kafka.core.parallelconsumer; -import java.util.List; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.kafka.config.ParallelConsumerContext; - -import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; -import io.confluent.parallelconsumer.PollContext; +import org.springframework.kafka.core.ResultConsumerCallback; /** * ... + * + * @author Sanghyeok An + * @since 3.3 */ -public interface PollAndProduceManyCallback extends ParallelConsumerCallback { +public interface PollAndProduceResult extends PollAndProduce, + ResultConsumerCallback { - /** - * ... - */ - List> accept(PollContext context); }