-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-3542: Adds the ability to add record interceptors instead of override them #3937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
GH-3542: Adds the ability to add record interceptors instead of override them #3937
Conversation
…nstead of override them Fixes: spring-projects#3542 Issue link: spring-projects#3542 What? Change `RecordInterceptor` to `List<RecordInterceptor>` in `MessageListenerContainer`. Why? To allow adding multiple `RecordInterceptor` instances instead of overriding the existing one. Currently, only a single `RecordInterceptor` is supported. Users may want to register multiple `RecordInterceptors`. There are some workarounds, but they are not clean or ideal solutions. By supporting `List<RecordInterceptor`>, users can add their own interceptors via `setRecordInterceptor(...)`. Signed-off-by: Sanghyeok An <[email protected]>
Signed-off-by: Sanghyeok An <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this: https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java#L51.
So, I would expect done change over there . This is really not a container responsibility. Plus, you made too many braking changes. And setter contract is to override , not add. For add we would implement adder.
At a glance a cannot accept the change .
Thank you for understanding!
Signed-off-by: Sanghyeok An <[email protected]>
Signed-off-by: Sanghyeok An <[email protected]>
Signed-off-by: Sanghyeok An <[email protected]>
Sorry to bother you. I made new commits to use |
*/ | ||
public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) { | ||
if (this.recordInterceptor instanceof CompositeRecordInterceptor<K, V> compositeRecordInterceptor) { | ||
compositeRecordInterceptor.addRecordInterceptor(recordInterceptor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, if the composite is an external and shared, calling this from one container would mutate all others. I understand a desire to have everything container related in one place, but consequences might not be good.
Or we just leave responsibility only in the composite, or bite a bullet and always have a composite here internally as a value of interceptor property. This way any add on the container would not effect the rest of application . The awkward situation when composite is provided, but I don’t see any other way to protect from race condition .
Of course, I’d prefer to have add only in the composite, but I’m opened for any other arguments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right.
I missed that case. (Many users already create it as bean and use it)
As you said, Adding a new API addRecordInterceptor(...)
to only CompositeRecordInterceptor
makes sense to me 👍 . Also, I don't like useless recursive usage (Composite -> Composite -> ... -> Composite) 😄.
Signed-off-by: Sanghyeok An <[email protected]>
Signed-off-by: Sanghyeok An <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your name to the @author
list of this affected class?
spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java
Show resolved
Hide resolved
Signed-off-by: Sanghyeok An <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we also need a getRecordInterceptor()
in the container. See issue for more info.
As we have just discussed the add
in the container is not OK, but get
, compose manually and then set
should be clear enough without consequences .
Signed-off-by: Sanghyeok An <[email protected]>
...kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chickenchickenlove Changes look good to me. As @artembilan suggested, please make the getter in AMLC public (add since tag etc.). I would also suggest adding some type of unit testing for the changes involved. Apart from that, I think we may need some documentation updates as well as a small indicator for this in the whats-new
docs. Thanks!
Signed-off-by: Sanghyeok An <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checkstyle violation:
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java:3865: Line has leading space characters; indentation should be performed with tabs only. [RegexpSinglelineJava]
Use gradlew check
locally before push, please.
Signed-off-by: Sanghyeok An <[email protected]>
I'm so sorry. I forgot it. 🙇♂️ |
spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
Outdated
Show resolved
Hide resolved
Signed-off-by: Sanghyeok An <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just couple docs suggestions.
Thanks
=== Configure additional `RecordInterceptor` | ||
|
||
The `KafkaMessageListenerContainer` and `ConcurrentMessageListenerContainer` support `getRecordInterceptor()`. | ||
If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This technical detail is not required to be present in What's New
.
.../src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc
Show resolved
Hide resolved
Signed-off-by: Sanghyeok An <[email protected]>
Signed-off-by: Sanghyeok An <[email protected]>
if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor interceptor) { | ||
compositeInterceptor = interceptor; | ||
} else { | ||
compositeInterceptor = new CompositeRecordInterceptor<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like just returned interceptor is not preserved in a new composite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for that.
I added a new commit to fix that!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, look into my comment again .
that interceptor
variable is out of use after this. I believe it has to be propagated to ctor of new composite .
if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor interceptor) { | ||
compositeInterceptor = interceptor; | ||
} else { | ||
compositeInterceptor = new CompositeRecordInterceptor<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, look into my comment again .
that interceptor
variable is out of use after this. I believe it has to be propagated to ctor of new composite .
Signed-off-by: Sanghyeok An <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Paging @sobychacko for final approval .
Thank you!
Fixes: #3542
Issue link: #3542
What?
Change
RecordInterceptor
toList<RecordInterceptor>
inMessageListenerContainer
.Why?
To allow adding multiple
RecordInterceptor
instances instead of overriding the existing one. Currently, only a singleRecordInterceptor
is supported. Users may want to register multipleRecordInterceptors
. There are some workarounds, but they are not clean or ideal solutions.By supporting
List<RecordInterceptor
>, users can add their own interceptors viasetRecordInterceptor(...)
.