diff --git a/build.gradle b/build.gradle index d1df1c8481..f573d40675 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,7 @@ plugins { id 'base' id 'project-report' id 'idea' - id 'org.ajoberstar.grgit' version '5.2.1' + id 'org.ajoberstar.grgit' version '5.2.2' id 'io.spring.nohttp' version '0.0.11' id 'io.spring.dependency-management' version '1.1.4' apply false id 'com.github.spotbugs' version '6.0.7' @@ -63,13 +63,13 @@ ext { kotlinCoroutinesVersion = '1.7.3' log4jVersion = '2.22.1' micrometerDocsVersion = '1.0.2' - micrometerVersion = '1.13.0-M1' - micrometerTracingVersion = '1.3.0-M1' + micrometerVersion = '1.13.0-SNAPSHOT' + micrometerTracingVersion = '1.3.0-SNAPSHOT' mockitoVersion = '5.8.0' reactorVersion = '2023.0.3' scalaVersion = '2.13' - springBootVersion = '3.2.2' // docs module - springDataVersion = '2024.0.0-M1' + springBootVersion = '3.2.3' // docs module + springDataVersion = '2024.0.0-SNAPSHOT' springRetryVersion = '2.0.5' springVersion = '6.1.4' zookeeperVersion = '3.8.3' diff --git a/settings.gradle b/settings.gradle index 272037ac85..e8c819ec47 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,7 +7,7 @@ pluginManagement { } plugins { - id 'com.gradle.enterprise' version '3.15.1' + id 'com.gradle.enterprise' version '3.16.2' id 'io.spring.ge.conventions' version '0.0.15' } diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index 52f9ec06cc..6b0ae3905f 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -65,8 +65,7 @@ In either case, you should NOT perform any seeks on the consumer because the con Starting with version 2.8, the legacy `ErrorHandler` and `BatchErrorHandler` interfaces have been superseded by a new `CommonErrorHandler`. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. -`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. -The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release. +`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided. See xref:kafka/annotation-error-handling.adoc#migrating-legacy-eh[Migrating Custom Legacy Error Handler Implementations to `CommonErrorHandler`] for information to migrate custom error handlers to `CommonErrorHandler`. @@ -351,6 +350,8 @@ This is to cause the transaction to roll back (if transactions are enabled). The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type. For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, or a `CommonContainerStoppingErrorHandler` for others. +All delegates must share the same compatible properties (`ackAfterHandle`, `seekAfterError` ...). + [[log-eh]] == Logging Error Handler @@ -425,7 +426,7 @@ To replace any `BatchErrorHandler` implementation, you should implement `handleB You should also implement `handleOtherException()` - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors). 
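For illustration only (not part of this change set), a minimal sketch of such a migration, assuming a custom legacy `BatchErrorHandler` is being replaced; the class name `MyBatchErrorHandler` and the logging-only recovery are hypothetical:

[source, java]
----
public class MyBatchErrorHandler implements CommonErrorHandler {

	private static final Log LOGGER = LogFactory.getLog(MyBatchErrorHandler.class);

	@Override
	public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
			Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {

		// Custom recovery for the failed batch; this sketch just logs and skips it.
		LOGGER.error("Failed to process a batch of " + data.count() + " records", thrownException);
	}

	@Override
	public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
			MessageListenerContainer container, boolean batchListener) {

		// Exceptions raised outside the scope of record processing (e.g. consumer errors).
		LOGGER.error("Error outside the scope of record processing", thrownException);
	}

}
----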
[[after-rollback]] -== After-rollback Processor +== After Rollback Processor When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back. By default, any unprocessed records (including the failed record) are re-fetched on the next poll. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc index b126974ebe..bb5fe5a577 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-props.adoc @@ -30,7 +30,7 @@ See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options. |[[asyncAcks]]<> -|false +|`false` |Enable out-of-order commits (see xref:kafka/receiving-messages/ooo-commits.adoc[Manually Committing Offsets]); the consumer is paused and commits are deferred until gaps are filled. |[[authExceptionRetryInterval]]<> @@ -38,6 +38,10 @@ See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more infor |When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client. When null, such exceptions are considered fatal and the container will stop. +|[[batchRecoverAfterRollback]]<> +|`false` +|Set to `true` to enable batch recovery, See xref:kafka/annotation-error-handling.adoc#after-rollback[After Rollback Processor]. + |[[clientId]]<> |(empty string) |A prefix for the `client.id` consumer property. @@ -57,10 +61,6 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize |`null` |When present and `syncCommits` is `false` a callback invoked after the commit completes. -|[[offsetAndMetadataProvider]]<> -|`null` -|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata. - |[[commitLogLevel]]<> |DEBUG |The logging level for logs pertaining to committing offsets. @@ -69,15 +69,15 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize |`null` |A rebalance listener; see xref:kafka/receiving-messages/rebalance-listeners.adoc[Rebalancing Listeners]. -|[[consumerStartTimout]]<> +|[[commitRetries]]<> +|3 +|Set the number of retries `RetriableCommitFailedException` when using `syncCommits` set to true. +Default 3 (4-attempt total). + +|[[consumerStartTimeout]]<> |30s |The time to wait for the consumer to start before logging an error; this might happen if, say, you use a task executor with insufficient threads. -|[[consumerTaskExecutor]]<> -|`SimpleAsyncTaskExecutor` -|A task executor to run the consumer threads. -The default executor creates threads named `-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container. - |[[deliveryAttemptHeader]]<> |`false` |See xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header]. @@ -123,9 +123,18 @@ Also see `idleBeforeDataMultiplier`. |None |Used to override any arbitrary consumer properties configured on the consumer factory. +|[[kafkaAwareTransactionManager]]<> +|`null` +|See xref:kafka/transactions.adoc[Transactions]. 
+ +|[[listenerTaskExecutor]]<> +|`SimpleAsyncTaskExecutor` +|A task executor to run the consumer threads. +The default executor creates threads named `-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container. + |[[logContainerConfig]]<> |`false` -|Set to true to log at INFO level all container properties. +|Set to `true` to log at INFO level all container properties. |[[messageListener]]<> |`null` @@ -145,7 +154,7 @@ Also see `idleBeforeDataMultiplier`. |[[missingTopicsFatal]]<> |`false` -|When true prevents the container from starting if the confifgured topic(s) are not present on the broker. +|When true prevents the container from starting if the configured topic(s) are not present on the broker. |[[monitorInterval]]<> |30s @@ -157,9 +166,21 @@ See `noPollThreshold` and `pollTimeout`. |Multiplied by `pollTimeOut` to determine whether to publish a `NonResponsiveConsumerEvent`. See `monitorInterval`. +|[[observationConvention]]<> +|`null` +|When set, add dynamic tags to the timers and traces, based on information in the consumer records. + +|[[observationEnabled]]<> +|`false` +|Set to `true` to enable observation via Micrometer. + +|[[offsetAndMetadataProvider]]<> +|`null` +|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata. + |[[onlyLogRecordMetadata]]<> |`false` -|Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`. +|Set to `false` to log the complete consumer record (in error, debug logs etc.) instead of just `topic-partition@offset`. |[[pauseImmediate]]<> |`false` @@ -215,7 +236,7 @@ Mutually exclusive; at least one must be provided; enforced by `ContainerPropert |[[transactionManager]]<> |`null` -|See xref:kafka/transactions.adoc[Transactions]. +|Deprecated since 3.2, see <>, xref:kafka/transactions.adoc#transaction-synchronization[Other transaction managers]. |=== [[alc-props]] @@ -256,14 +277,6 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han |`ContainerProperties` |The container properties instance. -|[[errorHandler]]<> -|See desc. -|Deprecated - see `commonErrorHandler`. - -|[[genericErrorHandler]]<> -|See desc. -|Deprecated - see `commonErrorHandler`. - |[[groupId2]]<> |See desc. |The `containerProperties.groupId`, if present, otherwise the `group.id` property from the consumer factory. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc index cdfd498422..7b2bb36502 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc @@ -24,7 +24,7 @@ NOTE: With the concurrent container, timers are created for each thread and the [[monitoring-kafkatemplate-performance]] == Monitoring KafkaTemplate Performance -Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context. 
+Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s+++ for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context. The timers can be disabled by setting the template's `micrometerEnabled` property to `false`. Two timers are maintained - one for successful calls to the listener and one for failures. @@ -95,6 +95,8 @@ Using Micrometer for observation is now supported, since version 3.0, for the `K Set `observationEnabled` to `true` on the `KafkaTemplate` and `ContainerProperties` to enable observation; this will disable xref:kafka/micrometer.adoc[Micrometer Timers] because the timers will now be managed with each observation. +IMPORTANT: Micrometer Observation does not support batch listener; this will enable Micrometer Timers + Refer to https://micrometer.io/docs/tracing[Micrometer Tracing] for more information. To add tags to timers/traces, configure a custom `KafkaTemplateObservationConvention` or `KafkaListenerObservationConvention` to the template or listener container, respectively. @@ -109,6 +111,6 @@ Starting with version 3.0.6, you can add dynamic tags to the timers and traces, To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively. The `record` property in both observation contexts contains the `ConsumerRecord` or `ProducerRecord` respectively. -The sender and receiver contexts' `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`. +The sender and receiver contexts `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`. If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into `KafkaTemplate` s and listener containers. When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc index c06436ba35..58f55aa34d 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume.adoc @@ -13,11 +13,11 @@ Starting with version 2.1.5, you can call `isPauseRequested()` to see if `pause( However, the consumers might not have actually paused yet. `isConsumerPaused()` returns true if all `Consumer` instances have actually paused. -In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property. +In addition(also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property. Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed. -By default, the pause takes effect when all of the records from the previous poll have been processed. -See <>. 
+By default, the pause takes effect when all the records from the previous poll have been processed. +See xref:kafka/container-props.adoc#pauseImmediate[pauseImmediate]. The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events: diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc index 9bf7b81a96..aae4b186c4 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc @@ -79,6 +79,9 @@ Instead, use a `KafkaTransactionManager` in the container to start the Kafka tra See xref:tips.adoc#ex-jdbc-sync[Examples of Kafka Transactions with Other Transaction Managers] for an example application that chains JDBC and Kafka transactions. +IMPORTANT: xref:retrytopic.adoc[Non-Blocking Retries] cannot combine with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions]. +When the listener code throws an exception, container transaction commit succeeds, and the record is sent to the retryable topic. + [[kafkatemplate-local-transactions]] == `KafkaTemplate` Local Transactions diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc index 08d02f3f2f..67dc567a9b 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc @@ -9,3 +9,4 @@ Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTop IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners]. +IMPORTANT: Non-Blocking Retries cannot combine with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions]. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc index 7db1f32f64..40abbd8659 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc @@ -16,8 +16,8 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested. IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations. When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`. -Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations. -With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `true`. +Starting with versions 2.9.10, 3.0.8, this will be set to `false` unconditionally for such configurations. +With earlier versions, it was necessary to override the `RetryConfigurationSupport.configureCustomizers()` method to set the property to `false`. 
[source, java] ---- diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-topic-lcf.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-topic-lcf.adoc index 75ef37e208..72e31bf304 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-topic-lcf.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-topic-lcf.adoc @@ -35,13 +35,3 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics. -If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a `@Configuration` class that extends `RetryTopicConfigurationSupport` as explained in xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Global Settings and Features] and set `useLegacyFactoryConfigurer` to `true`, such as: - -[source, java] ----- -@Override -protected Consumer configureRetryTopicConfigurer() { - return rtc -> rtc.useLegacyFactoryConfigurer(true); -} ----- - diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc index 2754d28d8d..493d598c07 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/topic-naming.adoc @@ -111,7 +111,7 @@ This "final" retry topic will be suffixed with the provided or default suffix, a NOTE: By opting to use a single topic for the retries with the `maxInterval` delay, it may become more viable to configure an exponential retry policy that keeps retrying for a long time, because in this approach you do not need a large amount of topics. -The default behavior is to work with the number of retry topics equal to the configured `maxAttempts` minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index. +Starting 3.2, the default behavior is reuses the retry topic for the same intervals, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic reuses for the same intervals(corresponding to the `maxInterval` delay). For instance, when configuring the exponential backoff with `initialInterval=1_000`, `multiplier=2`, and `maxInterval=16_000`, in order to keep trying for one hour, one would need to configure `maxAttempts` as 229, and by default the needed retry topics would be: @@ -119,27 +119,27 @@ For instance, when configuring the exponential backoff with `initialInterval=1_0 * -retry-2000 * -retry-4000 * -retry-8000 -* -retry-16000-0 -* -retry-16000-1 -* -retry-16000-2 -* ... -* -retry-16000-224 +* -retry-16000 -When using the strategy that reuses the retry topic for the same intervals, in the same configuration above the needed retry topics would be: +When using the strategy that work with the number of retry topics equal to the configured `maxAttempts` minus 1, the last retry topic (corresponding to the `maxInterval` delay) being suffixed with an additional index would be: * -retry-1000 * -retry-2000 * -retry-4000 * -retry-8000 -* -retry-16000 +* -retry-16000-0 +* -retry-16000-1 +* -retry-16000-2 +* ... +* -retry-16000-224 -This will be the default in a future release. 
+If multiple topics are required, then that can be done using the following configuration. [source, java] ---- @RetryableTopic(attempts = 230, backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000), - sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS) @KafkaListener(topics = "my-annotated-topic") public void processMessage(MyPojo message) { // ... message processing diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc index fd68c121d8..4fbd3f84a2 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc @@ -363,8 +363,6 @@ public T retrieveQueryableStore(String storeName, QueryableStoreType stor When calling this method, the user can specifially ask for the proper state store type, as we have done in the above example. -NOTE: `KafkaStreamsInteractiveQueryService` API in Spring for Apache Kafka only supports providing access to local key-value stores at the moment. - === Retrying State Store Retrieval When trying to retrieve the state store using the `KafkaStreamsInteractiveQueryService`, there is a chance that the state store might not be found for various reasons. @@ -388,6 +386,49 @@ public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(S } ---- +=== Querying Remote State Stores + +The API shown above for retrieving the state store - `retrieveQueryableStore` is intended for locally available key-value state stores. +In productions settings, Kafka Streams applications are most likely distributed based on the number of partitions. +If a topic has four partitions and there are four instances of the same Kafka Streams processor running, then each instance maybe responsible for processing a single partition from the topic. +In this scenario, calling `retrieveQueryableStore` may not give the correct result that an instance is looking for, although it might return a valid store. +Let's assume that the topic with four partitions has data about various keys and a single partition is always responsible for a specific key. +If the instance that is calling `retrieveQueryableStore` is looking for information about a key that this instance does not host, then it will not receive any data. +This is because the current Kafka Streams instance does not know anything about this key. +To fix this, the calling instance first needs to make sure that they have the host information for the Kafka Streams processor instance where the particular key is hosted. +This can be retrieved from any Kafka Streams instance under the same `application.id` as below. + +[source, java] +---- +@Autowired +private KafkaStreamsInteractiveQueryService interactiveQueryService; + +HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer()); +---- + +In the example code above, the calling instance is querying for a particular key `12345` from the state-store named `app-store`. +The API also needs a corresponding key serializer, which in this case is the `IntegerSerializer`. +Kafka Streams looks through all it's instances under the same `application.id` and tries to find which instance hosts this particular key, +Once found, it returns that host information as a `HostInfo` object. 
+ +This is how the API looks like: + +[source, java] +---- +public HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer serializer) +---- + +When using multiple instances of the Kafka Streams processors of the same `application.id` in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one. +See this https://kafka.apache.org/36/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app[article] for more details on this. +When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies. +Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the `HostInfo` where the key is hosted is known to the instance. + +If the key hosting the instance is the current instance, then the application does not need to call the RPC mechanism, but rather make an in-JVM call. +However, the trouble is that an application may not know that the instance that is making the call is where the key is hosted because a particular server may lose a partition due to a consumer rebalance. +To fix this issue, `KafkaStreamsInteractiveQueryService` provides a convenient API for querying the current host information via an API method `getCurrentKafkaStreamsApplicationHostInfo()` that returns the current `HostInfo`. +The idea is that the application can first acquire information about where the key is held, and then compare the `HostInfo` with the one about the current instance. +If the `HostInfo` data matches, then it can proceed with a simple JVM call via the `retrieveQueryableStore`, otherwise go with the RPC option. + [[kafka-streams-example]] == Kafka Streams Example diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index e2914c3d98..81bb77aa02 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -36,12 +36,23 @@ Rules for the redirection are set either via the `RetryableTopic.exceptionBasedD Custom DLTs are created automatically as well as other retry and dead-letter topics. See xref:retrytopic/features.adoc#exc-based-custom-dlt-routing[Routing of messages to custom DLTs based on thrown exceptions] for more information. +[[x32-cp-ptm]] +=== Deprecating ContainerProperties transactionManager property + +Deprecating the `transactionManager` property in `ContainerProperties` in favor of `KafkaAwareTransactionManager`, a narrower type compared to the general `PlatformTransactionManager`. See xref:kafka/container-props.adoc#kafkaAwareTransactionManager[ContainerProperties] and xref:kafka/transactions.adoc#transaction-synchronization[Transaction Synchronization]. + [[x32-after-rollback-processing]] === After Rollback Processing A new `AfterRollbackProcessor` API `processBatch` is provided. See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information. +[[x32-retry-topic]] +=== Change @RetryableTopic SameIntervalTopicReuseStrategy default value +Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`. +See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay]. 
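For reference, a minimal sketch of the replacement configuration described above, assuming a typical `ConcurrentKafkaListenerContainerFactory` bean; the bean name and generic types are illustrative, not part of this change:

[source, java]
----
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
		ConsumerFactory<String, String> consumerFactory,
		ProducerFactory<String, String> producerFactory) {

	ConcurrentKafkaListenerContainerFactory<String, String> factory =
			new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory);
	// Replaces the deprecated setTransactionManager(PlatformTransactionManager)
	factory.getContainerProperties()
			.setKafkaAwareTransactionManager(new KafkaTransactionManager<>(producerFactory));
	return factory;
}
----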
+ + [[x32-default-clientid-prefix]] === Spring Boot application name as default client ID prefix @@ -68,4 +79,4 @@ as a default prefix for auto-generated client IDs for these client types: |admin |adminclient-1 |myapp-admin-1 -|=== \ No newline at end of file +|=== diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index be371ece64..f66a61f6e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -41,6 +41,8 @@ * @author Fabio da Silva Jr. * @author João Lima * @author Adrian Chlebosz + * @author Wang Zhiyang + * * @since 2.7 * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer @@ -191,10 +193,12 @@ /** * Topic reuse strategy for sequential attempts made with a same backoff interval. + * Starting 3.2, change default behavior to {@code SameIntervalTopicReuseStrategy.SINGLE_TOPIC}. + * * @return the strategy. * @since 3.0.4 */ - SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS; + SameIntervalTopicReuseStrategy sameIntervalTopicReuseStrategy() default SameIntervalTopicReuseStrategy.SINGLE_TOPIC; /** * Whether or not create a DLT, and redeliver to the DLT if delivery fails or just give up. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 9fe9e75574..698d3ac895 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -839,17 +839,17 @@ private Callback buildCallback(final ProducerRecord producerRecord, final } } catch (Exception e) { - KafkaTemplate.this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); + this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); } try { if (exception == null) { successTimer(sample, producerRecord); observation.stop(); future.complete(new SendResult<>(producerRecord, metadata)); - if (KafkaTemplate.this.producerListener != null) { - KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata); + if (this.producerListener != null) { + this.producerListener.onSuccess(producerRecord, metadata); } - KafkaTemplate.this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord) + this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord) + ", metadata: " + metadata); } else { @@ -858,17 +858,14 @@ private Callback buildCallback(final ProducerRecord producerRecord, final observation.stop(); future.completeExceptionally( new KafkaProducerException(producerRecord, "Failed to send", exception)); - if (KafkaTemplate.this.producerListener != null) { - KafkaTemplate.this.producerListener.onError(producerRecord, metadata, exception); + if (this.producerListener != null) { + this.producerListener.onError(producerRecord, metadata, exception); } - KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " - + KafkaUtils.format(producerRecord)); + this.logger.debug(exception, () -> "Failed to send: " + KafkaUtils.format(producerRecord)); } } finally { - if (!KafkaTemplate.this.transactional) { - closeProducer(producer, false); - } + closeProducer(producer, this.transactional); } }; } @@ -985,7 
+982,6 @@ public void destroy() { } } - @SuppressWarnings("serial") private static final class SkipAbortException extends RuntimeException { SkipAbortException(Throwable cause) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java index d510701f35..af937e78cc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ * * @author Gary Russell * @author Adrian Chlebosz + * @author Antonin Arquey * @since 2.8 * */ @@ -65,6 +66,7 @@ public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) { * Set the delegate error handlers; a {@link LinkedHashMap} argument is recommended so * that the delegates are searched in a known order. * @param delegates the delegates. + * @throws IllegalArgumentException if any of the delegates is not compatible with the default error handler. */ public void setErrorHandlers(Map, CommonErrorHandler> delegates) { Assert.notNull(delegates, "'delegates' cannot be null"); @@ -109,6 +111,7 @@ public void setAckAfterHandle(boolean ack) { * Add a delegate to the end of the current collection. * @param throwable the throwable for this handler. * @param handler the handler. + * @throws IllegalArgumentException if the handler is not compatible with the default error handler. 
*/ public void addDelegate(Class throwable, CommonErrorHandler handler) { Map, CommonErrorHandler> delegatesToCheck = new LinkedHashMap<>(this.delegates); @@ -118,13 +121,12 @@ public void addDelegate(Class throwable, CommonErrorHandler this.delegates.putAll(delegatesToCheck); } - @SuppressWarnings("deprecation") private void checkDelegatesAndUpdateClassifier(Map, CommonErrorHandler> delegatesToCheck) { boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle(); boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling(); - this.delegates.values().forEach(handler -> { + delegatesToCheck.values().forEach(handler -> { Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(), "All delegates must return the same value when calling 'isAckAfterHandle()'"); Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(), diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 45d9dd85a6..b8883ad935 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -121,12 +120,12 @@ public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) { public List> getContainers() { this.lifecycleLock.lock(); try { - return Collections.unmodifiableList(new ArrayList<>(this.containers)); + return List.copyOf(this.containers); } finally { this.lifecycleLock.unlock(); } -} + } @Override public MessageListenerContainer getContainerFor(String topic, int partition) { @@ -157,7 +156,7 @@ public Collection getAssignedPartitions() { .map(KafkaMessageListenerContainer::getAssignedPartitions) .filter(Objects::nonNull) .flatMap(Collection::stream) - .collect(Collectors.toList()); + .toList(); } finally { this.lifecycleLock.unlock(); @@ -259,7 +258,6 @@ protected void doStart() { } } - @SuppressWarnings("deprecation") private void configureChildContainer(int index, KafkaMessageListenerContainer container) { String beanName = getBeanName(); beanName = (beanName == null ? 
"consumer" : beanName) + "-" + index; @@ -308,13 +306,17 @@ private KafkaMessageListenerContainer constructContainer(ContainerProperti return container; } + @Nullable private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) { TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + if (topicPartitions == null) { + return null; + } if (this.concurrency == 1) { - return topicPartitions; // NOSONAR + return topicPartitions; } else { - int numPartitions = topicPartitions.length; // NOSONAR + int numPartitions = topicPartitions.length; if (numPartitions == this.concurrency) { return new TopicPartitionOffset[] { topicPartitions[index] }; } @@ -389,7 +391,7 @@ && getContainerProperties().isRestartAfterAuthExceptions() if (exec == null) { exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart"); } - exec.execute(() -> start()); + exec.execute(this::start); } } @@ -477,10 +479,15 @@ public boolean isPartitionPaused(TopicPartition topicPartition) { public boolean isInExpectedState() { this.lifecycleLock.lock(); try { - return (isRunning() || isStoppedNormally()) && this.containers - .stream() - .map(container -> container.isInExpectedState()) - .allMatch(bool -> Boolean.TRUE.equals(bool)); + boolean isInExpectedState = isRunning() || isStoppedNormally(); + if (isInExpectedState) { + for (KafkaMessageListenerContainer container : this.containers) { + if (!container.isInExpectedState()) { + return false; + } + } + } + return isInExpectedState; } finally { this.lifecycleLock.unlock(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 616db9bea3..f627d2be43 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; +import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.PlatformTransactionManager; @@ -257,8 +258,11 @@ public enum EOSMode { private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER; + @Deprecated(since = "3.2") private PlatformTransactionManager transactionManager; + private KafkaAwareTransactionManager kafkaAwareTransactionManager; + private boolean batchRecoverAfterRollback = false; private int monitorInterval = DEFAULT_MONITOR_INTERVAL; @@ -371,7 +375,7 @@ public void setMessageListener(Object messageListener) { * calling thread and sometimes not. * * @param ackMode the {@link AckMode}; default BATCH. 
- * @see #setTransactionManager(PlatformTransactionManager) + * @see #setKafkaAwareTransactionManager(KafkaAwareTransactionManager) */ public void setAckMode(AckMode ackMode) { Assert.notNull(ackMode, "'ackMode' cannot be null"); @@ -525,6 +529,7 @@ public Long getIdlePartitionEventInterval() { return this.idlePartitionEventInterval; } + @Deprecated(since = "3.2", forRemoval = true) @Nullable public PlatformTransactionManager getTransactionManager() { return this.transactionManager; @@ -542,10 +547,25 @@ public PlatformTransactionManager getTransactionManager() { * @since 1.3 * @see #setAckMode(AckMode) */ + @Deprecated(since = "3.2", forRemoval = true) public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) { this.transactionManager = transactionManager; } + @Nullable + public KafkaAwareTransactionManager getKafkaAwareTransactionManager() { + return this.kafkaAwareTransactionManager; + } + + /** + * Set the transaction manager to start a transaction; replace {@link #setTransactionManager}. + * @param kafkaAwareTransactionManager the transaction manager. + * @since 3.2 + */ + public void setKafkaAwareTransactionManager(@Nullable KafkaAwareTransactionManager kafkaAwareTransactionManager) { + this.kafkaAwareTransactionManager = kafkaAwareTransactionManager; + } + /** * Recover batch records after rollback if true. * @return true to recover. @@ -857,8 +877,8 @@ public TransactionDefinition getTransactionDefinition() { /** * Set a transaction definition with properties (e.g. timeout) that will be copied to * the container's transaction template. Note that this is only generally useful when - * used with a {@link #setTransactionManager(PlatformTransactionManager) - * PlatformTransactionManager} that supports a custom definition; this does NOT + * used with a {@link #setKafkaAwareTransactionManager(KafkaAwareTransactionManager) + * KafkaAwareTransactionManager} that supports a custom definition; this does NOT * include the {@link org.springframework.kafka.transaction.KafkaTransactionManager} * which has no concept of transaction timeout. It can be useful to start, for example * a database transaction, in the container, rather than using {@code @Transactional} @@ -866,7 +886,7 @@ public TransactionDefinition getTransactionDefinition() { * can participate in the transaction. * @param transactionDefinition the definition. * @since 2.5.4 - * @see #setTransactionManager(PlatformTransactionManager) + * @see #setKafkaAwareTransactionManager(KafkaAwareTransactionManager) */ public void setTransactionDefinition(@Nullable TransactionDefinition transactionDefinition) { this.transactionDefinition = transactionDefinition; @@ -1059,6 +1079,7 @@ public String toString() { + "\n ackMode=" + this.ackMode + "\n ackCount=" + this.ackCount + "\n ackTime=" + this.ackTime + + "\n consumerStartTimeout=" + this.consumerStartTimeout + "\n messageListener=" + this.messageListener + (this.listenerTaskExecutor != null ? "\n listenerTaskExecutor=" + this.listenerTaskExecutor @@ -1071,19 +1092,27 @@ public String toString() { + (this.transactionManager != null ? "\n transactionManager=" + this.transactionManager : "") + + (this.kafkaAwareTransactionManager != null + ? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager + : "") + "\n monitorInterval=" + this.monitorInterval + (this.scheduler != null ? 
"\n scheduler=" + this.scheduler : "") + "\n noPollThreshold=" + this.noPollThreshold + + "\n pauseImmediate=" + this.pauseImmediate + "\n pollTimeoutWhilePaused=" + this.pollTimeoutWhilePaused + "\n subBatchPerPartition=" + this.subBatchPerPartition + "\n assignmentCommitOption=" + this.assignmentCommitOption + "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader + + "\n batchRecoverAfterRollback=" + this.batchRecoverAfterRollback + "\n eosMode=" + this.eosMode + "\n transactionDefinition=" + this.transactionDefinition + "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced + "\n stopImmediate=" + this.stopImmediate + "\n asyncAcks=" + this.asyncAcks + + "\n logContainerConfig=" + this.logContainerConfig + + "\n missingTopicsFatal=" + this.missingTopicsFatal + "\n idleBeforeDataMultiplier=" + this.idleBeforeDataMultiplier + + "\n idleBetweenPolls=" + this.idleBetweenPolls + "\n micrometerEnabled=" + this.micrometerEnabled + "\n observationEnabled=" + this.observationEnabled + (this.observationConvention != null diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 3ee7da9f90..622ec87459 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -229,49 +229,53 @@ private ConsumerRecords seekOrRecover(Exception thrownException, @N remaining.add(datum); } } - if (offsets.size() > 0) { - commit(consumer, container, offsets); - } - if (isSeekAfterError()) { - if (remaining.size() > 0) { - SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, - getFailureTracker(), this.logger, getLogLevel()); - ConsumerRecord recovered = remaining.get(0); - commit(consumer, container, - Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), - ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1))); - if (remaining.size() > 1) { - throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); - } + try { + if (offsets.size() > 0) { + commit(consumer, container, offsets); } - return ConsumerRecords.empty(); } - else { - if (remaining.size() > 0) { - try { - if (getFailureTracker().recovered(remaining.get(0), thrownException, container, - consumer)) { - remaining.remove(0); + finally { + if (isSeekAfterError()) { + if (remaining.size() > 0) { + SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, + getFailureTracker(), this.logger, getLogLevel()); + ConsumerRecord recovered = remaining.get(0); + commit(consumer, container, + Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), + ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1))); + if (remaining.size() > 1) { + throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); } } - catch (Exception e) { - if (SeekUtils.isBackoffException(thrownException)) { - this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0)) - + " included in remaining due to retry back off " + thrownException); + 
return ConsumerRecords.empty(); + } + else { + if (remaining.size() > 0) { + try { + if (getFailureTracker().recovered(remaining.get(0), thrownException, container, + consumer)) { + remaining.remove(0); + } } - else { - this.logger.error(e, KafkaUtils.format(remaining.get(0)) - + " included in remaining due to " + thrownException); + catch (Exception e) { + if (SeekUtils.isBackoffException(thrownException)) { + this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0)) + + " included in remaining due to retry back off " + thrownException); + } + else { + this.logger.error(e, KafkaUtils.format(remaining.get(0)) + + " included in remaining due to " + thrownException); + } } } + if (remaining.isEmpty()) { + return ConsumerRecords.empty(); + } + Map>> remains = new HashMap<>(); + remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), + tp -> new ArrayList<>()).add((ConsumerRecord) rec)); + return new ConsumerRecords<>(remains); } - if (remaining.isEmpty()) { - return ConsumerRecords.empty(); - } - Map>> remains = new HashMap<>(); - remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), - tp -> new ArrayList<>()).add((ConsumerRecord) rec)); - return new ConsumerRecords<>(remains); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 0b57ac238f..d02faa72db 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -305,8 +305,7 @@ public boolean isContainerPaused() { @Override public boolean isPartitionPaused(TopicPartition topicPartition) { - return this.listenerConsumer != null && this.listenerConsumer - .isPartitionPaused(topicPartition); + return this.listenerConsumer != null && this.listenerConsumer.isPartitionPaused(topicPartition); } @Override @@ -317,33 +316,28 @@ public boolean isInExpectedState() { @Override public void enforceRebalance() { this.thisOrParentContainer.enforceRebalanceRequested.set(true); - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; - if (consumer != null) { - consumer.wakeIfNecessary(); - } + consumerWakeIfNecessary(); } @Override public void pause() { super.pause(); - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; - if (consumer != null) { - consumer.wakeIfNecessary(); - } + consumerWakeIfNecessary(); } @Override public void resume() { super.resume(); - KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; - if (consumer != null) { - consumer.wakeIfNecessary(); - } + consumerWakeIfNecessary(); } @Override public void resumePartition(TopicPartition topicPartition) { super.resumePartition(topicPartition); + consumerWakeIfNecessary(); + } + + private void consumerWakeIfNecessary() { KafkaMessageListenerContainer.ListenerConsumer consumer = this.listenerConsumer; if (consumer != null) { consumer.wakeIfNecessary(); @@ -422,15 +416,11 @@ private void checkAckMode(ContainerProperties containerProperties) { } private ListenerType determineListenerType(GenericMessageListener listener) { - ListenerType listenerType = ListenerUtils.determineListenerType(listener); - if (listener instanceof DelegatingMessageListener) { - Object delegating = listener; - while (delegating 
instanceof DelegatingMessageListener dml) { - delegating = dml.getDelegate(); - } - listenerType = ListenerUtils.determineListenerType(delegating); + Object delegating = listener; + while (delegating instanceof DelegatingMessageListener dml) { + delegating = dml.getDelegate(); } - return listenerType; + return ListenerUtils.determineListenerType(delegating); } @Override @@ -696,12 +686,16 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final CommonErrorHandler commonErrorHandler; - private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager(); + @Deprecated(since = "3.2", forRemoval = true) + @SuppressWarnings("removal") + private final PlatformTransactionManager transactionManager = + this.containerProperties.getKafkaAwareTransactionManager() != null ? + this.containerProperties.getKafkaAwareTransactionManager() : + this.containerProperties.getTransactionManager(); - @SuppressWarnings(RAWTYPES) - private final KafkaAwareTransactionManager kafkaTxManager = - this.transactionManager instanceof KafkaAwareTransactionManager - ? ((KafkaAwareTransactionManager) this.transactionManager) : null; + private final KafkaAwareTransactionManager kafkaTxManager = + this.transactionManager instanceof KafkaAwareTransactionManager kafkaAwareTransactionManager ? + kafkaAwareTransactionManager : null; private final TransactionTemplate transactionTemplate; @@ -758,6 +752,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final MicrometerHolder micrometerHolder; + private final boolean observationEnabled; + private final AtomicBoolean polling = new AtomicBoolean(); private final boolean subBatchPerPartition; @@ -912,6 +908,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.isBatchListener = true; this.wantsFullRecords = this.batchListener.wantsPollResult(); this.pollThreadStateProcessor = setUpPollProcessor(true); + this.observationEnabled = false; } else if (listener instanceof MessageListener) { this.listener = (MessageListener) listener; @@ -919,6 +916,7 @@ else if (listener instanceof MessageListener) { this.isBatchListener = false; this.wantsFullRecords = false; this.pollThreadStateProcessor = setUpPollProcessor(false); + this.observationEnabled = this.containerProperties.isObservationEnabled(); } else { throw new IllegalArgumentException("Listener must be one of 'MessageListener', " @@ -999,7 +997,7 @@ private Object determineBootstrapServers(Properties consumerProperties) { @Nullable private KafkaAdmin obtainAdmin() { KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin(); - if (customAdmin == null && this.containerProperties.isObservationEnabled()) { + if (customAdmin == null && this.observationEnabled) { ApplicationContext applicationContext = getApplicationContext(); if (applicationContext != null) { KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); @@ -1279,7 +1277,7 @@ private MicrometerHolder obtainMicrometerHolder() { MicrometerHolder holder = null; try { if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled() - && !this.containerProperties.isObservationEnabled()) { + && !this.observationEnabled) { Function> mergedProvider = cr -> this.containerProperties.getMicrometerTags(); @@ -1578,7 +1576,7 @@ private void fixTxOffsetsIfNeeded() { this.lastCommits.forEach((tp, oamd) -> { long position = this.consumer.position(tp); 
Long saved = this.savedPositions.get(tp); - if (saved != null && saved.longValue() != position) { + if (saved != null && saved != position) { this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; " + "saved: " + this.savedPositions + ", " + "committed: " + oamd + ", " @@ -1601,9 +1599,7 @@ private void fixTxOffsetsIfNeeded() { } else { this.transactionTemplate.executeWithoutResult(status -> { - doSendOffsets(((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(this.kafkaTxManager.getProducerFactory())) - .getProducer(), toFix); + doSendOffsets(getTxProducer(), toFix); }); } } @@ -2187,9 +2183,7 @@ private void invokeBatchListenerInTx(final ConsumerRecords records, @Override public void doInTransactionWithoutResult(TransactionStatus s) { if (ListenerConsumer.this.kafkaTxManager != null) { - ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) - .getProducer(); // NOSONAR nullable + ListenerConsumer.this.producer = getTxProducer(); } RuntimeException aborted = doInvokeBatchListener(records, recordList); if (aborted != null) { @@ -2508,7 +2502,6 @@ private void invokeRecordListener(final ConsumerRecords records) { * Invoke the listener with each record in a separate transaction. * @param records the records. */ - @SuppressWarnings(RAWTYPES) // NOSONAR complexity private void invokeRecordListenerInTx(final ConsumerRecords records) { Iterator> iterator = records.iterator(); while (iterator.hasNext()) { @@ -2553,9 +2546,7 @@ private void invokeInTransaction(Iterator> iterator, final @Override public void doInTransactionWithoutResult(TransactionStatus s) { if (ListenerConsumer.this.kafkaTxManager != null) { - ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) - .getProducer(); // NOSONAR + ListenerConsumer.this.producer = getTxProducer(); } RuntimeException aborted = doInvokeRecordListener(cRecord, iterator); if (aborted != null) { @@ -2658,6 +2649,7 @@ private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) catch (InterruptedException e) { Thread.currentThread().interrupt(); } + this.earlyBatchInterceptor.success(nextArg, this.consumer); } } return next; @@ -2673,6 +2665,8 @@ private ConsumerRecord checkEarlyIntercept(ConsumerRecord recordArg) this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + KafkaUtils.format(recordArg)); ackCurrent(recordArg); + this.earlyRecordInterceptor.success(recordArg, this.consumer); + this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer); } } return cRecord; @@ -2744,6 +2738,13 @@ private void pauseForNackSleep() { this.nackSleepDurationMillis = -1; } + @SuppressWarnings(RAWTYPES) + private Producer getTxProducer() { + return ((KafkaResourceHolder) TransactionSynchronizationManager + .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) + .getProducer(); // NOSONAR + } + /** * Actually invoke the listener. * @param cRecord the record. 
@@ -2761,37 +2762,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco DefaultKafkaListenerObservationConvention.INSTANCE, () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId), this.observationRegistry); - return observation.observe(() -> { - try { + try { + observation.observe(() -> { invokeOnMessage(cRecord); successTimer(sample, cRecord); recordInterceptAfter(cRecord, null); + }); + } + catch (RuntimeException e) { + failureTimer(sample, cRecord); + recordInterceptAfter(cRecord, e); + if (this.commonErrorHandler == null) { + throw e; } - catch (RuntimeException e) { - failureTimer(sample, cRecord); - recordInterceptAfter(cRecord, e); - if (this.commonErrorHandler == null) { - throw e; - } - try { - invokeErrorHandler(cRecord, iterator, e); - commitOffsetsIfNeededAfterHandlingError(cRecord); - } - catch (KafkaException ke) { - ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); - return ke; - } - catch (RuntimeException ee) { - this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); - return ee; - } - catch (Error er) { // NOSONAR - this.logger.error(er, "Error handler threw an error"); - throw er; - } + try { + invokeErrorHandler(cRecord, iterator, e); + commitOffsetsIfNeededAfterHandlingError(cRecord); } - return null; - }); + catch (KafkaException ke) { + ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); + return ke; + } + catch (RuntimeException ee) { + this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); + return ee; + } + catch (Error er) { // NOSONAR + this.logger.error(er, "Error handler threw an error"); + throw er; + } + } + return null; } private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord cRecord) { @@ -3027,7 +3028,6 @@ private void sendOffsetsToTransaction() { doSendOffsets(this.producer, commits); } - @SuppressWarnings("deprecation") private void doSendOffsets(Producer prod, Map commits) { prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata()); if (this.fixTxOffsets) { @@ -3087,25 +3087,20 @@ private void processSeeks() { traceSeek(offset); try { SeekPosition position = offset.getPosition(); + TopicPartition topicPartition = offset.getTopicPartition(); Long whereTo = offset.getOffset(); if (position == null) { if (offset.isRelativeToCurrent()) { - whereTo += this.consumer.position(offset.getTopicPartition()); + whereTo += this.consumer.position(topicPartition); whereTo = Math.max(whereTo, 0); } - this.consumer.seek(offset.getTopicPartition(), whereTo); - } - else if (position.equals(SeekPosition.BEGINNING)) { - this.consumer.seekToBeginning(Collections.singletonList(offset.getTopicPartition())); - if (whereTo != null) { - this.consumer.seek(offset.getTopicPartition(), whereTo); - } + this.consumer.seek(topicPartition, whereTo); } - else if (position.equals(SeekPosition.TIMESTAMP)) { + else if (SeekPosition.TIMESTAMP.equals(position)) { // possible late addition since the grouped processing above Map offsetsForTimes = this.consumer .offsetsForTimes( - Collections.singletonMap(offset.getTopicPartition(), offset.getOffset())); + Collections.singletonMap(topicPartition, offset.getOffset())); offsetsForTimes.forEach((tp, ot) -> { if (ot != null) { this.consumer.seek(tp, ot.offset()); @@ -3113,10 +3108,15 @@ else if (position.equals(SeekPosition.TIMESTAMP)) { }); } else { - this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition())); + if (SeekPosition.BEGINNING.equals(position)) { + 
this.consumer.seekToBeginning(Collections.singletonList(topicPartition)); + } + else { + this.consumer.seekToEnd(Collections.singletonList(topicPartition)); + } if (whereTo != null) { - whereTo += this.consumer.position(offset.getTopicPartition()); - this.consumer.seek(offset.getTopicPartition(), whereTo); + whereTo += this.consumer.position(topicPartition); + this.consumer.seek(topicPartition, whereTo); } } } @@ -3350,7 +3350,7 @@ public void seekToEnd(Collection partitions) { @Override public void seekRelative(String topic, int partition, long offset, boolean toCurrent) { if (toCurrent) { - this.seeks.add(new TopicPartitionOffset(topic, partition, offset, toCurrent)); + this.seeks.add(new TopicPartitionOffset(topic, partition, offset, true)); } else if (offset >= 0) { this.seeks.add(new TopicPartitionOffset(topic, partition, offset, SeekPosition.BEGINNING)); @@ -3874,20 +3874,13 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti } - private static final class OffsetMetadata { - - final Long offset; // NOSONAR - - final boolean relativeToCurrent; // NOSONAR - - final SeekPosition seekPosition; // NOSONAR - - OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) { - this.offset = offset; - this.relativeToCurrent = relativeToCurrent; - this.seekPosition = seekPosition; - } - + /** + * Offset metadata record. + * @param offset current offset. + * @param relativeToCurrent relative to current. + * @param seekPosition seek position strategy. + */ + private record OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) { } private class StopCallback implements BiConsumer { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java index 05919ff952..2c011d6c24 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java @@ -16,13 +16,20 @@ package org.springframework.kafka.streams; +import java.util.Properties; + +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.lang.Nullable; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * Provide a wrapper API around the interactive query stores in Kafka Streams. @@ -75,20 +82,80 @@ public void setRetryTemplate(RetryTemplate retryTemplate) { * @return queryable store. 
*/ public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType) { + populateKafkaStreams(); + StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType); + + return this.retryTemplate.execute(context -> { + try { + return this.kafkaStreams.store(storeQueryParams); + } + catch (Exception e) { + throw new IllegalStateException("Error retrieving state store: " + storeName, e); + } + }); + } + + private void populateKafkaStreams() { if (this.kafkaStreams == null) { this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams(); } Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null. " + "Make sure that the corresponding StreamsBuilderFactoryBean has started properly."); - StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType); + } + + /** + * Retrieve the current {@link HostInfo} on which this Kafka Streams application is running. + * This {@link HostInfo} is different from the Kafka `bootstrap.servers` property; it is based on + * the Kafka Streams configuration property `application.server`, where user-defined REST + * endpoints can be invoked for each Kafka Streams application instance. + * If this property - `application.server` - is not available from the end-user application, then null is returned. + * @return the current {@link HostInfo} + */ + @Nullable + public HostInfo getCurrentKafkaStreamsApplicationHostInfo() { + Properties streamsConfiguration = this.streamsBuilderFactoryBean + .getStreamsConfiguration(); + if (streamsConfiguration != null && streamsConfiguration.containsKey("application.server")) { + String applicationServer = (String) streamsConfiguration.get("application.server"); + String[] appServerComponents = StringUtils.split(applicationServer, ":"); + if (appServerComponents != null) { + return new HostInfo(appServerComponents[0], Integer.parseInt(appServerComponents[1])); + } + } + return null; + } + /** + * Retrieve the {@link HostInfo} on which the provided store and key are hosted. This may + * not be the current host that is running the application. Kafka Streams will look + * through all the consumer instances under the same application id and retrieve the + * proper host. Note that end-user applications must provide `application.server` as a + * configuration property for all the application instances when calling this method. + * If this is not available, then null may be returned. + * @param <K> generic type for key + * @param store store name + * @param key key to look for + * @param serializer {@link Serializer} for the key + * @return the {@link HostInfo} where the key for the provided store is hosted currently + */ + public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer) { + populateKafkaStreams(); return this.retryTemplate.execute(context -> { + Throwable throwable = null; try { - return this.kafkaStreams.store(storeQueryParams); + KeyQueryMetadata keyQueryMetadata = this.kafkaStreams.queryMetadataForKey(store, key, serializer); + if (keyQueryMetadata != null) { + return keyQueryMetadata.activeHost(); + } } catch (Exception e) { - throw new IllegalStateException("Error retrieving state store: " + storeName, e); + throwable = e; } + // In addition to the obvious case of a valid exception above, if keyQueryMetadata was null for any + // transient reasons, let the retry kick in by forcing an exception below. + throw new IllegalStateException( + "Error when retrieving state store.", throwable != null ?
throwable : + new Throwable("KeyQueryMetadata is not yet available.")); }); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index 610cd0f852..a2c3a04687 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -381,7 +381,7 @@ public void testNestedTxProducerIsCached() throws Exception { latch.countDown(); }); KafkaTransactionManager tm = new KafkaTransactionManager<>(pfTx); - containerProps.setTransactionManager(tm); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.start(); @@ -432,7 +432,7 @@ public void testNestedTxProducerIsFixed() throws Exception { latch.countDown(); }); KafkaTransactionManager tm = new KafkaTransactionManager<>(pfTx); - containerProps.setTransactionManager(tm); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.start(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java index 7de877420a..226947ed50 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -120,7 +120,7 @@ void testLatestOnlyTx() throws InterruptedException { latch.countDown(); return null; }).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); - props.setTransactionManager(tm); + props.setKafkaAwareTransactionManager(tm); this.registry.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } @@ -135,8 +135,7 @@ void testLatestOnlyNoTx() throws InterruptedException { KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); Producer producer = mock(Producer.class); given(pf.createProducer(any())).willReturn(producer); - CountDownLatch latch = new CountDownLatch(1); - props.setTransactionManager(tm); + props.setKafkaAwareTransactionManager(tm); this.registry.start(); assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java index 1a25253a41..c58b25da18 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2024 the original author or authors. 
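For context on the `KafkaStreamsInteractiveQueryService` additions above, here is a minimal usage sketch. The store name `app-store`, the key `some-key`, and the surrounding class are hypothetical; it assumes the service is constructed from an already-started `StreamsBuilderFactoryBean` and that `application.server` is set in the Streams configuration.

// Sketch only; names are placeholders, not part of this change.
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.streams.KafkaStreamsInteractiveQueryService;

class InteractiveQueryUsage {

    void example(StreamsBuilderFactoryBean factoryBean) {
        KafkaStreamsInteractiveQueryService service = new KafkaStreamsInteractiveQueryService(factoryBean);

        // Look up a queryable key-value store; the lookup is retried via the configured RetryTemplate.
        ReadOnlyKeyValueStore<String, Long> store =
                service.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
        Long value = store.get("some-key");

        // Host info for this instance, taken from the 'application.server' property (null if not configured).
        HostInfo self = service.getCurrentKafkaStreamsApplicationHostInfo();

        // Host info for the instance that currently owns "some-key" in "app-store".
        HostInfo owner = service.getKafkaStreamsApplicationHostInfo("app-store", "some-key", new StringSerializer());
    }
}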
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -39,6 +41,7 @@ * * @author Gary Russell * @author Adrian Chlebosz + * @author Antonin Arquey * @since 2.8 * */ @@ -173,6 +176,48 @@ void testDefaultDelegateIsApplied() { verify(defaultHandler).handleRemaining(any(), any(), any(), any()); } + @Test + void testAddIncompatibleAckAfterHandleDelegate() { + var defaultHandler = mock(CommonErrorHandler.class); + given(defaultHandler.isAckAfterHandle()).willReturn(true); + var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler); + var delegate = mock(CommonErrorHandler.class); + given(delegate.isAckAfterHandle()).willReturn(false); + + assertThatThrownBy(() -> delegatingErrorHandler.addDelegate(IllegalStateException.class, delegate)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All delegates must return the same value when calling 'isAckAfterHandle()'"); + } + + @Test + void testAddIncompatibleSeeksAfterHandlingDelegate() { + var defaultHandler = mock(CommonErrorHandler.class); + given(defaultHandler.seeksAfterHandling()).willReturn(true); + var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler); + var delegate = mock(CommonErrorHandler.class); + given(delegate.seeksAfterHandling()).willReturn(false); + + assertThatThrownBy(() -> delegatingErrorHandler.addDelegate(IllegalStateException.class, delegate)) .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All delegates must return the same value when calling 'seeksAfterHandling()'"); + } + + @Test + void testAddMultipleDelegatesWithOneIncompatible() { + var defaultHandler = mock(CommonErrorHandler.class); + given(defaultHandler.seeksAfterHandling()).willReturn(true); + var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler); + var one = mock(CommonErrorHandler.class); + given(one.seeksAfterHandling()).willReturn(true); + var two = mock(CommonErrorHandler.class); + given(two.seeksAfterHandling()).willReturn(false); + Map<Class<? extends Throwable>, CommonErrorHandler> delegates = Map.of(IllegalStateException.class, one, IOException.class, two); + + assertThatThrownBy(() -> delegatingErrorHandler.setErrorHandlers(delegates)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("All delegates must return the same value when calling 'seeksAfterHandling()'"); + } + private Exception wrap(Exception ex) { return new ListenerExecutionFailedException("test", ex); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 2b82126b81..a48a15f711 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors.
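The new `CommonDelegatingErrorHandlerTests` above assert that every delegate must agree with the default handler on `isAckAfterHandle()` and `seeksAfterHandling()`. A minimal configuration sketch follows; the handler choices and back-off values are illustrative only, not taken from this change.

// Sketch only: both handlers share the same default isAckAfterHandle()/seeksAfterHandling() values,
// so adding the delegate passes the new compatibility check.
import org.springframework.kafka.listener.CommonDelegatingErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

class DelegatingErrorHandlerConfig {

    CommonDelegatingErrorHandler errorHandler() {
        DefaultErrorHandler defaultHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
        CommonDelegatingErrorHandler delegating = new CommonDelegatingErrorHandler(defaultHandler);
        // addDelegate()/setErrorHandlers() now reject delegates whose isAckAfterHandle() or
        // seeksAfterHandling() differ from the default handler's values.
        delegating.addDelegate(IllegalStateException.class, new DefaultErrorHandler(new FixedBackOff(0L, 0L)));
        return delegating;
    }
}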
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -73,11 +72,13 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.lang.Nullable; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionSynchronizationManager; /** * @author Gary Russell + * @author Wang Zhiyang + * @author Soby Chacko + * * @since 2.2.4 * */ @@ -130,7 +131,7 @@ else if (event instanceof ConsumerFailedToStartEvent) { exec.destroy(); } - @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void testCorrectContainerForConsumerError() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -200,10 +201,8 @@ void delayedIdleEvent() throws InterruptedException { containerProperties); CountDownLatch latch1 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(2); - AtomicReference eventTime = new AtomicReference<>(); container.setApplicationEventPublisher(event -> { if (event instanceof ListenerContainerIdleEvent) { - eventTime.set(System.currentTimeMillis()); latch1.countDown(); latch2.countDown(); } @@ -263,7 +262,7 @@ void testSyncRelativeSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -304,7 +303,7 @@ void testAsyncRelativeSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); Map>> recordMap = new HashMap<>(); recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar"))); recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); @@ -343,7 +342,7 @@ void testAsyncRelativeSeeks() throws InterruptedException { verify(consumer).seekToEnd(Collections.singletonList(tp2)); verify(consumer).seek(tp2, 70L); // position - 30 (seekToEnd ignored by mock) verify(consumer).seekToBeginning(Collections.singletonList(tp3)); - verify(consumer).seek(tp3, 30L); + verify(consumer).seek(tp3, 130L); // position + 30 (seekToBeginning ignored by mock) container.stop(); } @@ -363,7 +362,7 @@ void testSyncTimestampSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -410,7 +409,7 @@ void testAsyncTimestampSeeks() throws InterruptedException { 
TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); Map>> recordMap = new HashMap<>(); recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar"))); recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); @@ -508,7 +507,9 @@ void testBatchInterceptBeforeTx1() throws InterruptedException { } @SuppressWarnings({ "rawtypes", "unchecked" }) - void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException { + void testIntercept(boolean beforeTx, @Nullable AssignmentCommitOption option, boolean batch) + throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); final Consumer consumer = mock(Consumer.class); TopicPartition tp0 = new TopicPartition("foo", 0); @@ -523,7 +524,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc Thread.sleep(10); return firstOrSecondPoll.incrementAndGet() < 3 ? records : empty; }).given(consumer).poll(any()); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -560,7 +561,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc given(tm.getProducerFactory()).willReturn(pf); Producer producer = mock(Producer.class); given(pf.createProducer()).willReturn(producer); - containerProperties.setTransactionManager(tm); + containerProperties.setKafkaAwareTransactionManager(tm); List order = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(option == null ? 2 : 3); willAnswer(inv -> { @@ -661,104 +662,6 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu } } - @Test - @SuppressWarnings({ "rawtypes", "unchecked" }) - void testInterceptInTxNonKafkaTM() throws InterruptedException { - ConsumerFactory consumerFactory = mock(ConsumerFactory.class); - final Consumer consumer = mock(Consumer.class); - TopicPartition tp0 = new TopicPartition("foo", 0); - ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz"); - ConsumerRecords records = new ConsumerRecords( - Collections.singletonMap(tp0, Collections.singletonList(record1))); - ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap()); - AtomicInteger firstOrSecondPoll = new AtomicInteger(); - willAnswer(invocation -> { - Thread.sleep(10); - return firstOrSecondPoll.incrementAndGet() < 2 ? 
records : empty; - }).given(consumer).poll(any()); - List assignments = Arrays.asList(tp0); - willAnswer(invocation -> { - ((ConsumerRebalanceListener) invocation.getArgument(1)) - .onPartitionsAssigned(assignments); - return null; - }).given(consumer).subscribe(any(Collection.class), any()); - given(consumer.position(any())).willReturn(0L); - given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) - .willReturn(consumer); - ContainerProperties containerProperties = new ContainerProperties("foo"); - containerProperties.setGroupId("grp"); - AtomicReference>> received = new AtomicReference<>(); - containerProperties.setMessageListener((MessageListener) rec -> { - }); - containerProperties.setMissingTopicsFatal(false); - List order = new ArrayList<>(); - AtomicReference latch = new AtomicReference<>(new CountDownLatch(2)); - PlatformTransactionManager tm = mock(PlatformTransactionManager.class); - willAnswer(inv -> { - order.add("tx"); - latch.get().countDown(); - return null; - }).given(tm).getTransaction(any()); - containerProperties.setTransactionManager(tm); - ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory, - containerProperties); - AtomicReference successCalled = new AtomicReference<>(new CountDownLatch(1)); - container.setRecordInterceptor(new RecordInterceptor() { - - @Override - @Nullable - public ConsumerRecord intercept(ConsumerRecord rec, Consumer consumer) { - order.add("interceptor"); - latch.get().countDown(); - return rec; - } - - @Override - public void success(ConsumerRecord record, Consumer consumer) { - order.add("success"); - successCalled.get().countDown(); - } - - }); - container.setBatchInterceptor(new BatchInterceptor() { - - @Override - @Nullable - public ConsumerRecords intercept(ConsumerRecords recs, Consumer consumer) { - order.add("b.interceptor"); - latch.get().countDown(); - return new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1))); - } - - @Override - public void success(ConsumerRecords records, Consumer consumer) { - order.add("b.success"); - successCalled.get().countDown(); - } - - }); - container.setInterceptBeforeTx(false); - container.start(); - try { - assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(order).containsExactly("tx", "interceptor", "success"); - container.stop(); - latch.set(new CountDownLatch(2)); - successCalled.set(new CountDownLatch(1)); - container.getContainerProperties().setMessageListener((BatchMessageListener) recs -> { - }); - firstOrSecondPoll.set(0); - container.start(); - assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(order).containsExactly("tx", "interceptor", "success", "tx", "b.interceptor", "b.success"); - } - finally { - container.stop(); - } - } - @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void testNoCommitOnAssignmentWithEarliest() throws InterruptedException { @@ -771,7 +674,7 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException { return records; }).given(consumer).poll(any()); TopicPartition tp0 = new TopicPartition("foo", 0); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -814,7 +717,7 @@ private void 
testInitialCommitIBasedOnCommitted(boolean committed) throws Interr return records; }).given(consumer).poll(any()); TopicPartition tp0 = new TopicPartition("foo", 0); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -865,7 +768,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept return null; }).given(consumer).pause(any()); TopicPartition tp0 = new TopicPartition("foo", 0); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); AtomicReference rebal = new AtomicReference<>(); willAnswer(invocation -> { rebal.set(invocation.getArgument(1)); @@ -911,14 +814,14 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssi TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List allAssignments = Arrays.asList(tp0, tp1, tp2, tp3); + List allAssignments = List.of(tp0, tp1, tp2, tp3); Map>> allRecordMap = new HashMap<>(); allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar"))); allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar"))); allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List afterRevokeAssignments = Arrays.asList(tp1, tp3); + List afterRevokeAssignments = List.of(tp1, tp3); Map>> afterRevokeRecordMap = new HashMap<>(); afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar"))); @@ -979,10 +882,11 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(allAssignments); rebal.get().onPartitionsAssigned(afterRevokeAssignments); rebalLatch.countDown(); @@ -991,11 +895,13 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, return ConsumerRecords.empty(); } return afterRevokeRecords; - default: + } + default -> { if (paused.get()) { return ConsumerRecords.empty(); } return afterRevokeRecords; + } } }).given(consumer).poll(any()); container.start(); @@ -1023,7 +929,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List allAssignments = Arrays.asList(tp0, tp1, tp2, tp3); + List allAssignments = List.of(tp0, tp1, tp2, tp3); Map>> allRecordMap = new LinkedHashMap<>(); ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar"); ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar"); @@ -1032,7 +938,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar"))); allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, 
null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List revokedAssignments = Arrays.asList(tp0, tp2); + List revokedAssignments = List.of(tp0, tp2); AtomicInteger pollPhase = new AtomicInteger(); Consumer consumer = mock(Consumer.class); @@ -1044,9 +950,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign return null; }).given(consumer).subscribe(any(Collection.class), any()); CountDownLatch pauseLatch = new CountDownLatch(1); - AtomicBoolean paused = new AtomicBoolean(); willAnswer(inv -> { - paused.set(true); pauseLatch.countDown(); return null; }).given(consumer).pause(any()); @@ -1087,17 +991,20 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(revokedAssignments); rebal.get().onPartitionsAssigned(Collections.emptyList()); rebalLatch.countDown(); continueLatch.await(10, TimeUnit.SECONDS); return ConsumerRecords.empty(); - default: + } + default -> { return ConsumerRecords.empty(); + } } }).given(consumer).poll(any()); container.start(); @@ -1128,14 +1035,14 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException { TopicPartition tp0 = new TopicPartition("foo", 0); TopicPartition tp1 = new TopicPartition("foo", 1); - List allAssignments = Arrays.asList(tp0, tp1); + List allAssignments = List.of(tp0, tp1); Map>> allRecordMap = new HashMap<>(); allRecordMap.put(tp0, List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar"))); allRecordMap.put(tp1, List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List afterRevokeAssignments = Arrays.asList(tp1); + List afterRevokeAssignments = List.of(tp1); AtomicInteger pollPhase = new AtomicInteger(); Consumer consumer = mock(Consumer.class); @@ -1147,9 +1054,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I return null; }).given(consumer).subscribe(any(Collection.class), any()); CountDownLatch pauseLatch = new CountDownLatch(1); - AtomicBoolean paused = new AtomicBoolean(); willAnswer(inv -> { - paused.set(true); pauseLatch.countDown(); return null; }).given(consumer).pause(any()); @@ -1171,17 +1076,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(allAssignments); rebal.get().onPartitionsAssigned(afterRevokeAssignments); rebalLatch.countDown(); continueLatch.await(10, TimeUnit.SECONDS); return ConsumerRecords.empty(); - default: + } + default -> { return ConsumerRecords.empty(); + } } }).given(consumer).poll(any()); container.start(); @@ -1206,14 +1114,13 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException { TopicPartition tp0 = new TopicPartition("foo", 0); TopicPartition tp1 = new TopicPartition("foo", 1); - List 
allAssignments = Arrays.asList(tp0, tp1); + List allAssignments = List.of(tp0, tp1); Map>> allRecordMap = new HashMap<>(); allRecordMap.put(tp0, List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar"))); allRecordMap.put(tp1, List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List afterRevokeAssignments = Arrays.asList(tp1); AtomicInteger pollPhase = new AtomicInteger(); Consumer consumer = mock(Consumer.class); @@ -1225,9 +1132,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int return null; }).given(consumer).subscribe(any(Collection.class), any()); CountDownLatch pauseLatch = new CountDownLatch(1); - AtomicBoolean paused = new AtomicBoolean(); willAnswer(inv -> { - paused.set(true); pauseLatch.countDown(); return null; }).given(consumer).pause(any()); @@ -1249,17 +1154,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(List.of(tp0)); rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2))); rebalLatch.countDown(); continueLatch.await(10, TimeUnit.SECONDS); return ConsumerRecords.empty(); - default: + } + default -> { return ConsumerRecords.empty(); + } } }).given(consumer).poll(any()); container.start(); @@ -1285,7 +1193,7 @@ private AcknowledgingMessageListener ackOffset1() { @Override public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) { - if (rec.offset() == 1) { + if (rec.offset() == 1 && ack != null) { ack.acknowledge(); } } @@ -1299,7 +1207,7 @@ public void onMessage(Object data) { public static class TestMessageListener1 implements MessageListener, ConsumerSeekAware { - private static ThreadLocal callbacks = new ThreadLocal<>(); + private static final ThreadLocal callbacks = new ThreadLocal<>(); CountDownLatch latch = new CountDownLatch(1); @@ -1335,7 +1243,7 @@ public void onIdleContainer(Map assignments, ConsumerSeekC public static class TestMessageListener2 implements MessageListener, ConsumerSeekAware { - private static ThreadLocal callbacks = new ThreadLocal<>(); + private static final ThreadLocal callbacks = new ThreadLocal<>(); CountDownLatch latch = new CountDownLatch(1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java index 5cd79076b9..22f5c38d21 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,8 +17,11 @@ package org.springframework.kafka.listener; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -34,16 +37,19 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RebalanceInProgressException; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.core.log.LogAccessor; import org.springframework.data.util.DirectFieldAccessFallbackBeanWrapper; +import org.springframework.kafka.KafkaException; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; /** * @author Gary Russell + * @author Francois Rosiere * @since 3.0.3 * */ @@ -52,15 +58,6 @@ public class FailedBatchProcessorTests { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void indexOutOfBounds() { - class TestFBP extends FailedBatchProcessor { - - TestFBP(BiConsumer, Exception> recoverer, BackOff backOff, - CommonErrorHandler fallbackHandler) { - - super(recoverer, backOff, fallbackHandler); - } - - } CommonErrorHandler mockEH = mock(CommonErrorHandler.class); willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any()); @@ -83,15 +80,6 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void recordNotPresent() { - class TestFBP extends FailedBatchProcessor { - - TestFBP(BiConsumer, Exception> recoverer, BackOff backOff, - CommonErrorHandler fallbackHandler) { - - super(recoverer, backOff, fallbackHandler); - } - - } CommonErrorHandler mockEH = mock(CommonErrorHandler.class); willThrow(new IllegalStateException("fallback")).given(mockEH).handleBatch(any(), any(), any(), any(), any()); @@ -114,4 +102,35 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab assertThat(output).contains("Record not found in batch: topic-42@123;"); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void testExceptionDuringCommit() { + CommonErrorHandler mockEH = mock(CommonErrorHandler.class); + willThrow(new IllegalStateException("ise")).given(mockEH).handleBatch(any(), any(), any(), any(), any()); + + ConsumerRecord rec1 = new ConsumerRecord("topic", 0, 0L, null, null); + ConsumerRecord rec2 = new ConsumerRecord("topic", 0, 1L, null, null); + ConsumerRecord rec3 = new ConsumerRecord("topic", 0, 2L, null, null); + + ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("topic", 0), List.of(rec1, rec2, rec3))); + TestFBP testFBP = new TestFBP((rec, ex) -> { }, new FixedBackOff(2L, 2L), mockEH); + final Consumer consumer = mock(Consumer.class); + willThrow(new RebalanceInProgressException("rebalance in progress")).given(consumer).commitSync(anyMap(), any()); + final MessageListenerContainer mockMLC = mock(MessageListenerContainer.class); + willReturn(new ContainerProperties("topic")).given(mockMLC).getContainerProperties(); + 
assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + testFBP.handle(new BatchListenerFailedException("topic", rec2), + records, consumer, mockMLC, mock(Runnable.class)) + ).withMessage("Seek to current after exception"); + } + + static class TestFBP extends FailedBatchProcessor { + + TestFBP(BiConsumer, Exception> recoverer, BackOff backOff, + CommonErrorHandler fallbackHandler) { + + super(recoverer, backOff, fallbackHandler); + } + + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 7909189e78..3c8207a69f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -125,10 +125,10 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.ContainerTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.backoff.FixedBackOff; /** @@ -3898,6 +3898,9 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early containerProps.setMessageListener((MessageListener) msg -> { }); containerProps.setClientId("clientId"); + if (early) { + containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class)); + } RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { @@ -3922,7 +3925,7 @@ public ConsumerRecord intercept(ConsumerRecord inOrder.verify(recordInterceptor).setupThreadState(eq(consumer)); inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer)); - if (ackMode.equals(AckMode.RECORD)) { + if (AckMode.RECORD.equals(ackMode)) { inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any(Duration.class)); } @@ -3930,15 +3933,25 @@ public ConsumerRecord intercept(ConsumerRecord verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))), any(Duration.class)); } + inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer)); + inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer)); inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer)); - inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), - any(Duration.class)); + if (AckMode.RECORD.equals(ackMode)) { + inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), + any(Duration.class)); + } + inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer)); + inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer)); + if (AckMode.BATCH.equals(ackMode)) { + inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), + any(Duration.class)); + } container.stop(); } @ParameterizedTest(name = "{index} 
testInvokeBatchInterceptorAllSkipped early intercept {0}") @ValueSource(booleans = { true, false }) - @SuppressWarnings({"unchecked"}) + @SuppressWarnings({ "unchecked", "deprecation" }) public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); @@ -3968,8 +3981,8 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception containerProps.setMessageListener((BatchMessageListener) msgs -> { }); containerProps.setClientId("clientId"); - if (!early) { - containerProps.setTransactionManager(mock(PlatformTransactionManager.class)); + if (early) { + containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class)); } BatchInterceptor interceptor = spy(new BatchInterceptor() { @@ -3995,6 +4008,7 @@ public ConsumerRecords intercept(ConsumerRecords(cf, containerProps); container.start(); assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.subBatchPerPartition")) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java index 1d3cd8dab7..0e7c30c32d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -220,7 +220,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AckMode.BATCH); - factory.getContainerProperties().setTransactionManager(tm()); + factory.getContainerProperties().setKafkaAwareTransactionManager(tm()); factory.setBatchListener(true); factory.getContainerProperties().setSubBatchPerPartition(true); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java index 859bfc25be..6f8d1fe57a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
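The test updates in this area consistently replace `setTransactionManager(...)` with `setKafkaAwareTransactionManager(...)` on the container properties. A minimal sketch of the new wiring, assuming a suitably configured transactional `ProducerFactory`; the topic name is a placeholder.

// Sketch only: producer factory and topic name are placeholders.
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;

class TxContainerConfig {

    ContainerProperties containerProperties(ProducerFactory<String, String> producerFactory) {
        ContainerProperties props = new ContainerProperties("some-topic");
        // KafkaTransactionManager implements KafkaAwareTransactionManager, so it can be set directly.
        KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(producerFactory);
        props.setKafkaAwareTransactionManager(tm);
        return props;
    }
}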
@@ -206,7 +206,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AckMode.BATCH); - factory.getContainerProperties().setTransactionManager(tm()); + factory.getContainerProperties().setKafkaAwareTransactionManager(tm()); factory.getContainerProperties().setSubBatchPerPartition(true); factory.setBatchListener(true); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java index 0b8c7627db..29f0bafc94 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TestOOMError.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,7 +77,8 @@ public void testOOMCMLC() throws Exception { containerProps.setClientId("clientId"); ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(cf, containerProps); - CountDownLatch stopLatch = new CountDownLatch(1); + // concurrent container publishes one time, child container publishes concurrency time. + CountDownLatch stopLatch = new CountDownLatch(2); container.setApplicationEventPublisher(e -> { if (e instanceof ContainerStoppedEvent) { stopLatch.countDown(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 8d2a13b364..cd339423dd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -95,16 +95,14 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.messaging.MessageHeaders; import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionDefinition; -import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.util.backoff.FixedBackOff; /** * @author Gary Russell * @author Artem Bilan * @author Wang Zhiyang + * @author Soby Chacko * * @since 1.3 * @@ -112,7 +110,7 @@ @EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2, TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4, TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7, - TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT }, + TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9}, brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" }) public class TransactionalContainerTests { @@ -221,7 +219,7 @@ private void 
testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a ContainerProperties props = new ContainerProperties("foo"); props.setAckMode(ackMode); props.setGroupId("group"); - props.setTransactionManager(tm); + props.setKafkaAwareTransactionManager(tm); props.setAssignmentCommitOption(AssignmentCommitOption.ALWAYS); props.setEosMode(eosMode); props.setStopContainerWhenFenced(stopWhenFenced); @@ -329,7 +327,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception { ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0), new TopicPartitionOffset("foo", 1)); props.setGroupId("group"); - props.setTransactionManager(tm); + props.setKafkaAwareTransactionManager(tm); props.setDeliveryAttemptHeader(true); final KafkaTemplate template = new KafkaTemplate(pf); AtomicReference
delivery = new AtomicReference(); @@ -400,7 +398,7 @@ public void testConsumeAndProduceTransactionRollbackBatch() throws Exception { ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0), new TopicPartitionOffset("foo", 1)); props.setGroupId("group"); - props.setTransactionManager(tm); + props.setKafkaAwareTransactionManager(tm); props.setSubBatchPerPartition(false); final KafkaTemplate template = new KafkaTemplate(pf); props.setMessageListener((BatchMessageListener) recordlist -> { @@ -462,7 +460,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { given(pf.createProducer(isNull())).willReturn(producer); ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0)); props.setGroupId("group"); - props.setTransactionManager(new SomeOtherTransactionManager()); + props.setKafkaAwareTransactionManager(new KafkaTransactionManager<>(pf)); final KafkaTemplate template = new KafkaTemplate(pf); ConsumerGroupMetadata meta = mock(ConsumerGroupMetadata.class); props.setMessageListener((MessageListener) m -> { @@ -529,7 +527,7 @@ public void testRollbackRecord() throws Exception { @SuppressWarnings({ "rawtypes" }) KafkaTransactionManager tm = new KafkaTransactionManager(pf); - containerProps.setTransactionManager(tm); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testRollbackRecord"); @@ -595,12 +593,7 @@ public void testFixLagKTM() throws InterruptedException { testFixLagGuts(topic6, 1); } - @Test - public void testFixLagOtherTM() throws InterruptedException { - testFixLagGuts(topic7, 2); - } - - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked"}) private void testFixLagGuts(String topic, int whichTm) throws InterruptedException { Map props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); @@ -617,10 +610,8 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti case 0: break; case 1: - containerProps.setTransactionManager(new KafkaTransactionManager<>(pf)); + containerProps.setKafkaAwareTransactionManager(new KafkaTransactionManager<>(pf)); break; - case 2: - containerProps.setTransactionManager(new SomeOtherTransactionManager()); } final KafkaTemplate template = new KafkaTemplate<>(pf); @@ -656,7 +647,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti pf.destroy(); } - @SuppressWarnings({ "unchecked"}) + @SuppressWarnings("unchecked") @Test public void testMaxFailures() throws Exception { String group = "groupInARBP"; @@ -682,7 +673,7 @@ public void testMaxFailures() throws Exception { @SuppressWarnings({ "rawtypes" }) KafkaTransactionManager tm = new KafkaTransactionManager(pf); - containerProps.setTransactionManager(tm); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testMaxFailures"); @@ -787,9 +778,8 @@ public void testBatchListenerMaxFailuresOnRecover() throws Exception { } }); - @SuppressWarnings({ "rawtypes" }) - KafkaTransactionManager tm = new KafkaTransactionManager(pf); - containerProps.setTransactionManager(tm); + KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new 
KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testBatchListenerMaxFailures"); @@ -908,7 +898,7 @@ public void testRollbackProcessorCrash() throws Exception { @SuppressWarnings({ "rawtypes" }) KafkaTransactionManager tm = new KafkaTransactionManager(pf); - containerProps.setTransactionManager(tm); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testRollbackNoRetries"); @@ -944,10 +934,10 @@ public void testRollbackProcessorCrash() throws Exception { assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } - @SuppressWarnings("unchecked") @Test public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception { - Map props = KafkaTestUtils.consumerProps("testBatchListenerRollbackNoRetries", "false", embeddedKafka); + String group = "testBatchListenerRollbackNoRetries"; + Map props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); @@ -960,24 +950,23 @@ public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Excepti pf.setTransactionIdPrefix("batchListener.noRetries."); final KafkaTemplate template = new KafkaTemplate<>(pf); final CountDownLatch latch = new CountDownLatch(1); - AtomicReference data = new AtomicReference<>(); + AtomicReference>> data = new AtomicReference<>(); containerProps.setMessageListener((BatchMessageListener) recordList -> { for (ConsumerRecord record : recordList) { - data.set(record.value()); if (record.offset() == 0) { throw new BatchListenerFailedException("fail for no retry", record); } - latch.countDown(); } + data.set(recordList); + latch.countDown(); }); - @SuppressWarnings({ "rawtypes" }) - KafkaTransactionManager tm = new KafkaTransactionManager(pf); - containerProps.setTransactionManager(tm); + KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + containerProps.setKafkaAwareTransactionManager(tm); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testBatchListenerRollbackNoRetries"); - final KafkaOperations dlTemplate = spy(new KafkaTemplate<>(pf)); + final KafkaOperations dlTemplate = new KafkaTemplate<>(pf); AtomicBoolean recovererShouldFail = new AtomicBoolean(true); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) { @Override @@ -989,7 +978,7 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti }; DefaultAfterRollbackProcessor afterRollbackProcessor = - spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 0L), dlTemplate, true)); + new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 0L), dlTemplate, true); container.setAfterRollbackProcessor(afterRollbackProcessor); final CountDownLatch stopLatch = new CountDownLatch(1); container.setApplicationEventPublisher(e -> { @@ -1009,8 +998,16 @@ public void accept(ConsumerRecord record, Consumer consumer, Excepti template.sendDefault(0, 0, "qux"); return null; }); + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); - assertThat(data.get()).isEqualTo("qux"); + assertThat(data.get()).isNotNull(); + ConsumerRecord crBaz = data.get().get(0); + ConsumerRecord crQux = data.get().get(1); + assertThat(crBaz.offset()).isEqualTo(2L); + 
assertThat(crBaz.value()).isEqualTo("baz"); + assertThat(crQux.offset()).isEqualTo(3L); + assertThat(crQux.value()).isEqualTo("qux"); + container.stop(); pf.destroy(); assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -1049,7 +1046,7 @@ void testNoAfterRollbackWhenFenced() throws Exception { ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0), new TopicPartitionOffset("foo", 1)); props.setGroupId("group"); - props.setTransactionManager(tm); + props.setKafkaAwareTransactionManager(tm); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setTimeout(42); def.setName("myTx"); @@ -1085,29 +1082,4 @@ void testNoAfterRollbackWhenFenced() throws Exception { assertThatIllegalStateException().isThrownBy(container::start); } - @SuppressWarnings("serial") - public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager { - - @Override - protected Object doGetTransaction() throws TransactionException { - return new Object(); - } - - @Override - protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { - //noop - } - - @Override - protected void doCommit(DefaultTransactionStatus status) throws TransactionException { - //noop - } - - @Override - protected void doRollback(DefaultTransactionStatus status) throws TransactionException { - //noop - } - - } - } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index d8b7157252..a515aa6978 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +65,8 @@ /** * @author Tomaz Fernandes + * @author Wang Zhiyang + * * @since 2.8.4 */ @SpringJUnitConfig @@ -247,9 +249,7 @@ static class FrameworkFatalTopicListener { @Autowired CountDownLatchContainer container; - @SuppressWarnings("deprecation") - @RetryableTopic(sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC, - backoff = @Backoff(50)) + @RetryableTopic(backoff = @Backoff(50)) @KafkaListener(topics = FRAMEWORK_FATAL_EXCEPTION_TOPIC) public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { container.fatalFrameworkLatch.countDown(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index 1f32c923c9..15ca2bdc68 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
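The retry-topic test changes above and below drop explicit `SameIntervalTopicReuseStrategy.SINGLE_TOPIC` and add `MULTIPLE_TOPICS` where the multi-topic layout is still expected, which suggests `SINGLE_TOPIC` is now the default. A minimal sketch of setting the strategy explicitly; the topic name, listener id, and attempt/back-off values are placeholders.

// Sketch only: request one retry topic per attempt for same-interval retries.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.SameIntervalTopicReuseStrategy;
import org.springframework.retry.annotation.Backoff;

class RetryableListener {

    @RetryableTopic(attempts = "4", backoff = @Backoff(50),
            sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
    @KafkaListener(id = "retryable", topics = "some-topic")
    void listen(String message) {
    }
}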
@@ -384,6 +384,7 @@ static class FourthTopicListener { CountDownLatchContainer container; @RetryableTopic(dltStrategy = DltStrategy.NO_DLT, attempts = "4", backoff = @Backoff(300), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, kafkaTemplate = "${kafka.template}") @KafkaListener(topics = FOURTH_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -410,6 +411,7 @@ static class FifthTopicListener1 { numPartitions = "2", retryTopicSuffix = "-listener1", dltTopicSuffix = "-listener1-dlt", topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, kafkaTemplate = "${kafka.template}") @KafkaListener(id = "fifthTopicId1", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC, partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}, @@ -442,6 +444,7 @@ static class FifthTopicListener2 { numPartitions = "2", retryTopicSuffix = "-listener2", dltTopicSuffix = "-listener2-dlt", topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS, kafkaTemplate = "${kafka.template}") @KafkaListener(id = "fifthTopicId2", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC, partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))}, @@ -468,7 +471,8 @@ static class SixthTopicDefaultDLTListener { @Autowired CountDownLatchContainer container; - @RetryableTopic(attempts = "4", backoff = @Backoff(50)) + @RetryableTopic(attempts = "4", backoff = @Backoff(50), + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS) @KafkaListener(id = "manual", topics = MANUAL_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) public void listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic, @SuppressWarnings("unused") Acknowledgment ack) { @@ -511,8 +515,7 @@ static class FirstReuseRetryTopicListener { @Autowired CountDownLatchContainer container; - @RetryableTopic(attempts = "2", backoff = @Backoff(50), - sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) + @RetryableTopic(attempts = "2", backoff = @Backoff(50)) @KafkaListener(id = "reuseRetry1", topics = FIRST_REUSE_RETRY_TOPIC, containerFactory = "retryTopicListenerContainerFactory") public void listen1(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -532,8 +535,7 @@ static class SecondReuseRetryTopicListener { @Autowired CountDownLatchContainer container; - @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2), - sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) + @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 30, maxDelay = 100, multiplier = 2)) @KafkaListener(id = "reuseRetry2", topics = SECOND_REUSE_RETRY_TOPIC, containerFactory = "retryTopicListenerContainerFactory") public void listen2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { @@ -553,8 +555,7 @@ static class ThirdReuseRetryTopicListener { @Autowired CountDownLatchContainer container; - @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 1, maxDelay = 5, multiplier = 1.4), - sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC) + @RetryableTopic(attempts = "5", backoff = 
@Backoff(delay = 1, maxDelay = 5, multiplier = 1.4)) @KafkaListener(id = "reuseRetry3", topics = THIRD_REUSE_RETRY_TOPIC, containerFactory = "retryTopicListenerContainerFactory") public void listen3(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java index 3473c8a6c7..af5684a843 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,6 +62,8 @@ /** * @author Tomaz Fernandes * @author Cenk Akin + * @author Wang Zhiyang + * * @since 2.8.3 */ @SpringJUnitConfig @@ -121,7 +123,8 @@ static class FirstRetryableKafkaListener { attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0), autoCreateTopics = "false", - topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE) + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE, + sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS) @KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC) public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { countDownLatchContainer.countDownLatchFirstRetryable.countDown(); @@ -142,7 +145,7 @@ static class SecondRetryableKafkaListener { @Autowired CountDownLatchContainer countDownLatchContainer; - @RetryableTopic + @RetryableTopic(sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS) @KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC) public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { countDownLatchContainer.countDownLatchSecondRetryable.countDown(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index 2c887cf8d1..e4509713ed 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -52,6 +52,8 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Adrian Chlebosz + * @author Wang Zhiyang + * * @since 2.7 */ @SuppressWarnings("deprecation") @@ -314,15 +316,13 @@ void shouldCreateFixedBackoff() { // then List destinationTopicProperties = configuration.getDestinationTopicProperties(); + assertThat(destinationTopicProperties.size()).isEqualTo(3); DestinationTopic destinationTopic = new DestinationTopic("", destinationTopicProperties.get(0)); assertThat(destinationTopic.getDestinationDelay()).isEqualTo(0); DestinationTopic destinationTopic2 = new DestinationTopic("", destinationTopicProperties.get(1)); assertThat(destinationTopic2.getDestinationDelay()).isEqualTo(1000); DestinationTopic destinationTopic3 = new 
DestinationTopic("", destinationTopicProperties.get(2)); - assertThat(destinationTopic3.getDestinationDelay()).isEqualTo(1000); - DestinationTopic destinationTopic4 = new DestinationTopic("", destinationTopicProperties.get(3)); - assertThat(destinationTopic4.getDestinationDelay()).isEqualTo(0); - + assertThat(destinationTopic3.getDestinationDelay()).isEqualTo(0); } @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java index e59ddcc89c..5865133e44 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java @@ -27,9 +27,12 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -40,6 +43,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; +import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.junit.jupiter.api.Test; @@ -108,11 +112,7 @@ class KafkaStreamsInteractiveQueryServiceTests { @Test void retrieveQueryableStore() throws Exception { - this.kafkaTemplate.sendDefault(123, "123"); - this.kafkaTemplate.flush(); - - ConsumerRecord result = resultFuture.get(600, TimeUnit.SECONDS); - assertThat(result).isNotNull(); + ensureKafkaStreamsProcessorIsUpAndRunning(); ReadOnlyKeyValueStore objectObjectReadOnlyKeyValueStore = this.interactiveQueryService .retrieveQueryableStore(STATE_STORE, @@ -121,14 +121,18 @@ void retrieveQueryableStore() throws Exception { assertThat((Long) objectObjectReadOnlyKeyValueStore.get(123)).isGreaterThanOrEqualTo(1); } - @SuppressWarnings("unchecked") - @Test - void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception { + private void ensureKafkaStreamsProcessorIsUpAndRunning() throws InterruptedException, ExecutionException, TimeoutException { this.kafkaTemplate.sendDefault(123, "123"); this.kafkaTemplate.flush(); ConsumerRecord result = resultFuture.get(600, TimeUnit.SECONDS); assertThat(result).isNotNull(); + } + + @SuppressWarnings("unchecked") + @Test + void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception { + ensureKafkaStreamsProcessorIsUpAndRunning(); assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull(); KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams()); @@ -148,6 +152,55 @@ void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception { verify(kafkaStreams, times(3)).store(any(StoreQueryParameters.class)); } + @Test + void currentHostInfo() { + HostInfo currentKafkaStreamsApplicationHostInfo = + this.interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo(); + 
assertThat(currentKafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost"); + assertThat(currentKafkaStreamsApplicationHostInfo.port()).isEqualTo(8080); + } + + @Test + void hostInfoForKeyAndStore() throws Exception { + ensureKafkaStreamsProcessorIsUpAndRunning(); + + HostInfo kafkaStreamsApplicationHostInfo = + this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(STATE_STORE, 123, + new IntegerSerializer()); + // In real applications, the above call may return a different server than what is configured + // via application.server on the Kafka Streams where the call was invoked. However, in the case + // of this test, we only have a single Kafka Streams instance and even there, we provide a mock + // value for application.server (localhost:8080). Because of that, that is what we are verifying against. + assertThat(kafkaStreamsApplicationHostInfo.host()).isEqualTo("localhost"); + assertThat(kafkaStreamsApplicationHostInfo.port()).isEqualTo(8080); + } + + @Test + void hostInfoForNonExistentKeyAndStateStore() throws Exception { + ensureKafkaStreamsProcessorIsUpAndRunning(); + + assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull(); + KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams()); + assertThat(kafkaStreams).isNotNull(); + + Field kafkaStreamsField = KafkaStreamsInteractiveQueryService.class.getDeclaredField("kafkaStreams"); + kafkaStreamsField.setAccessible(true); + kafkaStreamsField.set(interactiveQueryService, kafkaStreams); + + IntegerSerializer serializer = new IntegerSerializer(); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(() -> { + this.interactiveQueryService.getKafkaStreamsApplicationHostInfo(NON_EXISTENT_STORE, 12345, + serializer); + }) + .withMessageContaining("Error when retrieving state store."); + + verify(kafkaStreams, times(3)).queryMetadataForKey(NON_EXISTENT_STORE, 12345, + serializer); + } + + @Configuration @EnableKafka @EnableKafkaStreams @@ -204,6 +257,7 @@ public KafkaStreamsConfiguration kStreamsConfigs() { props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080"); return new KafkaStreamsConfiguration(props); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index d67396aa8b..b5caf4e51e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
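
The KafkaStreamsInteractiveQueryService tests just above exercise host lookups for the current instance and for the instance that owns a given key. A minimal sketch of how the two calls might be combined, assuming a service bean configured like the test's kStreamsConfigs() (with application.server set) and using a placeholder store name and key:

    HostInfo self = interactiveQueryService.getCurrentKafkaStreamsApplicationHostInfo();
    HostInfo owner = interactiveQueryService.getKafkaStreamsApplicationHostInfo(
            "placeholder-store", 123, new IntegerSerializer());
    if (owner.equals(self)) {
        // the key is hosted by this instance, so query the local state store directly
        ReadOnlyKeyValueStore<Integer, Long> store = interactiveQueryService
                .retrieveQueryableStore("placeholder-store", QueryableStoreTypes.keyValueStore());
        Long value = store.get(123);
    }
    else {
        // otherwise route the query to owner.host() / owner.port(), for example over HTTP
    }
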
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.junit.jupiter.api.Test; @@ -54,7 +55,6 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -85,14 +85,20 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Wang Zhiyang * * @since 3.0 */ @SpringJUnitConfig -@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" }) +@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3", + ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR}) @DirtiesContext public class ObservationTests { + public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception"; + + public final static String OBSERVATION_ERROR = "observation.error"; + @Test void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate template, @Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler, @@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate hdr.value()).isEqualTo("some foo value".getBytes()); - assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes()); + assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); + assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); Deque spans = tracer.getSpans(); assertThat(spans).hasSize(4); SimpleSpan span = spans.poll(); @@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) } }); + rler.getListenerContainer("obs1").stop(); rler.getListenerContainer("obs1").start(); template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS); assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(listener.record).isNotNull(); headers = listener.record.headers(); - assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes()); - assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes()); + assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes()); + assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes()); assertThat(spans).hasSize(4); span = spans.poll(); assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template"); @@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) .doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException")); } + @Test + void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired 
@Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS); + assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue(); + endpointRegistry.getListenerContainer("obs4").stop(); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(2); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0"); + assertThat(span.getError().getCause()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("obs4 run time exception"); + } + + @Test + void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, + @Autowired @Qualifier("throwableTemplate") KafkaTemplate errorTemplate, + @Autowired KafkaListenerEndpointRegistry endpointRegistry) + throws ExecutionException, InterruptedException, TimeoutException { + + errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS); + assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue(); + endpointRegistry.getListenerContainer("obs5").stop(); + + Deque spans = tracer.getSpans(); + assertThat(spans).hasSize(2); + SimpleSpan span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate"); + span = spans.poll(); + assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0"); + assertThat(span.getError()) + .isInstanceOf(Error.class) + .hasMessage("obs5 error"); + } + @Configuration @EnableKafka public static class Config { @@ -276,6 +325,13 @@ KafkaTemplate customTemplate(ProducerFactory p return template; } + @Bean + KafkaTemplate throwableTemplate(ProducerFactory pf) { + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setObservationEnabled(true); + return template; + } + @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory cf) { @@ -286,7 +342,7 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerF factory.getContainerProperties().setObservationEnabled(true); factory.setContainerCustomizer(container -> { if (container.getListenerId().equals("obs3")) { - ((AbstractMessageListenerContainer) container).setKafkaAdmin(this.mockAdmin); + container.setKafkaAdmin(this.mockAdmin); } }); return factory; @@ -352,6 +408,11 @@ Listener listener(KafkaTemplate template) { return new Listener(template); } + @Bean + ExceptionListener exceptionListener() { + return new ExceptionListener(); + } + } public static class Listener { @@ -387,4 +448,24 @@ void listen3(ConsumerRecord in) { } + public static class ExceptionListener { + + final CountDownLatch latch4 = new CountDownLatch(1); + + final CountDownLatch latch5 = new CountDownLatch(1); + + @KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION) + void listenRuntimeException(ConsumerRecord in) { + this.latch4.countDown(); + throw new IllegalStateException("obs4 run time exception"); + } + + @KafkaListener(id = "obs5", topics = OBSERVATION_ERROR) + void listenError(ConsumerRecord in) { + this.latch5.countDown(); + throw new Error("obs5 error"); + } + + } + }