Merge remote-tracking branch 'origin/main' into GH-3001
# Conflicts:
#	spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
notizklotz committed Mar 1, 2024
2 parents 3ed24ae + 2ad8b06 commit e0a3c34
Showing 41 changed files with 778 additions and 517 deletions.
10 changes: 5 additions & 5 deletions build.gradle
@@ -19,7 +19,7 @@ plugins {
id 'base'
id 'project-report'
id 'idea'
id 'org.ajoberstar.grgit' version '5.2.1'
id 'org.ajoberstar.grgit' version '5.2.2'
id 'io.spring.nohttp' version '0.0.11'
id 'io.spring.dependency-management' version '1.1.4' apply false
id 'com.github.spotbugs' version '6.0.7'
@@ -63,13 +63,13 @@ ext {
kotlinCoroutinesVersion = '1.7.3'
log4jVersion = '2.22.1'
micrometerDocsVersion = '1.0.2'
micrometerVersion = '1.13.0-M1'
micrometerTracingVersion = '1.3.0-M1'
micrometerVersion = '1.13.0-SNAPSHOT'
micrometerTracingVersion = '1.3.0-SNAPSHOT'
mockitoVersion = '5.8.0'
reactorVersion = '2023.0.3'
scalaVersion = '2.13'
springBootVersion = '3.2.2' // docs module
springDataVersion = '2024.0.0-M1'
springBootVersion = '3.2.3' // docs module
springDataVersion = '2024.0.0-SNAPSHOT'
springRetryVersion = '2.0.5'
springVersion = '6.1.4'
zookeeperVersion = '3.8.3'
2 changes: 1 addition & 1 deletion settings.gradle
@@ -7,7 +7,7 @@ pluginManagement {
}

plugins {
id 'com.gradle.enterprise' version '3.15.1'
id 'com.gradle.enterprise' version '3.16.2'
id 'io.spring.ge.conventions' version '0.0.15'
}

@@ -65,8 +65,7 @@ In either case, you should NOT perform any seeks on the consumer because the con

Starting with version 2.8, the legacy `ErrorHandler` and `BatchErrorHandler` interfaces have been superseded by a new `CommonErrorHandler`.
These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener.
`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated.
The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.
`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided.

See xref:kafka/annotation-error-handling.adoc#migrating-legacy-eh[Migrating Custom Legacy Error Handler Implementations to `CommonErrorHandler`] for information to migrate custom error handlers to `CommonErrorHandler`.
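
For example, a minimal sketch (assuming a `ConcurrentKafkaListenerContainerFactory` bean named `factory`; backoff values are illustrative) of switching to the `DefaultErrorHandler` replacement:

[source, java]
----
// DefaultErrorHandler replaces the legacy SeekToCurrentErrorHandler; the same
// handler now serves both record and batch listeners
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
----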

@@ -351,6 +350,8 @@ This is to cause the transaction to roll back (if transactions are enabled).
The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type.
For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, or a `CommonContainerStoppingErrorHandler` for others.

All delegates must share the same compatible properties (`ackAfterHandle`, `seekAfterError` ...).
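
For example, a sketch (the delegate choices are illustrative) of routing fatal exceptions to a container-stopping handler while everything else falls back to a `DefaultErrorHandler`:

[source, java]
----
CommonDelegatingErrorHandler errorHandler =
        new CommonDelegatingErrorHandler(new DefaultErrorHandler());
// stop the container for exceptions that indicate an unrecoverable state
errorHandler.addDelegate(IllegalStateException.class, new CommonContainerStoppingErrorHandler());
factory.setCommonErrorHandler(errorHandler);
----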

[[log-eh]]
== Logging Error Handler

@@ -425,7 +426,7 @@ To replace any `BatchErrorHandler` implementation, you should implement `handleB
You should also implement `handleOtherException()` to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).
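
A minimal sketch (the class name and recovery logic are illustrative) of a custom handler covering both record-scoped and other exceptions:

[source, java]
----
public class MyCommonErrorHandler implements CommonErrorHandler {

    @Override
    public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        // recover the failed record (e.g. log it) and continue with the next one
        return true; // true: the record was handled and is not redelivered
    }

    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
            MessageListenerContainer container, boolean batchListener) {
        // handle exceptions thrown outside the scope of record processing
    }

}
----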

[[after-rollback]]
== After-rollback Processor
== After Rollback Processor

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back.
By default, any unprocessed records (including the failed record) are re-fetched on the next poll.
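
A sketch (assuming `factory` and `template` beans) of installing a processor that publishes the failed record to a dead-letter topic after two retries instead of re-fetching it indefinitely:

[source, java]
----
factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
        new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L)));
----
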
@@ -30,14 +30,18 @@
See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.

|[[asyncAcks]]<<asyncAcks,`asyncAcks`>>
|false
|`false`
|Enable out-of-order commits (see xref:kafka/receiving-messages/ooo-commits.adoc[Manually Committing Offsets]); the consumer is paused and commits are deferred until gaps are filled.

|[[authExceptionRetryInterval]]<<authExceptionRetryInterval,`authExceptionRetryInterval`>>
|`null`
|When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.
When null, such exceptions are considered fatal and the container will stop.

|[[batchRecoverAfterRollback]]<<batchRecoverAfterRollback,`batchRecoverAfterRollback`>>
|`false`
|Set to `true` to enable batch recovery; see xref:kafka/annotation-error-handling.adoc#after-rollback[After Rollback Processor].

|[[clientId]]<<clientId,`clientId`>>
|(empty string)
|A prefix for the `client.id` consumer property.
@@ -57,10 +61,6 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
|`null`
|When present and `syncCommits` is `false` a callback invoked after the commit completes.

|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
|`null`
|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.

|[[commitLogLevel]]<<commitLogLevel,`commitLogLevel`>>
|DEBUG
|The logging level for logs pertaining to committing offsets.
@@ -69,15 +69,15 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
|`null`
|A rebalance listener; see xref:kafka/receiving-messages/rebalance-listeners.adoc[Rebalancing Listeners].

|[[consumerStartTimout]]<<consumerStartTimout,`consumerStartTimout`>>
|[[commitRetries]]<<commitRetries,`commitRetries`>>
|3
|The number of times to retry a `RetriableCommitFailedException` when `syncCommits` is `true`.
Default 3 (4 attempts in total).

|[[consumerStartTimeout]]<<consumerStartTimeout,`consumerStartTimeout`>>
|30s
|The time to wait for the consumer to start before logging an error; this might happen if, say, you use a task executor with insufficient threads.

|[[consumerTaskExecutor]]<<consumerTaskExecutor,`consumerTaskExecutor`>>
|`SimpleAsyncTaskExecutor`
|A task executor to run the consumer threads.
The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container.

|[[deliveryAttemptHeader]]<<deliveryAttemptHeader,`deliveryAttemptHeader`>>
|`false`
|See xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header].
@@ -123,9 +123,18 @@ Also see `idleBeforeDataMultiplier`.
|None
|Used to override any arbitrary consumer properties configured on the consumer factory.

|[[kafkaAwareTransactionManager]]<<kafkaAwareTransactionManager,`kafkaAwareTransactionManager`>>
|`null`
|See xref:kafka/transactions.adoc[Transactions].

|[[listenerTaskExecutor]]<<listenerTaskExecutor,`listenerTaskExecutor`>>
|`SimpleAsyncTaskExecutor`
|A task executor to run the consumer threads.
The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container.

|[[logContainerConfig]]<<logContainerConfig,`logContainerConfig`>>
|`false`
|Set to true to log at INFO level all container properties.
|Set to `true` to log at INFO level all container properties.

|[[messageListener]]<<messageListener,`messageListener`>>
|`null`
@@ -145,7 +154,7 @@ Also see `idleBeforeDataMultiplier`.

|[[missingTopicsFatal]]<<missingTopicsFatal,`missingTopicsFatal`>>
|`false`
|When true prevents the container from starting if the confifgured topic(s) are not present on the broker.
|When true prevents the container from starting if the configured topic(s) are not present on the broker.

|[[monitorInterval]]<<monitorInterval,`monitorInterval`>>
|30s
@@ -157,9 +166,21 @@ See `noPollThreshold` and `pollTimeout`.
|Multiplied by `pollTimeOut` to determine whether to publish a `NonResponsiveConsumerEvent`.
See `monitorInterval`.

|[[observationConvention]]<<observationConvention,`observationConvention`>>
|`null`
|When set, add dynamic tags to the timers and traces, based on information in the consumer records.

|[[observationEnabled]]<<observationEnabled,`observationEnabled`>>
|`false`
|Set to `true` to enable observation via Micrometer.

|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
|`null`
|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.

|[[onlyLogRecordMetadata]]<<onlyLogRecordMetadata,`onlyLogRecordMetadata`>>
|`false`
|Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`.
|Set to `false` to log the complete consumer record (in error, debug logs etc.) instead of just `topic-partition@offset`.

|[[pauseImmediate]]<<pauseImmediate,`pauseImmediate`>>
|`false`
@@ -215,7 +236,7 @@ Mutually exclusive; at least one must be provided; enforced by `ContainerPropert

|[[transactionManager]]<<transactionManager,`transactionManager`>>
|`null`
|See xref:kafka/transactions.adoc[Transactions].
|Deprecated since 3.2; see <<kafkaAwareTransactionManager>> and xref:kafka/transactions.adoc#transaction-synchronization[Other transaction managers].
|===
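
Most of these properties are set on the `ContainerProperties` exposed by the container factory; a sketch (assuming a `factory` bean; the property choices are illustrative):

[source, java]
----
ContainerProperties props = factory.getContainerProperties();
props.setLogContainerConfig(true);   // log all container properties at INFO on start
props.setMissingTopicsFatal(true);   // refuse to start if configured topics are absent
props.setCommitRetries(3);           // retries for RetriableCommitFailedException
----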

[[alc-props]]
@@ -256,14 +277,6 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han
|`ContainerProperties`
|The container properties instance.

|[[errorHandler]]<<errorHandler,`errorHandler`>>
|See desc.
|Deprecated - see `commonErrorHandler`.

|[[genericErrorHandler]]<<genericErrorHandler,`genericErrorHandler`>>
|See desc.
|Deprecated - see `commonErrorHandler`.

|[[groupId2]]<<groupId2,`groupId`>>
|See desc.
|The `containerProperties.groupId`, if present, otherwise the `group.id` property from the consumer factory.
@@ -24,7 +24,7 @@ NOTE: With the concurrent container, timers are created for each thread and the
[[monitoring-kafkatemplate-performance]]
== Monitoring KafkaTemplate Performance

Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s+++ for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the template's `micrometerEnabled` property to `false`.

Two timers are maintained - one for successful calls to the listener and one for failures.
@@ -95,6 +95,8 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K

Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation.
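
For example (assuming `template` and `factory` beans):

[source, java]
----
template.setObservationEnabled(true);
factory.getContainerProperties().setObservationEnabled(true);
----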

IMPORTANT: Micrometer Observation does not support batch listeners; Micrometer Timers are used for batch listeners instead.

Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information.

To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively.
@@ -109,6 +111,6 @@ Starting with version 3.0.6, you can add dynamic tags to the timers and traces,
To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively.
The `record` property in both observation contexts contains the `ConsumerRecord` or `ProducerRecord` respectively.
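
A sketch of a listener-side convention (assuming a `factory` bean; the tag name is arbitrary):

[source, java]
----
factory.getContainerProperties().setObservationConvention(new KafkaListenerObservationConvention() {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
        // the context's record property is the ConsumerRecord being observed
        return KeyValues.of("topic", context.getRecord().topic());
    }

});
----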

The sender and receiver contexts' `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
The sender and receiver contexts `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
If, for some reason (perhaps lack of admin permissions), you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into `KafkaTemplate` s and listener containers.
When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.
@@ -13,11 +13,11 @@ Starting with version 2.1.5, you can call `isPauseRequested()` to see if `pause(
However, the consumers might not have actually paused yet.
`isConsumerPaused()` returns true if all `Consumer` instances have actually paused.

In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.
In addition(also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.

Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed.
By default, the pause takes effect when all of the records from the previous poll have been processed.
See <<pauseImmediate>>.
By default, the pause takes effect when all the records from the previous poll have been processed.
See xref:kafka/container-props.adoc#pauseImmediate[pauseImmediate].

The following simple Spring Boot application demonstrates using the container registry to get a reference to a `@KafkaListener` method's container, pausing or resuming its consumers, and receiving the corresponding events:
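
A minimal sketch of the core of that pattern (assuming a listener declared with `id = "myListener"`):

[source, java]
----
@Autowired
private KafkaListenerEndpointRegistry registry;

public void pauseIt() {
    this.registry.getListenerContainer("myListener").pause();   // request a pause
}

public void resumeIt() {
    this.registry.getListenerContainer("myListener").resume();
}
----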

@@ -79,6 +79,9 @@ Instead, use a `KafkaTransactionManager` in the container to start the Kafka tra

See xref:tips.adoc#ex-jdbc-sync[Examples of Kafka Transactions with Other Transaction Managers] for an example application that chains JDBC and Kafka transactions.
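
Since 3.2, the preferred way to wire this is the `kafkaAwareTransactionManager` container property; a sketch (assuming `factory` and `producerFactory` beans, with the setter name following the property name):

[source, java]
----
factory.getContainerProperties()
        .setKafkaAwareTransactionManager(new KafkaTransactionManager<>(producerFactory));
----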

IMPORTANT: xref:retrytopic.adoc[Non-Blocking Retries] cannot be combined with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions].
When the listener code throws an exception, the container transaction commit succeeds, and the record is sent to the retryable topic.

[[kafkatemplate-local-transactions]]
== `KafkaTemplate` Local Transactions

@@ -9,3 +9,4 @@ Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTop

IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].

IMPORTANT: Non-Blocking Retries cannot be combined with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions].
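
A minimal sketch of the annotation (the topic name and backoff values are illustrative; `attempts` is a `String` attribute):

[source, java]
----
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1_000, multiplier = 2.0))
@KafkaListener(topics = "my-topic")
public void listen(String in) {
    // processing that may throw; failures are retried via auto-created retry topics
}
----
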
@@ -16,8 +16,8 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.
IMPORTANT: At this time this functionality doesn't support class-level `@KafkaListener` annotations.

When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`.
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`.
Starting with versions 2.9.10, 3.0.8, this will be set to `false` unconditionally for such configurations.
With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `false`.

[source, java]
----
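// a minimal sketch (not the elided original sample): with asyncAcks, seeking
// after an error must be disabled so that deferred acks can fill the offset gaps
@Bean
DefaultErrorHandler errorHandler() {
    DefaultErrorHandler handler = new DefaultErrorHandler();
    handler.setSeekAfterError(false);
    return handler;
}
----
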
@@ -35,13 +35,3 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>

IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.

If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a `@Configuration` class that extends `RetryTopicConfigurationSupport` as explained in xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Global Settings and Features] and set `useLegacyFactoryConfigurer` to `true`, such as:

[source, java]
----
@Override
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
return rtc -> rtc.useLegacyFactoryConfigurer(true);
}
----

@@ -111,35 +111,35 @@ This "final" retry topic will be suffixed with the provided or default suffix, a

NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics.

The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index.
Starting with 3.2, the default behavior is to reuse the retry topic for the same intervals; when using exponential backoff, the retry topics are suffixed with the delay values, and the last retry topic (corresponding to the `maxInterval` delay) is reused for all attempts at that interval.

For instance, when configuring the exponential backoff with `initialInterval=1_000`, `multiplier=2`, and `maxInterval=16_000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be:

* -retry-1000
* -retry-2000
* -retry-4000
* -retry-8000
* -retry-16000-0
* -retry-16000-1
* -retry-16000-2
* ...
* -retry-16000-224
* -retry-16000

When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be:
When using the strategy that works with the number of retry topics equal to the configured `maxAttempts` minus 1, the last retry topic (corresponding to the `maxInterval` delay) is suffixed with an additional index, and the needed retry topics would be:

* -retry-1000
* -retry-2000
* -retry-4000
* -retry-8000
* -retry-16000
* -retry-16000-0
* -retry-16000-1
* -retry-16000-2
* ...
* -retry-16000-224

This will be the default in a future release.
If multiple topics are required, then that can be done using the following configuration.

[source, java]
----
@RetryableTopic(attempts = "230",
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
----