diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index 6b0ae3905f..5ecb73dc47 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -197,7 +197,7 @@ When using a POJO batch listener (e.g. `List`), and you don't have the fu ---- @KafkaListener(id = "recovering", topics = "someTopic") public void listen(List things) { - for (int i = 0; i < records.size(); i++) { + for (int i = 0; i < things.size(); i++) { try { process(things.get(i)); } diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc index b5b49e5a80..3d5f32ebac 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc @@ -89,7 +89,7 @@ The `ConsumerRetryAuthEvent` event has the following properties: ** `AUTHENTICATION` - the event was published because of an authentication exception. ** `AUTHORIZATION` - the event was published because of an authorization exception. -The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent`, `ConsumerRetryAuthSuccessfulEvent` and `ContainerStoppedEvent` events have the following properties: +The `ConsumerStartingEvent`, `ConsumerStartedEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent`, `ConsumerRetryAuthSuccessfulEvent` and `ContainerStoppedEvent` events have the following properties: * `source`: The listener container instance that published the event. * `container`: The listener container or the parent listener container, if the source container is a child. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc index ca6c876973..f719d178b9 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc @@ -54,7 +54,7 @@ public class KafkaConfig { @Bean public Map consumerConfigs() { Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); ... return props; } diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc index 19a33384bc..e64ddbfd2a 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc @@ -166,7 +166,7 @@ future.whenComplete((result, ex) -> { `SendResult` has two properties, a `ProducerRecord` and `RecordMetadata`. See the Kafka API documentation for information about those objects. -The `Throwable` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record. +The `Throwable` can be cast to a `KafkaProducerException`; its `producerRecord` property contains the failed record. If you wish to block the sending thread to await the result, you can invoke the future's `get()` method; using the method with a timeout is recommended. If you have set a `linger.ms`, you may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send. @@ -216,7 +216,7 @@ public void sendToKafka(final MyOutputData data) { ---- ==== -Note that the cause of the `ExecutionException` is `KafkaProducerException` with the `failedProducerRecord` property. +Note that the cause of the `ExecutionException` is `KafkaProducerException` with the `producerRecord` property. [[routing-template]] == Using `RoutingKafkaTemplate` diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index d770528bda..c2f6315ec6 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -463,7 +463,7 @@ The `SmartMessageConverter.toMessage()` method is called to create a new outboun Similarly, in the `KafkaMessageConverter.toMessage()` method, after the converter has created a new `Message` from the `ConsumerRecord`, the `SmartMessageConverter.fromMessage()` method is called and then the final inbound message is created with the newly converted payload. In either case, if the `SmartMessageConverter` returns `null`, the original message is used. -When the default converter is used in the `KafkaTemplate` and listener container factory, you configure the `SmartMessageConverter` by calling `setMessagingConverter()` on the template and via the `contentMessageConverter` property on `@KafkaListener` methods. +When the default converter is used in the `KafkaTemplate` and listener container factory, you configure the `SmartMessageConverter` by calling `setMessagingConverter()` on the template and via the `contentTypeConverter` property on `@KafkaListener` methods. Examples: @@ -664,7 +664,7 @@ Consider using a `DelegatingByTypeSerializer` configured to use a `ByteArraySeri Starting with version 3.1, you can add a `Validator` to the `ErrorHandlingDeserializer`. If the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown similar to a deserialization exception occurring. This allows the original raw data to be passed to the error handler. -WHen creating the deserializer yourself, simply call `setValidator`; if you configure the serializer using properties, set the consumer configuration property `ErrorHandlingDeserializer.VALIDATOR_CLASS` to the class or fully qualified class name for your `Validator`. +When creating the deserializer yourself, simply call `setValidator`; if you configure the serializer using properties, set the consumer configuration property `ErrorHandlingDeserializer.VALIDATOR_CLASS` to the class or fully qualified class name for your `Validator`. When using Spring Boot, this property name is `spring.kafka.consumer.properties.spring.deserializer.validator.class`. [[payload-conversion-with-batch]] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc index 30a301ebfa..d7000c310c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/features.adoc @@ -33,7 +33,7 @@ public void processMessage(MyPojo message) { public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() - .fixedBackoff(3_000) + .fixedBackOff(3_000) .maxAttempts(4) .create(template); } @@ -47,7 +47,7 @@ You can also provide a custom implementation of Spring Retry's `SleepingBackOffP public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() - .customBackOff(new MyCustomBackOffPolicy()) + .customBackoff(new MyCustomBackOffPolicy()) .maxAttempts(5) .create(template); } @@ -81,7 +81,7 @@ public void processMessage(MyPojo message) { public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() - .fixedBackoff(2_000) + .fixedBackOff(2_000) .timeoutAfter(5_000) .create(template); } @@ -212,7 +212,7 @@ If your broker version is earlier than 2.4, you will need to set an explicit val [[retry-headers]] == Failure Header Management -When considering how to manage failure headers (original headers and exception headers), the framework delegates to the `DeadLetterPublishingRecover` to decide whether to append or replace the headers. +When considering how to manage failure headers (original headers and exception headers), the framework delegates to the `DeadLetterPublishingRecoverer` to decide whether to append or replace the headers. By default, it explicitly sets `appendOriginalHeaders` to `false` and leaves `stripPreviousExceptionHeaders` to the default used by the `DeadLetterPublishingRecover`. @@ -221,7 +221,7 @@ This is to avoid creation of excessively large messages (due to the stack trace See xref:kafka/annotation-error-handling.adoc#dlpr-headers[Managing Dead Letter Record Headers] for more information. -To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`. +To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecovererer` customizer by overriding the `configureCustomizers` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`. See xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Global Settings and Features] for more details. [source, java] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc index 493d598c07..1d97f7a26c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc @@ -92,7 +92,7 @@ public void processMessage(MyPojo message) { public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance() - .fixedBackoff(3_000) + .fixedBackOff(3_000) .maxAttempts(5) .useSingleTopicForFixedDelays() .create(template); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index e6bf35aee7..783865f7b8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -408,7 +408,7 @@ public RequestReplyFuture sendAndReceive(ProducerRecord record) { @Override public RequestReplyFuture sendAndReceive(ProducerRecord record, @Nullable Duration replyTimeout) { - Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync) + Assert.state(this.running, "Template has not been started"); // NOSONAR (sync) Duration timeout = replyTimeout; if (timeout == null) { timeout = this.defaultReplyTimeout; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java index 9e451a49a7..6784857947 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java @@ -53,7 +53,7 @@ public class BackOffValuesGenerator { public BackOffValuesGenerator(int providedMaxAttempts, BackOffPolicy providedBackOffPolicy) { this.numberOfValuesToCreate = getMaxAttempts(providedMaxAttempts) - 1; BackOffPolicy policy = providedBackOffPolicy != null ? providedBackOffPolicy : DEFAULT_BACKOFF_POLICY; - checkBackOffPolicyTipe(policy); + checkBackOffPolicyType(policy); this.backOffPolicy = policy; } @@ -69,7 +69,7 @@ public List generateValues() { : generateFromSleepingBackOffPolicy(this.numberOfValuesToCreate, this.backOffPolicy); } - private void checkBackOffPolicyTipe(BackOffPolicy providedBackOffPolicy) { + private void checkBackOffPolicyType(BackOffPolicy providedBackOffPolicy) { if (!(SleepingBackOffPolicy.class.isAssignableFrom(providedBackOffPolicy.getClass()) || NoBackOffPolicy.class.isAssignableFrom(providedBackOffPolicy.getClass()))) { throw new IllegalArgumentException("Either a SleepingBackOffPolicy or a NoBackOffPolicy must be provided. " + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java index 433e7f20da..7e46d2befe 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java @@ -116,7 +116,7 @@ * public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) { * return RetryTopicConfigurationBuilder * .newInstance() - * .fixedBackoff(3000) + * .fixedBackOff(3000) * .maxAttempts(5) * .includeTopics("my-topic", "my-other-topic") * .create(template);