Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenchickenlove committed Apr 27, 2024
1 parent 5be340d commit 4eca979
Show file tree
Hide file tree
Showing 12 changed files with 406 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.springframework.kafka.config;

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;

public class OnMissingParallelConsumerOptionsProviderCondition implements Condition {

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
return context.getBeanFactory().getBean(ParallelConsumerOptionsProvider.class) == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@

package org.springframework.kafka.config;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;

import org.apache.kafka.clients.producer.Producer;
import org.springframework.util.StringUtils;
import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;
import org.springframework.kafka.annotation.EnableParallelConsumer;

// It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties.

/**
Expand All @@ -37,40 +34,16 @@
* @since 3.3
*/

public class ParallelConsumerConfig {
public class ParallelConsumerConfig<K, V> {

public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig";
private static final String PARALLEL_CONSUMER_MAX_CONCURRENCY = "PARALLEL_CONSUMER_MAX_CONCURRENCY";
private static final String PARALLEL_CONSUMER_ORDERING = "PARALLEL_CONSUMER_ORDERING";
private static final String ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT = "ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT";
private static final String COMMIT_LOCK_ACQUISITION_TIMEOUT = "COMMIT_LOCK_ACQUISITION_TIMEOUT";
private static final String COMMIT_INTERVAL = "COMMIT_INTERVAL";
private final Map<String, String> properties = new HashMap<>();

public ParallelConsumerConfig() {

final String maxConcurrency = System.getenv(PARALLEL_CONSUMER_MAX_CONCURRENCY);
final String ordering = System.getenv(PARALLEL_CONSUMER_ORDERING);
final String allowEagerProcessingDuringTransactionCommit = System.getenv(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT);
final String commitLockAcquisitionTimeout = System.getenv(COMMIT_LOCK_ACQUISITION_TIMEOUT);
final String commitInterval = System.getenv(COMMIT_INTERVAL);

this.properties.put(PARALLEL_CONSUMER_MAX_CONCURRENCY, maxConcurrency);
this.properties.put(PARALLEL_CONSUMER_ORDERING, ordering);
this.properties.put(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT, allowEagerProcessingDuringTransactionCommit);
this.properties.put(COMMIT_LOCK_ACQUISITION_TIMEOUT, commitLockAcquisitionTimeout);
this.properties.put(COMMIT_INTERVAL, commitInterval);
}
private final ParallelConsumerOptionsProvider<K, V> provider;

private ProcessingOrder toOrder(String order) {
return switch (order) {
case "partition" -> ProcessingOrder.PARTITION;
case "unordered" -> ProcessingOrder.UNORDERED;
default -> ProcessingOrder.KEY; // Confluent Consumer Default Policy
};
public ParallelConsumerConfig(ParallelConsumerOptionsProvider<K, V> provider) {
this.provider = provider;
}

public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
public ParallelConsumerOptions<K, V> toConsumerOptions(
ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder,
Consumer<K, V> consumer,
Producer<K, V> producer) {
Expand All @@ -79,43 +52,110 @@ public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
return toConsumerOptions(builder, consumer);
}

public <K,V> ParallelConsumerOptions<K, V> toConsumerOptions(
public ParallelConsumerOptions<K, V> toConsumerOptions(
ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder,
Consumer<K, V> consumer) {

builder.consumer(consumer);
return buildRemainOptions(builder);
}

private ParallelConsumerOptions<K, V> buildRemainOptions(ParallelConsumerOptions.ParallelConsumerOptionsBuilder<K, V> builder) {
if (this.provider.managedExecutorService() != null){
builder.managedExecutorService(this.provider.managedExecutorService());
}

final String maxConcurrencyString = this.properties.get(PARALLEL_CONSUMER_MAX_CONCURRENCY);
final String orderingString = this.properties.get(PARALLEL_CONSUMER_ORDERING);
final String allowEagerProcessingDuringTransactionCommitString = this.properties.get(ALLOW_EAGER_PROCESSING_DURING_TRANSACTION_COMMIT);
final String commitLockAcquisitionTimeoutString = this.properties.get(COMMIT_LOCK_ACQUISITION_TIMEOUT);
final String commitIntervalString = this.properties.get(COMMIT_INTERVAL);
if (this.provider.managedThreadFactory() != null){
builder.managedThreadFactory(this.provider.managedThreadFactory());
}

if (this.provider.meterRegistry() != null){
builder.meterRegistry(this.provider.meterRegistry());
}

if (this.provider.pcInstanceTag() != null){
builder.pcInstanceTag(this.provider.pcInstanceTag());
}

if (this.provider.metricsTags() != null){
builder.metricsTags(this.provider.metricsTags());
}

if (this.provider.allowEagerProcessingDuringTransactionCommit() != null){
builder.allowEagerProcessingDuringTransactionCommit(this.provider.allowEagerProcessingDuringTransactionCommit());
}

if (this.provider.commitLockAcquisitionTimeout() != null){
builder.commitLockAcquisitionTimeout(this.provider.commitLockAcquisitionTimeout());
}

if (this.provider.produceLockAcquisitionTimeout() != null){
builder.produceLockAcquisitionTimeout(this.provider.produceLockAcquisitionTimeout());
}

if (this.provider.commitInterval() != null){
builder.commitInterval(this.provider.commitInterval());
}

if (StringUtils.hasText(maxConcurrencyString)) {
final Integer maxConcurrency = Integer.valueOf(maxConcurrencyString);
builder.maxConcurrency(maxConcurrency);
if (this.provider.ordering() != null){
builder.ordering(this.provider.ordering());
}

if (StringUtils.hasText(orderingString)) {
final ProcessingOrder processingOrder = toOrder(orderingString);
builder.ordering(processingOrder);
if (this.provider.commitMode() != null){
builder.commitMode(this.provider.commitMode());
}

if (StringUtils.hasText(allowEagerProcessingDuringTransactionCommitString)) {
final Boolean allowEagerProcessingDuringTransactionCommit = Boolean.valueOf(allowEagerProcessingDuringTransactionCommitString);
builder.allowEagerProcessingDuringTransactionCommit(allowEagerProcessingDuringTransactionCommit);
if (this.provider.maxConcurrency() != null){
builder.maxConcurrency(this.provider.maxConcurrency());
}

if (StringUtils.hasText(commitLockAcquisitionTimeoutString)) {
final Long commitLockAcquisitionTimeout = Long.valueOf(commitLockAcquisitionTimeoutString);
builder.commitLockAcquisitionTimeout(Duration.ofSeconds(commitLockAcquisitionTimeout));
if (this.provider.invalidOffsetMetadataPolicy() != null){
builder.invalidOffsetMetadataPolicy(this.provider.invalidOffsetMetadataPolicy());
}

if (StringUtils.hasText(commitIntervalString)) {
final Long commitInterval = Long.valueOf(commitIntervalString);
builder.commitInterval(Duration.ofMillis(commitInterval));
if (this.provider.retryDelayProvider() != null){
builder.retryDelayProvider(this.provider.retryDelayProvider());
}

if (this.provider.sendTimeout() != null){
builder.sendTimeout(this.provider.sendTimeout());
}

if (this.provider.offsetCommitTimeout() != null){
builder.offsetCommitTimeout(this.provider.offsetCommitTimeout());
}

if (this.provider.batchSize() != null){
builder.batchSize(this.provider.batchSize());
}

if (this.provider.thresholdForTimeSpendInQueueWarning() != null){
builder.thresholdForTimeSpendInQueueWarning(this.provider.thresholdForTimeSpendInQueueWarning());
}

if (this.provider.maxFailureHistory() != null){
builder.maxFailureHistory(this.provider.maxFailureHistory());
}

if (this.provider.shutdownTimeout() != null){
builder.shutdownTimeout(this.provider.shutdownTimeout());
}

if (this.provider.drainTimeout() != null){
builder.drainTimeout(this.provider.drainTimeout());
}

if (this.provider.messageBufferSize() != null){
builder.messageBufferSize(this.provider.messageBufferSize());
}

if (this.provider.initialLoadFactor() != null){
builder.initialLoadFactor(this.provider.initialLoadFactor());
}
if (this.provider.maximumLoadFactor() != null){
builder.maximumLoadFactor(this.provider.maximumLoadFactor());
}

return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package org.springframework.kafka.config;

import javax.annotation.Nullable;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ParallelConsumerCallback;
import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;
import org.springframework.kafka.core.ParallelConsumerFactory;
import org.springframework.kafka.annotation.EnableParallelConsumer;
import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider;

/**
* If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes.
Expand All @@ -37,22 +37,29 @@

public class ParallelConsumerConfiguration<K, V> {

@Bean
@Conditional(OnMissingParallelConsumerOptionsProviderCondition.class)
public ParallelConsumerOptionsProvider<K, V> parallelConsumerOptionsProvider() {
return new ParallelConsumerOptionsProvider<K, V>() {};
}

@Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME)
public ParallelConsumerConfig parallelConsumerConfig() {
return new ParallelConsumerConfig();
public ParallelConsumerConfig<K, V> parallelConsumerConfig(ParallelConsumerOptionsProvider<K, V> parallelConsumerOptionsProvider) {
return new ParallelConsumerConfig<K, V>(parallelConsumerOptionsProvider);
}

@Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME)
public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig,
public ParallelConsumerContext<K,V> parallelConsumerContext(ParallelConsumerConfig<K, V> parallelConsumerConfig,
ParallelConsumerCallback<K, V> parallelConsumerCallback) {
return new ParallelConsumerContext(parallelConsumerConfig,
parallelConsumerCallback);
return new ParallelConsumerContext<K, V>(parallelConsumerConfig,
parallelConsumerCallback);
}

@Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME)
public ParallelConsumerFactory<K,V> parallelConsumerFactory(DefaultKafkaConsumerFactory<K,V> consumerFactory,
DefaultKafkaProducerFactory<K,V> producerFactory,
ParallelConsumerContext<K,V> parallelConsumerContext) {
return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory);
return new ParallelConsumerFactory<K, V>(parallelConsumerContext, consumerFactory, producerFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@

package org.springframework.kafka.config;

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.kafka.core.ParallelConsumerCallback;
import org.springframework.kafka.core.parallelconsumer.ParallelConsumerCallback;

import io.confluent.parallelconsumer.JStreamParallelStreamProcessor;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode;

/**
* This class is for aggregating all related with ParallelConsumer.
Expand All @@ -41,7 +37,7 @@ public class ParallelConsumerContext<K,V> {
private final ParallelConsumerConfig parallelConsumerConfig;
private final ParallelConsumerCallback<K, V> parallelConsumerCallback;

public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig,
public ParallelConsumerContext(ParallelConsumerConfig<K, V> parallelConsumerConfig,
ParallelConsumerCallback<K, V> callback) {
this.parallelConsumerConfig = parallelConsumerConfig;
this.parallelConsumerCallback = callback;
Expand All @@ -61,9 +57,4 @@ public ParallelConsumerOptions<K, V> getParallelConsumerOptions(Consumer<K, V> c
return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer);
}

public void stop(ParallelStreamProcessor<K, V> parallelStreamProcessor) {
parallelStreamProcessor.close();

}

}
Loading

0 comments on commit 4eca979

Please sign in to comment.