Some cosmetic improvements of the reference - part 2

* tweak as per PR comments

NathanQingyangXu authored Nov 2, 2023
1 parent 9794c58 commit b1e6756
Showing 14 changed files with 67 additions and 69 deletions.

@@ -112,12 +112,12 @@ The time to process a batch of records plus this value must be less than the `ma

|[[idleEventInterval]]<<idleEventInterval,`idleEventInterval`>>
|`null`
|When set, enables publication of `ListenerContainerIdleEvent` s, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
|When set, enables publication of `ListenerContainerIdleEvent`+++s+++, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
Also see `idleBeforeDataMultiplier`.

|[[idlePartitionEventInterval]]<<idlePartitionEventInterval,`idlePartitionEventInterval`>>
|`null`
|When set, enables publication of `ListenerContainerIdlePartitionEvent` s, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
|When set, enables publication of `ListenerContainerIdlePartitionEvent`+++s+++, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].

|[[kafkaConsumerProperties]]<<kafkaConsumerProperties,`kafkaConsumerProperties`>>
|None
@@ -274,7 +274,7 @@ See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Han

|[[listenerId]]<<listenerId,`listenerId`>>
|See desc.
|The bean name for user-configured containers or the `id` attribute of `@KafkaListener` s.
|The bean name for user-configured containers or the `id` attribute of `@KafkaListener`+++s+++.

|[[listenerInfo]]<<listenerInfo,`listenerInfo`>>
|null
@@ -333,22 +333,22 @@ Also see `interceptBeforeTx`.

|[[assignedPartitions2]]<<assignedPartitions2,`assignedPartitions`>>
|(read only)
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer` s (explicitly or not).
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not).

|[[assignedPartitionsByClientId2]]<<assignedPartitionsByClientId2,`assignedPartitionsByClientId`>>
|(read only)
|The partitions currently assigned to this container's child `KafkaMessageListenerContainer` s (explicitly or not), keyed by the child container's consumer's `client.id` property.
|The partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not), keyed by the child container's consumer's `client.id` property.

|[[concurrency]]<<concurrency,`concurrency`>>
|1
|The number of child `KafkaMessageListenerContainer` s to manage.
|The number of child `KafkaMessageListenerContainer`+++s+++ to manage.

|[[containerPaused2]]<<containerPaused2,`containerPaused`>>
|n/a
|True if pause has been requested and all child containers' consumers have actually paused.

|[[containers]]<<containers,`containers`>>
|n/a
|A reference to all child `KafkaMessageListenerContainer` s.
|A reference to all child `KafkaMessageListenerContainer`+++s+++.
|===
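
A minimal sketch of setting a few of the properties above programmatically, assuming an existing `consumerFactory` bean; the topic name and listener body are illustrative:

[source, java]
----
@Bean
public ConcurrentMessageListenerContainer<String, String> exampleContainer(
        ConsumerFactory<String, String> consumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("someTopic");
    containerProps.setIdleEventInterval(30_000L);          // publish ListenerContainerIdleEvents
    containerProps.setIdlePartitionEventInterval(30_000L); // publish ListenerContainerIdlePartitionEvents
    containerProps.setMessageListener((MessageListener<String, String>) record -> {
        // process the record
    });
    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
    container.setConcurrency(3); // three child KafkaMessageListenerContainers
    return container;
}
----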

@@ -3,9 +3,9 @@

The following Spring application events are published by listener containers and their consumers:

* `ConsumerStartingEvent` - published when a consumer thread is first started, before it starts polling.
* `ConsumerStartedEvent` - published when a consumer is about to start polling.
* `ConsumerFailedToStartEvent` - published if no `ConsumerStartingEvent` is published within the `consumerStartTimeout` container property.
* `ConsumerStartingEvent`: published when a consumer thread is first started, before it starts polling.
* `ConsumerStartedEvent`: published when a consumer is about to start polling.
* `ConsumerFailedToStartEvent`: published if no `ConsumerStartingEvent` is published within the `consumerStartTimeout` container property.
This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency.
An error message is also logged when this condition occurs.
* `ListenerContainerIdleEvent`: published when no messages have been received in `idleInterval` (if configured).
@@ -85,7 +85,7 @@ The `ConsumerRetryAuthEvent` event has the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `reason`
* `reason`:
** `AUTHENTICATION` - the event was published because of an authentication exception.
** `AUTHORIZATION` - the event was published because of an authorization exception.
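
For example, a sketch of an `@EventListener` method reacting to these events; the credential refresh is illustrative, and the reason is assumed to be exposed through a `getReason()` accessor:

[source, java]
----
@EventListener
public void retryAuthEventHandler(ConsumerRetryAuthEvent event) {
    if (event.getReason() == ConsumerRetryAuthEvent.Reason.AUTHENTICATION) {
        // e.g. refresh expiring credentials before the consumer retries (illustrative)
    }
}
----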

@@ -99,7 +99,7 @@ For a parent container, the source and container properties are identical.

In addition, the `ConsumerStoppedEvent` has the following additional property:

* `reason`
* `reason`:
** `NORMAL` - the consumer stopped normally (container was stopped).
** `ERROR` - a `java.lang.Error` was thrown.
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
@@ -135,7 +135,7 @@ public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFac
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(...);
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
----
@@ -174,7 +174,7 @@ You can also use `@EventListener`, introduced in Spring Framework 4.2.

The next example combines `@KafkaListener` and `@EventListener` into a single class.
You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle.
You can also use the `@EventListener` `condition` for this purpose.
You can also use the `@EventListener`+++'+++s `condition` for this purpose.

See xref:kafka/events.adoc[Application Events] for information about event properties.
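
For example, a sketch of a handler that uses the `condition` to react only to idle events from containers whose listener id starts with an assumed `qux-` prefix:

[source, java]
----
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void idleEventHandler(ListenerContainerIdleEvent event) {
    // act only on the idle "qux-" containers
}
----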

@@ -214,5 +214,5 @@ You should stop the concurrent container instead.
=== Current Positions when Idle

Note that you can obtain the current positions when idle is detected by implementing `ConsumerSeekAware` in your listener.
See `onIdleContainer()` in <<seek>>.
See `onIdleContainer()` in xref:kafka/seek.adoc[seek].
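
A minimal sketch of such a listener; the class name, record types, and logging are illustrative:

[source, java]
----
public class PositionReportingListener extends AbstractConsumerSeekAware
        implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // normal record processing
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // called when an idle event fires; the map holds the current position of each assigned partition
        assignments.forEach((partition, position) ->
                System.out.println(partition + " is currently at offset " + position));
    }
}
----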

@@ -5,11 +5,11 @@ Starting with version 2.0, if you also annotate a `@KafkaListener` with a `@Send

The `@SendTo` value can have several forms:

* `@SendTo("someTopic")` routes to the literal topic
* `@SendTo("someTopic")` routes to the literal topic.
* `+@SendTo("#{someExpression}")+` routes to the topic determined by evaluating the expression once during application context initialization.
* `+@SendTo("!{someExpression}")+` routes to the topic determined by evaluating the expression at runtime.
The `#root` object for the evaluation has three properties:
** `request`: The inbound `ConsumerRecord` (or `ConsumerRecords` object for a batch listener))
** `request`: The inbound `ConsumerRecord` (or `ConsumerRecords` object for a batch listener).
** `source`: The `org.springframework.messaging.Message<?>` converted from the `request`.
** `result`: The method return result.
* `@SendTo` (no properties): This is treated as `!{source.headers['kafka_replyTopic']}` (since version 2.1.3).
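
For instance, a sketch of a listener using the no-properties form; the id, topic, and logic are illustrative:

[source, java]
----
@KafkaListener(id = "upperCase", topics = "requests")
@SendTo // replies to the topic named in the inbound record's kafka_replyTopic header
public String upperCase(String in) {
    return in.toUpperCase();
}
----
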
@@ -60,7 +60,7 @@ public class MultiListenerSendTo {

IMPORTANT: In order to support `@SendTo`, the listener container factory must be provided with a `KafkaTemplate` (in its `replyTemplate` property), which is used to send the reply.
This should be a `KafkaTemplate` and not a `ReplyingKafkaTemplate`, which is used on the client-side for request/reply processing.
When using Spring Boot, boot will auto-configure the template into the factory; when configuring your own factory, it must be set as shown in the examples below.
When using Spring Boot, it will auto-configure the template into the factory; when configuring your own factory, it must be set as shown in the examples below.

Starting with version 2.2, you can add a `ReplyHeadersConfigurer` to the listener container factory.
This is consulted to determine which headers you want to set in the reply message.
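
A sketch of a factory configured this way; the copied header name is illustrative:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
        ConsumerFactory<Integer, String> consumerFactory, KafkaTemplate<Integer, String> template) {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(template); // required for @SendTo
    factory.setReplyHeadersConfigurer((name, value) -> "traceId".equals(name)); // copy only "traceId" into the reply
    return factory;
}
----
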
@@ -108,9 +108,9 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon
----

When you use `@SendTo`, you must configure the `ConcurrentKafkaListenerContainerFactory` with a `KafkaTemplate` in its `replyTemplate` property to perform the send.
Spring Boot will automatically wire in its auto configured template (or any if a single instance is present).
Spring Boot will automatically wire in its auto-configured template (or any if a single instance is present).

NOTE: Unless you use xref:kafka/sending-messages.adoc#replying-template[request/reply semantics] only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition or key.
NOTE: Unless you use xref:kafka/sending-messages.adoc#replying-template[request/reply semantics], only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition or key.
The following example shows how to do so:

[source, java]
@@ -4,7 +4,7 @@
In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered.
The framework cannot know whether such a message has been processed or not.
That is an application-level function.
This is known as the https://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html[Idempotent Receiver] pattern and Spring Integration provides an https://docs.spring.io/spring-integration/reference/html/#idempotent-receiver[implementation of it].
This is known as the https://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html[Idempotent Receiver] pattern and Spring Integration provides an https://docs.spring.io/spring-integration/reference/html/#idempotent-receiver[implementation] of it.

The Spring for Apache Kafka project also provides some assistance by means of the `FilteringMessageListenerAdapter` class, which can wrap your `MessageListener`.
This class takes an implementation of `RecordFilterStrategy` in which you implement the `filter` method to signal that a message is a duplicate and should be discarded.
@@ -13,7 +13,7 @@ It is `false` by default.

When you use `@KafkaListener`, set the `RecordFilterStrategy` (and optionally `ackDiscarded`) on the container factory so that the listener is wrapped in the appropriate filtering adapter.
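
A sketch of such a factory; `alreadyProcessed()` is a hypothetical duplicate check:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filteringContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setRecordFilterStrategy(record -> alreadyProcessed(record.key(), record.offset())); // true means discard
    factory.setAckDiscarded(true);
    return factory;
}
----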

In addition, a `FilteringBatchMessageListenerAdapter` is provided, for when you use a batch <<message-listeners, message listener>>.
In addition, a `FilteringBatchMessageListenerAdapter` is provided, for when you use a batch xref:kafka/receiving-messages/message-listeners.adoc[message listener].

IMPORTANT: The `FilteringBatchMessageListenerAdapter` is ignored if your `@KafkaListener` receives a `ConsumerRecords<?, ?>` instead of `List<ConsumerRecord<?, ?>>`, because `ConsumerRecords` is immutable.

@@ -270,17 +270,17 @@ The following example shows how to do so:
[source, java]
----
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
----
@@ -386,13 +386,12 @@ The following example shows how to do so:

[source, java]
----
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
----

Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation; these will override any properties with the same name configured in the consumer factory. You **cannot** specify the `group.id` and `client.id` properties this way; they will be ignored; use the `groupId` and `clientIdPrefix` annotation properties for those.

The properties are specified as individual strings with the normal Java `Properties` file format: `foo:bar`, `foo=bar`, or `foo bar`.
The properties are specified as individual strings with the normal Java `Properties` file format: `foo:bar`, `foo=bar`, or `foo bar`, as the following example shows:

[source, java]
----
@@ -10,9 +10,8 @@ Alternatively, you can access the group id in a method parameter.
[source, java]
----
@KafkaListener(id = "id", topicPattern = "someTopic")
public void listener(@Payload String payload,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
public void listener(@Payload String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
----

@@ -12,7 +12,7 @@ The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageL
Starting with version 2.2.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener, allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).
Also, starting with version 2.7, there is now a `BatchInterceptor`, providing similar functionality for <<batch-listeners>>.
Also, starting with version 2.7, there is now a `BatchInterceptor`, providing similar functionality for xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].
In addition, the `ConsumerAwareRecordInterceptor` (and `BatchInterceptor`) provide access to the `Consumer<?, ?>`.
This might be used, for example, to access the consumer metrics in the interceptor.
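
A sketch of a simple interceptor; the class name and logging are illustrative:

[source, java]
----
public class LoggingRecordInterceptor implements RecordInterceptor<String, String> {

    @Override
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record,
            Consumer<String, String> consumer) {
        System.out.println("About to process " + record.topic() + "-" + record.partition()
                + "@" + record.offset());
        return record; // returning null would skip the listener for this record
    }
}
----

It can then be registered via, for example, the listener container's or the container factory's `setRecordInterceptor()` method.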

@@ -24,7 +24,7 @@ The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to

By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager` s.
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.
This allows, for example, the interceptor to participate in a JDBC transaction started by the container.

Starting with versions 2.3.8, 2.4.6, the `ConcurrentMessageListenerContainer` now supports https://kafka.apache.org/documentation/#static_membership[Static Membership] when the concurrency is greater than one.
@@ -155,7 +155,7 @@ org.apache.kafka.clients.consumer.RoundRobinAssignor
=====
====

When the container properties are configured with `TopicPartitionOffset` s, the `ConcurrentMessageListenerContainer` distributes the `TopicPartitionOffset` instances across the delegate `KafkaMessageListenerContainer` instances.
When the container properties are configured with `TopicPartitionOffset`+++s+++, the `ConcurrentMessageListenerContainer` distributes the `TopicPartitionOffset` instances across the delegate `KafkaMessageListenerContainer` instances.

If, say, six `TopicPartitionOffset` instances are provided and the `concurrency` is `3`, each container gets two partitions.
For five `TopicPartitionOffset` instances, two containers get two partitions, and the third gets one.
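
A sketch of the six-partition case, assuming an existing `consumerFactory` and an illustrative topic name:

[source, java]
----
ContainerProperties containerProps = new ContainerProperties(
        new TopicPartitionOffset("someTopic", 0), new TopicPartitionOffset("someTopic", 1),
        new TopicPartitionOffset("someTopic", 2), new TopicPartitionOffset("someTopic", 3),
        new TopicPartitionOffset("someTopic", 4), new TopicPartitionOffset("someTopic", 5));
ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
container.setConcurrency(3); // each child container receives two of the six partitions
----
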
@@ -196,14 +196,14 @@ After that, the same semantics as `BATCH` are applied.

When using xref:kafka/transactions.adoc[transactions], the offset(s) are sent to the transaction and the semantics are equivalent to `RECORD` or `BATCH`, depending on the listener type (record or batch).

NOTE: `MANUAL`, and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener` or a `BatchAcknowledgingMessageListener`.
See <<message-listeners, Message Listeners>>.
NOTE: `MANUAL` and `MANUAL_IMMEDIATE` require the listener to be an `AcknowledgingMessageListener` or a `BatchAcknowledgingMessageListener`.
See xref:kafka/receiving-messages/message-listeners.adoc[Message Listeners].
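
A sketch of a manual-ack listener; the id, topic, and factory name are illustrative, and the referenced factory is assumed to be configured with `AckMode.MANUAL`:

[source, java]
----
@KafkaListener(id = "manualAck", topics = "someTopic", containerFactory = "manualAckFactory")
public void listen(String data, Acknowledgment ack) {
    // process the record, then commit its offset
    ack.acknowledge();
}
----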

Depending on the `syncCommits` container property, the `commitSync()` or `commitAsync()` method on the consumer is used.
`syncCommits` is `true` by default; also see `setSyncCommitTimeout`.
See `setCommitCallback` to get the results of asynchronous commits; the default callback is the `LoggingCommitCallback` which logs errors (and successes at debug level).

Because the listener container has it's own mechanism for committing offsets, it prefers the Kafka `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` to be `false`.
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` to be `false`.
Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container's consumer property overrides.

The `Acknowledgment` has the following method:
@@ -81,5 +81,5 @@ Access to the `Consumer` object is provided.
IMPORTANT: The `Consumer` object is not thread-safe.
You must only invoke its methods on the thread that calls the listener.

IMPORTANT: You should not execute any `Consumer<?, ?>` methods that affect the consumer's positions and or committed offsets in your listener; the container needs to manage such information.
IMPORTANT: You should not execute any `Consumer<?, ?>` methods that affect the consumer's positions or committed offsets in your listener; the container needs to manage such information.

@@ -38,14 +38,14 @@ containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListe
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
@@ -54,7 +54,7 @@ containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListe
IMPORTANT: Starting with version 2.4, a new method `onPartitionsLost()` has been added (similar to a method with the same name in `ConsumerRebalanceListener`).
The default implementation on `ConsumerRebalanceListener` simply calls `onPartitionsRevoked`.
The default implementation on `ConsumerAwareRebalanceListener` does nothing.
When supplying the listener container with a custom listener (of either type), it is important that your implementation not call `onPartitionsRevoked` from `onPartitionsLost`.
When supplying the listener container with a custom listener (of either type), it is important that your implementation does not call `onPartitionsRevoked` from `onPartitionsLost`.
If you implement `ConsumerRebalanceListener`, you should override the default method.
This is because the listener container will call its own `onPartitionsRevoked` from its implementation of `onPartitionsLost` after calling the method on your implementation.
If your implementation delegates to the default behavior, `onPartitionsRevoked` will be called twice each time the `Consumer` calls that method on the container's listener.
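
A sketch of overriding the default method when supplying a plain `ConsumerRebalanceListener`:

[source, java]
----
containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // deliberately does not delegate to onPartitionsRevoked(); the container performs
        // its own revocation handling after calling this method
    }

});
----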