A better way to configure topics with autoconfig #3527

Closed
StephenFlavin opened this issue Oct 4, 2024 · 3 comments


@StephenFlavin

Expected Behavior

Topics can be configured with all that automagic goodness

Current Behavior

Topics are not part of auto configuration.

Context

Currently the topics to be produced to or consumed from are not part of the Spring Boot auto-configuration. This means there's no way to create topics via configuration, and the @KafkaListener annotation can't be fully configured from properties: its topics attribute must always be specified, so driving the topic values from config requires SpEL, which gets ugly when you're consuming from multiple topics. Similarly, per-topic serialization config can be difficult to read.
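
For illustration, the SpEL workaround typically looks something like this (the property name here is hypothetical):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class SensorEventsListener {

    // "app.kafka.topics" is a hypothetical comma-separated property; splitting it
    // in SpEL is the kind of workaround the proposal below aims to eliminate
    @KafkaListener(topics = "#{'${app.kafka.topics}'.split(',')}")
    public void listen(ConsumerRecord<Object, Object> record) {
        // handle record
    }
}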

I propose that this config should sit under spring.kafka.topics with the following functionality:

spring.kafka.topics:
  create-topics: true                                                             # enables automatic topic creation [boolean optional default=false]
  partitions: 2                                                                   # defines the number of partitions to create topics with [numeric optional default=1]
  replicas: 2                                                                     # defines the number of replicas to create topics with [numeric optional default=1]
  compact: true                                                                   # defines compact cleanup policy to create topics with [boolean optional default=false]
  topic-properties:                                                               # allows `org.apache.kafka.common.config.TopicConfig` values to be used for topic creation [map optional default={}]
    retention.ms: 720000
  sensor-events:                                                                  # alias/user-friendly name for the topic
    name: ...-sensor-events                                                       # full topic name to consume from [string required]
    enabled: true                                                                 # provides a way to stop consuming from this topic in config [boolean optional default=true]
    key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer     # configures the by-topic-serializer
    value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer          # configures the by-topic-serializer
    key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer # configures the by-topic-deserializer
    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer      # configures the by-topic-deserializer
    partitions: 3                                                                 # overrides the default above
    replicas: 3                                                                   # overrides the default above
    compact: false                                                                # overrides the default above
    topic-properties:                                                             # overrides the default above
      retention.ms: 360000
  ...-events:
    name: ...
    ...

The serialization configs could potentially be cleaned up with Serdes.

I've been able to get some of this done locally.

WIP Implementation:
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@AutoConfiguration
@AutoConfigureAfter(KafkaAutoConfiguration.class)
public class KafkaTopicAutoConfiguration {

  @Bean
  @ConfigurationProperties(prefix = "spring.kafka.topics")
  public TopicsConfig topicsConfig() {
      return new TopicsConfig();
  }

  @Bean
  public String[] topics(final TopicsConfig topicsConfig) {
      return topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .map(TopicConfig::name)
              .toArray(String[]::new);
  }

  @Bean
  public String[] realtimeTopics(final TopicsConfig topicsConfig) {
      return topicsConfig.getByTopicConfig().entrySet().stream()
              .filter(entry -> !entry.getValue().compact() && entry.getValue().enabled())
              .map(Map.Entry::getKey)
              .toArray(String[]::new);
  }

  @Bean
  public String[] compactTopics(final TopicsConfig topicsConfig) {
      return topicsConfig.getByTopicConfig().entrySet().stream()
              .filter(entry -> entry.getValue().compact() && entry.getValue().enabled())
              .map(Map.Entry::getKey)
              .toArray(String[]::new);
  }

  @Bean
  @ConditionalOnProperty("spring.kafka.topics.compact-topic-group-id")
  public Header compactTopicRecordHeader(final TopicsConfig topicsConfig) {
      return new RecordHeader("compactTopicGroupId", topicsConfig.getCompactTopicGroupId().getBytes(StandardCharsets.UTF_8));
  }

  @Bean
  @ConditionalOnProperty(value = "spring.kafka.topics.create-topics", havingValue = "true")
  public KafkaAdmin.NewTopics newTopics(final TopicsConfig topicsConfig) {
      final var newTopics = topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .map(topicConfig -> {
                  final var topicBuilder = TopicBuilder.name(topicConfig.name())
                          .partitions(Optional.ofNullable(topicConfig.partitions())
                                  .orElse(topicsConfig.getPartitions()))
                          .replicas(Optional.ofNullable(topicConfig.replicas())
                                  .orElse(topicsConfig.getReplicas()))
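                           // merge per-topic properties over the global defaults
                           // (the per-topic value wins on duplicate keys)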
                          .configs(Optional.ofNullable(topicConfig.topicProperties())
                                  .map(topicProperties -> Stream.of(topicProperties, topicsConfig.getTopicProperties())
                                          .filter(Objects::nonNull)
                                          .map(Map::entrySet)
                                          .flatMap(Collection::stream)
                                          .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
                                                  Map.Entry::getValue,
                                                  (topicProperty, defaultTopicProperty) -> topicProperty)))
                                  .orElse(topicsConfig.getTopicProperties()));
                  if (topicConfig.compact()) {
                      topicBuilder.compact();
                  }
                  return topicBuilder.build();
              })
              .toArray(NewTopic[]::new);

      return new KafkaAdmin.NewTopics(newTopics);
  }

  @Bean
  public Map<String, KafkaTemplate<Object, Object>> byTopicKafkaTemplate(final TopicsConfig topicsConfig,
                                                                         final KafkaProperties kafkaProperties) {
      return topicsConfig.getByTopicConfig().entrySet().stream()
              .filter(entry -> entry.getValue().enabled())
              .filter(entry -> entry.getValue().keySerializer() != null || entry.getValue().valueSerializer() != null)
              .map(topicConfig -> {
                  var producerConfig = new HashMap<>(kafkaProperties.buildProducerProperties(null));

                  if (topicConfig.getValue().keySerializer() != null) {
                      producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, topicConfig.getValue().keySerializer());
                  }
                   if (topicConfig.getValue().valueSerializer() != null) {
                       producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, topicConfig.getValue().valueSerializer());
                  }

                  final var producerFactory = new DefaultKafkaProducerFactory<>(producerConfig);

                  final var kafkaTemplate = new KafkaTemplate<>(producerFactory);
                  kafkaTemplate.setDefaultTopic(topicConfig.getValue().name());

                  return Map.entry(topicConfig.getKey(), kafkaTemplate);
              })
              .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
                      Map.Entry::getValue));
  }

  @Bean
  @Order(Ordered.HIGHEST_PRECEDENCE)
  public DefaultKafkaConsumerFactoryCustomizer kafkaConsumerFactoryCustomizer(final TopicsConfig topicsConfig) {
      final var byTopicKeyDeserializerConfig = topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .filter(topicConfig -> topicConfig.keyDeserializer() != null)
              .map(topicConfig -> toByTopicPattern(topicConfig.name(), topicConfig.keyDeserializer()))
              .collect(Collectors.joining(","));

      final var byTopicValueDeserializerConfig = topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .filter(topicConfig -> topicConfig.valueDeserializer() != null)
              .map(topicConfig -> toByTopicPattern(topicConfig.name(), topicConfig.valueDeserializer()))
              .collect(Collectors.joining(","));

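       // the two "…serialization.bytopic.config" keys are the property names read by
       // Spring Kafka's DelegatingByTopic(De)serializers; "spring.kafka.filtering.by.headers"
       // appears to be custom to this setup rather than a Spring Kafka property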
      return consumerFactory -> consumerFactory.updateConfigs(Map.of(
              "spring.kafka.key.serialization.bytopic.config", byTopicKeyDeserializerConfig,
              "spring.kafka.value.serialization.bytopic.config", byTopicValueDeserializerConfig,
              "spring.kafka.filtering.by.headers", "compactTopicGroupId:" + topicsConfig.getCompactTopicGroupId()));
  }

  private static String toByTopicPattern(String topic, String deserializer) {
      return topic + ":" + deserializer;
  }
}
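
The WIP above assumes TopicsConfig / TopicConfig holder types that aren't shown. A minimal sketch of what they might look like, inferred from the accessors used (this shape is an assumption, not code from the issue):

import java.util.HashMap;
import java.util.Map;

// Sketch of the binding types assumed by the auto-configuration above.
// Field names are inferred from the getters/accessors used; in the YAML proposal
// the per-topic entries sit directly under spring.kafka.topics, so binding them
// into a byTopicConfig map as shown here is a simplification.
public class TopicsConfig {

    private boolean createTopics = false;
    private int partitions = 1;
    private int replicas = 1;
    private String compactTopicGroupId;
    private Map<String, String> topicProperties = new HashMap<>();
    private Map<String, TopicConfig> byTopicConfig = new HashMap<>();

    // getters/setters required for @ConfigurationProperties binding
    public boolean isCreateTopics() { return createTopics; }
    public void setCreateTopics(boolean createTopics) { this.createTopics = createTopics; }
    public int getPartitions() { return partitions; }
    public void setPartitions(int partitions) { this.partitions = partitions; }
    public int getReplicas() { return replicas; }
    public void setReplicas(int replicas) { this.replicas = replicas; }
    public String getCompactTopicGroupId() { return compactTopicGroupId; }
    public void setCompactTopicGroupId(String id) { this.compactTopicGroupId = id; }
    public Map<String, String> getTopicProperties() { return topicProperties; }
    public void setTopicProperties(Map<String, String> p) { this.topicProperties = p; }
    public Map<String, TopicConfig> getByTopicConfig() { return byTopicConfig; }
    public void setByTopicConfig(Map<String, TopicConfig> m) { this.byTopicConfig = m; }

    // record accessors match the style used above (name(), enabled(), compact(), ...)
    public record TopicConfig(String name, Boolean enabled, String keySerializer,
                              String valueSerializer, String keyDeserializer,
                              String valueDeserializer, Integer partitions,
                              Integer replicas, Boolean compact,
                              Map<String, String> topicProperties) {

        public Boolean enabled() { return enabled == null || enabled; }  // default=true
        public Boolean compact() { return compact != null && compact; }  // default=false
    }
}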

My @KafkaListener is configured like this

    @KafkaListener(
            topics = "#{@realtimeTopics}",
            autoStartup = "true",
            batch = "true")

The automatic generation of the byTopicKafkaTemplate has been useful for writing integration tests, though I wasn't aware of DelegatingByTopicSerializer at the time, which may be preferable.
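
For reference, a minimal sketch of that built-in approach (the topic pattern and bootstrap config here are placeholders, not from this issue):

import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.DelegatingByTopicSerializer;

public class DelegatingByTopicExample {

    KafkaTemplate<String, Object> kafkaTemplate() {
        // placeholder bootstrap config for this sketch
        Map<String, Object> configs = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // the value serializer is chosen by topic pattern at send time
        Map<Pattern, Serializer<?>> delegates = Map.of(
                Pattern.compile(".*-sensor-events"), new ByteArraySerializer());

        var producerFactory = new DefaultKafkaProducerFactory<String, Object>(
                configs,
                new StringSerializer(),
                new DelegatingByTopicSerializer(delegates, new StringSerializer())); // default delegate

        return new KafkaTemplate<>(producerFactory);
    }
}

With this, a single KafkaTemplate can serialize differently per topic without a producer factory per topic; a DelegatingByTopicDeserializer exists for the consumer side, configurable with the same pattern:class syntax used in the customizer above.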

This is still a WIP on my end and I still have open questions, such as how best to integrate this with async retries and dead-letter topics. Ideally, for my case, I'd have a single dead-letter topic where message serialization mimics the source topic and deserialization uses the source topic name from the headers.

I'd love to hear thoughts, or to be told I've completely missed something that's already implemented 🙏

Apologies if I should be raising this in https://github.com/spring-projects/spring-boot

@sobychacko
Contributor

Introducing properties under spring.kafka.topic or spring.kafka.topics should be a Boot consideration, as all such configuration properties reside in Boot, not Spring Kafka. If we decide to provide these as Boot properties, we need to carefully evaluate which properties to add. Spring Boot typically selects important properties and offers customization options for others.
We noticed you're passing topic-specific serializers/deserializers in your configuration. However, the Kafka client doesn't provide a way to set ser/des like that. Spring Kafka's delegating serializer/deserializer is a higher-level abstraction that uses the appropriate ser/de at runtime based on the topic. Therefore, those properties likely can't be supported as presented, even if we support topic properties at the Boot level.

Cc @wilkinsona, what are your thoughts on this? If you believe this needs discussion on the Boot side, please feel free to transfer this issue to the Spring Boot repository. Thank you!

@wilkinsona
Member

My first impression is that there's at least some overlap with spring-projects/spring-boot#39066. I think we can probably use that issue for now. It already contains a few different suggestions and we'll have to consider all of them to figure out the right way to proceed. @StephenFlavin please feel free to add a comment to spring-projects/spring-boot#39066 where you feel that it doesn't already capture your suggestions.

@StephenFlavin
Author

Thanks both, I'll close this issue
