
Kafka Config for consumer/producer by topic Enhancements #39066

Open
monsdroid opened this issue Jan 9, 2024 · 6 comments
Labels
type: enhancement A general enhancement
Milestone

Comments

@monsdroid

monsdroid commented Jan 9, 2024

We recently inherited a project with a lot of Kafka. Standing on the shoulders of the Spring giants, and with the recent updates, we removed as much boilerplate as possible and tried to solve what's possible via configuration.

Delegating Serializer and Deserializer
It took us quite a while to figure out why our config was not picked up.
Obviously a beginner's mistake; the solution is here:
If you use spring.kafka.value.serialization.bytopic.config (a Kafka property), you must set value-deserializer to org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer.

A working config:

spring:
  kafka:
    ...
    consumer:
      ...
      key-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
      value-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
      properties:
        ...
        spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
        spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethingDeserializer"
  1. Any chance that Spring Boot can warn about this, or make those properties its own so they can be auto-configured?

  2. On the same topic, the value of spring.kafka.value.serialization.bytopic.config is a comma-separated list of "topicregex:some.com.package.hard.to.read.and.maintain.if.there.is.more.than.one.serializer";
    this list becomes hard to read/maintain.
    Being able to provide this list as a list, or even as a map, via YAML would be nice.

  3. To add some type safety: a bean of ConsumerTopicToDeserializer or something similar which auto-configuration picks up to do it right and save us fools some time :-)

We used the customizer below to add it before we found the solution above:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer
    import org.springframework.context.annotation.Bean
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    import org.springframework.kafka.support.serializer.JsonDeserializer
    import java.util.regex.Pattern

    @Bean
    fun customizeKafkaConsumerFactory(): DefaultKafkaConsumerFactoryCustomizer {
        return DefaultKafkaConsumerFactoryCustomizer { consumerFactory ->
            @Suppress("UNCHECKED_CAST")
            val factory = consumerFactory as DefaultKafkaConsumerFactory<String, Any>
            // Wrap both deserializers in ErrorHandlingDeserializer and delegate by topic
            factory.setKeyDeserializer(ErrorHandlingDeserializer(StringDeserializer()))
            factory.setValueDeserializer(
                ErrorHandlingDeserializer(
                    DelegatingByTopicDeserializer(
                        mapOf(
                            Pattern.compile("SomeTopic") to SomeDeserializer(),
                        ),
                        JsonDeserializer<Any>(),
                    )
                )
            )
        }
    }
  4. How about some configured ErrorHandlingDeserializer:
spring:
  kafka:
    ...
    consumer:
      ...
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        ...
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer

        spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
        spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethingDeserializer"

Maybe you see some things which can be addressed in the documentation and/or auto-configuration.

Thanks for the great work!

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Jan 9, 2024
@wilkinsona
Member

Thanks for the suggestions.

Any chance that Spring Boot can warn about this or make those properties its own so they can be auto-configured?

Boot's spring.kafka.consumer.key-deserializer maps to Kafka's key.deserializer property. The properties within spring.kafka.consumer.properties mean nothing to Spring Boot and are passed straight through. Ultimately, these properties are all in the Map<String, Object> that's used to create the Spring Kafka DefaultKafkaConsumerFactory(Map<String, Object>).

Given the above, I'm not sure that Boot is the right place to encode knowledge about the relationship between the key.deserializer property and the spring.kafka.key.serialization.bytopic.config property. It feels to me like Spring Kafka is better placed to do this as it's the project with the knowledge that you have to use a particular deserializer for spring.kafka.key.serialization.bytopic.config to have an effect. Furthermore, Spring Kafka performing this validation would also mean that it would benefit everyone, not just those using Spring Boot.

WDYT, @artembilan?

On the same topic, the value of the spring.kafka.value.serialization.bytopic.config is a comma-separated list [that] becomes hard to read/maintain. Being able to provide this list as "list" or even as map via yaml would be nice.

It's Spring Kafka's DelegatingByTopicSerialization sub-classes that process this property. They appear to support that value being either a Map or a String. The Map case opens up some possibilities for richer configuration. You could implement something today in your app using a DefaultKafkaConsumerFactoryCustomizer and DefaultKafkaConsumerFactory.updateConfigs(Map<String, Object>) to configure the spring.kafka.key.serialization.bytopic.config and spring.kafka.value.serialization.bytopic.config properties with Map values.
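
For example, a minimal sketch of that customizer-based workaround, assuming the bytopic.* entries accept Map values as described above ("topic-one" and the chosen deserializer classes are illustrative):

import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Bean
DefaultKafkaConsumerFactoryCustomizer byTopicDeserializerCustomizer() {
    // Map values instead of comma-separated "pattern:class" strings; the
    // DelegatingByTopic* deserializers compile the keys to patterns.
    Map<String, Object> byTopicConfig = Map.of(
            "spring.kafka.key.serialization.bytopic.config",
            Map.of("topic-one", StringDeserializer.class),
            "spring.kafka.value.serialization.bytopic.config",
            Map.of("topic-one", JsonDeserializer.class));
    return (consumerFactory) -> consumerFactory.updateConfigs(byTopicConfig);
}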

We could also look at providing dedicated configuration properties for this in Boot that we'd map to Map values for spring.kafka.key.serialization.bytopic.config and spring.kafka.value.serialization.bytopic.config. When set, this would then open up the possibility of also setting key.deserializer to a sensible default unless the user's also configured spring.kafka.consumer.key-deserializer and/or spring.kafka.consumer.value-deserializer. Alternatively, we could map this to the DefaultKafkaConsumerFactory constructor that takes the deserializers as constructor arguments.
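
Purely as an illustration of that idea, such dedicated properties might look something like the following (these property names are hypothetical; nothing like this exists in Boot today):

spring:
  kafka:
    consumer:
      # hypothetical properties, for illustration only
      key-deserializers-by-topic:
        "[topic-one]": org.apache.kafka.common.serialization.StringDeserializer
      value-deserializers-by-topic:
        "[topic-one]": com.example.custom.KafkaSomethingDeserializer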

To add some type safety, a bean of ConsumerTopicToDeserializer or something similar which auto-configuration picks up to do it right and save us fools some time

That's an interesting idea. It certainly feels preferable to configuring class names in YAML or properties, which is always error-prone.

How about some configured ErrorHandlingDeserializer

I'm not sure about this one as I think it could get rather complicated. You'd need to somehow configure a delegate everywhere that the error handling deserializer may be used. With some of the other improvements discussed above, I suspect there may be less of a need for this as it would be easier to perform the error-handling decoration in your own code.

@wilkinsona wilkinsona added the status: waiting-for-internal-feedback An issue that needs input from a member or another Spring Team label Feb 9, 2024
@artembilan
Member

OK. I see what is going on.
You are trying to push coding from Java back to YAML, which is indeed error-prone.
I agree that simple one-stop-shop properties are OK to provide via YAML, but when we go to something more complicated or tied together, it is better to control it from code, where it is less possible to make a mistake.

That spring.kafka.value.serialization.bytopic.config property is read only by the DelegatingByTopicDeserializer, so there is no way to warn you that something is off.
Same for the ErrorHandlingDeserializer and its spring.deserializer.value.delegate.class.
That is because all those spring.kafka.consumer.properties are outside of configuration properties binding in Spring Boot.

What I would suggest is a top-level, end-user Deserializer bean to be injected into the KafkaAnnotationDrivenConfiguration, where we would propagate it down to the DefaultKafkaConsumerFactory constructor:

	public DefaultKafkaConsumerFactory(Map<String, Object> configs,
			@Nullable Deserializer<K> keyDeserializer,
			@Nullable Deserializer<V> valueDeserializer) {

Although from here it is not clear how to distinguish between key & value deserializer beans...

This way you would not need that DefaultKafkaConsumerFactoryCustomizer, but rather would concentrate on the Deserializer configuration in the familiar Java style.

I also wonder if this YAML feature would make it a bit easier for you for the time being:

properties:
  spring.kafka.value.serialization.bytopic.config:
    - ${kafka.consumer.topic_one}: com.example.custom.KafkaSomethingDeserializer
    - ${kafka.consumer.topic_two}: org.apache.kafka.common.serialization.StringDeserializer

CC @sobychacko

@sobychacko
Contributor

+1 to the idea from @artembilan where users provide the deserializers as beans, and then Spring Boot injects them into DefaultKafkaConsumerFactory, giving users control over how they want to configure the deserializers via code. We need to devise a way to distinguish between key and value deserializers. Maybe we can look into some naming conventions or something similar? I also like what @wilkinsona suggested for providing dedicated configuration options in Boot to make this integration with DefaultKafkaConsumerFactory transparent. This way, the users can drive this from properties rather than providing deserializer beans.
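
A purely hypothetical sketch of the naming-convention idea (neither these bean names nor any such auto-configuration contract exist today):

import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Hypothetical: Boot could look up beans by well-known names such as
// "kafkaKeyDeserializer" / "kafkaValueDeserializer" and pass them to the
// DefaultKafkaConsumerFactory constructor shown above.
@Bean("kafkaKeyDeserializer")
Deserializer<String> keyDeserializer() {
    return new StringDeserializer();
}

@Bean("kafkaValueDeserializer")
Deserializer<Object> valueDeserializer() {
    // "topic-one" is an illustrative topic pattern
    return new DelegatingByTopicDeserializer(
            Map.of(Pattern.compile("topic-one"), new JsonDeserializer<>()),
            new JsonDeserializer<>());
}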

@philwebb philwebb added the for: team-meeting An issue we'd like to discuss as a team to make progress label Feb 12, 2024
@philwebb philwebb added type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged for: team-meeting An issue we'd like to discuss as a team to make progress status: waiting-for-internal-feedback An issue that needs input from a member or another Spring Team labels Feb 22, 2024
@philwebb philwebb added this to the 3.x milestone Feb 22, 2024
@StephenFlavin

StephenFlavin commented May 9, 2024

Hi,
I'm interested in this feature but I'm having trouble trying to configure it.

Can you show what the @KafkaListener looks like for this?

In my case I have many topics; some topics use Avro values and some use JSON values, so I use the Confluent KafkaAvroDeserializer for one and the Spring JsonDeserializer for the other. However, since each topic maps to a different resulting class, I don't think it works.

Edit:

I had a go at implementing this. I set the Avro deserializer as the bytopic default and ended up having to use custom deserializers for each JSON topic that just extend the Spring JsonDeserializer with a concrete type. I was glad to see that the @KafkaListener can just accept ConsumerRecords<Object, Object>.
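
A sketch of that workaround, with an illustrative SensorEvent payload type:

import org.springframework.kafka.support.serializer.JsonDeserializer;

// A subclass fixed to a concrete type so its class name can be used
// directly in spring.kafka.value.serialization.bytopic.config.
// SensorEvent is a hypothetical payload type.
public class SensorEventDeserializer extends JsonDeserializer<SensorEvent> {
}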

+1 to getting spring.kafka.value.serialization.bytopic.config configured as a map in YAML

@StephenFlavin

StephenFlavin commented Oct 5, 2024

I raised another issue above after forgetting about this one, apologies...

I've had a bit more time to think on this, and I think it would be more useful for the config to live under spring.kafka.topics.
I've been able to get some of this working; I'll re-post the config structure here:

spring.kafka.topics:
  create-topics: true                                                             # a way to disable automatic topic creation [boolean optional default=false]
  partitions: 2                                                                   # defines the number of partitions to create topics with [numeric optional default=1]
  replicas: 2                                                                     # defines the number of replicas to create topics with [numeric optional default=1]
  compact: true                                                                   # defines compact cleanup policy to create topics with [boolean optional default=false]
  topic-properties:                                                               # allows `org.apache.kafka.common.config.TopicConfig` values to be used for topic creation [map optional default={}]
    retention.ms: 720000
  by-topic-config:
    sensor-events:                                                                  # alias/user-friendly name for topic
      name: ...-sensor-events                                                       # full topic name to consume from [string required]
      enabled: true                                                                 # provides a way to stop consuming from this topic in config [boolean optional default=true]
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer     # configures the by-topic-serializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer          # configures the by-topic-serializer
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer # configures the by-topic-deserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer      # configures the by-topic-deserializer
      partitions: 3                                                                 # overwrite for default above
      replicas: 3                                                                   # overwrite for default above
      compact: false                                                                # overwrite for default above
      topic-properties:                                                             # overwrite for default above
        retention.ms: 360000
    ...-events:
      name: ...
      ...
WIP Implementation:
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@AutoConfiguration
@AutoConfigureAfter(KafkaAutoConfiguration.class)
public class KafkaTopicAutoConfiguration {

  @Bean
  @ConfigurationProperties(prefix = "spring.kafka.topics")
  public TopicsConfig topicsConfig() {
      return new TopicsConfig();
  }

  @Bean
  public String[] topics(final TopicsConfig topicsConfig) {
      return topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .map(TopicConfig::name)
              .toArray(String[]::new);
  }

  @Bean
  public String[] realtimeTopics(final TopicsConfig topicsConfig) {
      return topicsConfig.getByTopicConfig().entrySet().stream()
              .filter(entry -> !entry.getValue().compact() && entry.getValue().enabled())
              .map(Map.Entry::getKey)
              .toArray(String[]::new);
  }

  @Bean
  public String[] compactTopics(final TopicsConfig topicsConfig) {
      return topicsConfig.getByTopicConfig().entrySet().stream()
              .filter(entry -> entry.getValue().compact() && entry.getValue().enabled())
              .map(Map.Entry::getKey)
              .toArray(String[]::new);
  }

  @Bean
  @ConditionalOnProperty("spring.kafka.topics.compact-topic-group-id")
  public Header compactTopicRecordHeader(final TopicsConfig topicsConfig) {
      return new RecordHeader("compactTopicGroupId", topicsConfig.getCompactTopicGroupId().getBytes(StandardCharsets.UTF_8));
  }

  @Bean
  @ConditionalOnProperty(value = "spring.kafka.topics.create-topics", havingValue = "true")
  public KafkaAdmin.NewTopics newTopics(final TopicsConfig topicsConfig) {
      final var newTopics = topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .map(topicConfig -> {
                  final var topicBuilder = TopicBuilder.name(topicConfig.name())
                          .partitions(Optional.ofNullable(topicConfig.partitions())
                                  .orElse(topicsConfig.getPartitions()))
                          .replicas(Optional.ofNullable(topicConfig.replicas())
                                  .orElse(topicsConfig.getReplicas()))
                          .configs(Optional.ofNullable(topicConfig.topicProperties())
                                  .map(topicProperties -> Stream.of(topicProperties, topicsConfig.getTopicProperties())
                                          .filter(Objects::nonNull)
                                          .map(Map::entrySet)
                                          .flatMap(Collection::stream)
                                          .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
                                                  Map.Entry::getValue,
                                                  (topicProperty, defaultTopicProperty) -> topicProperty)))
                                  .orElse(topicsConfig.getTopicProperties()));
                  if (topicConfig.compact()) {
                      topicBuilder.compact();
                  }
                  return topicBuilder.build();
              })
              .toArray(NewTopic[]::new);

      return new KafkaAdmin.NewTopics(newTopics);
  }

  @Bean
  public Map<String, KafkaTemplate<Object, Object>> byTopicKafkaTemplate(final TopicsConfig topicsConfig,
                                                                         final KafkaProperties kafkaProperties) {
      return topicsConfig.getByTopicConfig().entrySet().stream()
              .filter(entry -> entry.getValue().enabled())
              .filter(entry -> entry.getValue().keySerializer() != null || entry.getValue().valueSerializer() != null)
              .map(topicConfig -> {
                  var producerConfig = new HashMap<>(kafkaProperties.buildProducerProperties(null));

                  if (topicConfig.getValue().keySerializer() != null) {
                      producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, topicConfig.getValue().keySerializer());
                  }
                  if (topicConfig.getValue().valueSerializer() != null) {
                      producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, topicConfig.getValue().valueSerializer());
                  }

                  final var producerFactory = new DefaultKafkaProducerFactory<>(producerConfig);

                  final var kafkaTemplate = new KafkaTemplate<>(producerFactory);
                  kafkaTemplate.setDefaultTopic(topicConfig.getValue().name());

                  return Map.entry(topicConfig.getKey(), kafkaTemplate);
              })
              .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
                      Map.Entry::getValue));
  }

  @Bean
  @Order(Ordered.HIGHEST_PRECEDENCE)
  public DefaultKafkaConsumerFactoryCustomizer kafkaConsumerFactoryCustomizer(final TopicsConfig topicsConfig) {
      final var byTopicKeyDeserializerConfig = topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .filter(topicConfig -> topicConfig.keyDeserializer() != null)
              .map(topicConfig -> toByTopicPattern(topicConfig.name(), topicConfig.keyDeserializer()))
              .collect(Collectors.joining(","));

      final var byTopicValueDeserializerConfig = topicsConfig.getByTopicConfig().values().stream()
              .filter(TopicConfig::enabled)
              .filter(topicConfig -> topicConfig.valueDeserializer() != null)
              .map(topicConfig -> toByTopicPattern(topicConfig.name(), topicConfig.valueDeserializer()))
              .collect(Collectors.joining(","));

      return consumerFactory -> consumerFactory.updateConfigs(Map.of(
              "spring.kafka.key.serialization.bytopic.config", byTopicKeyDeserializerConfig,
              "spring.kafka.value.serialization.bytopic.config", byTopicValueDeserializerConfig,
              "spring.kafka.filtering.by.headers", "compactTopicGroupId:" + topicsConfig.getCompactTopicGroupId()));
  }

  private static String toByTopicPattern(String topic, String deserializer) {
      return topic + ":" + deserializer;
  }
}

Further to this, I'm trying to figure out how to deal with async retries and DLTs. In my scenario, as I imagine is often the case, topics are preferably centrally managed somewhere rather than generated by code, so I'm hoping to only need one retry/DLT topic per app (still useful to auto-generate for local and integration testing). At the moment I'm exploring a way to use the generated DelegatingByTopicDeserializer and the to-do DelegatingByTopicSerializer to produce to these topics, as I would really like all of it to be auto-configured so that each app is production-ready with async retries and DLTs, with the only effort being some simplified YAML config.

One of the difficulties with the retries and DLT is that, since I'm using Avro with Schema Registry, the serializer needs to be called with the source topic when producing; I'm hoping KafkaHeaders.DLT_ORIGINAL_TOPIC can be used for this.
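
A minimal sketch of reading that header in a DLT handler; it assumes the dead-letter publisher has populated KafkaHeaders.DLT_ORIGINAL_TOPIC and that the raw header value arrives as bytes:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@DltHandler
public void dlt(ConsumerRecord<?, ?> record,
        @Header(name = KafkaHeaders.DLT_ORIGINAL_TOPIC, required = false) byte[] originalTopic) {
    // The source topic could drive topic-aware (re)serialization here.
    String sourceTopic = (originalTopic != null)
            ? new String(originalTopic, StandardCharsets.UTF_8) : "unknown";
    System.out.println("DLT received from " + sourceTopic + ": " + record);
}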

I'll post my final solution when I get there but would appreciate some input/thoughts on this.

For me, config-driven isn't just about removing boilerplate. I'd like to be able to split up a k8s deployment of an app that consumes from many topics, where some topics are more critical and could do with having a separate auto-scaling group. Additionally, I'm hoping to be able to use config maps to quickly pause a consumer by setting enabled to false in the by-topic-config.

@StephenFlavin

Working implementation: https://gist.github.com/StephenFlavin/f4c7dc7758a09e84fb7185ac2d44bcf8
Still trying to understand how to do DLT with batch listeners and how that fits into auto-configuration...

My aspiration here is that all a dev needs to set up, beyond the configuration, is this one class:

    @RetryableTopic
    @KafkaListener
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("Received: " + record);
    }

    @DltHandler
    public void dlt(ConsumerRecord<?, ?> record) {
        System.out.println("DLT Received: " + record);
    }

but maybe I'm taking it too far for this to be built into Spring's auto-configuration
