Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2800: Make it possible to route messages to custom DLT based on exceptions thrown #2929

Merged
merged 9 commits into from
Feb 1, 2024

Conversation

breader124
Copy link
Contributor

This PR is related to #2800. It introduces changes making it easy for developers to route messages to different DLTs when the previously configured exception is thrown.

@sobychacko
Copy link
Contributor

@breader124 Thanks for the PR. I am going to take a look reviewing the changes. One general comment. Could you add some docs snippets related to the change? This seems like a bigger change that deserves some notes in the reference docs.

@breader124
Copy link
Contributor Author

breader124 commented Dec 12, 2023

Sure, I was thinking about modifications in the reference docs as well. The only thing I'd like to wait for until I start working on it is the confirmation of the API part of changes. Having the API stable I'll propose docs changes

@sobychacko
Copy link
Contributor

sobychacko commented Dec 12, 2023

Since this PR is large in terms of the changes, could you provide a brief overview of the changes in a few sentences? Please provide a flow of the changes, the APIs you added, etc. It doesn't have to be very elaborate, but brief.

@breader124
Copy link
Contributor Author

The main difference that I introduced is the possibility to configure the routing to the custom DLTs based on the exceptions thrown during the message processing. This configuration can be provided in 2 ways (and these are the only new APIs):

  • Using the org.springframework.kafka.annotation.RetryableTopic#exceptionBasedDltRouting. There you need to pass the array of routing rules (org.springframework.kafka.retrytopic.ExceptionBasedDestinationDlt). Each of them specifies what suffix extension should be used when creating the custom DLT and which exceptions should be handled by it
  • Using the org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder#dltRoutingRules method, where you pass the Map describing the custom DLTs routing rules.

When the configuration is provided, then depending on the autoCreateTopics config property new DLTs might be created together with rest of the topic automatically. When these DLTs are being created, then their properties are placed before the properties of the generic purpose DLT inside the org.springframework.kafka.retrytopic.DestinationTopicPropertiesFactory#createPropertiesForDefaultTopicStrategy method. Thanks to this change they are considered earlier for routing than the generic purpose DLT is considered.

Lastly, when it comes to the actual routing part, I've modified the org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver. There I try to match the exception, which has been thrown during the message processing with the ones configured for the particular custom DLT. This process traverse causes and it's not configurable now. When I'm not able to find any custom, dedicated DLT, then finally the processing comes to the generic purpose DLT and message will land there.

Besides all changes described above I also provided tests for the changed behaviour.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds reasonable, but unfortunately it introduces some breaking changes to public API. To avoid immediate upgrade in the target projects we cannot accept this change for the current patch version.

We are fully OK to schedule this for the next 3.2 generation, but not earlier.

Therefore your PR will sit here for a while.

Thanks for understanding.

@sobychacko
Copy link
Contributor

sobychacko commented Dec 13, 2023

That's a great point, Artem. @ breader124 If you still want to go into the patch release, could you still make these changes without breaking an existing API? It might still be okay to add new API's as we have done in some other existing PR's.

Update: It is better to postpone this PR for 3.2.0, as Artem mentioned, due to the scope of the changes in the PR.

@sobychacko sobychacko modified the milestone: 3.2.0-M1 Dec 13, 2023
@breader124
Copy link
Contributor Author

It's totally fine for me to wait for a while @artembilan, @sobychacko. I'll update the @since tags in the Javadocs then to make them point to the 3.2.0 version instead of the 3.1.1

* @see org.springframework.kafka.retrytopic.ExceptionBasedDltRouting
* @since 3.2.0
*/
public @interface ExceptionBasedDestinationDlt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe ExceptionBasedDltDestination?

@sobychacko
Copy link
Contributor

@breader124 I did a first pass at the PR. It looks good. One general question: The corresponding issue that was created by the SO thread , originally trying to route records from deserialization exceptions. Is that use case addressed by this PR?

@breader124
Copy link
Contributor Author

breader124 commented Dec 15, 2023

Yes, it's possible to configure the @RetryableTopic the way how it was done in one of the tests. Then messages, which caused DeserializationExceptions will be routed to the different DLT

@artembilan
Copy link
Member

@breader124 ,

please, rebase your branch to the latest main.

Then we will start looking into your change to incorporate it into a new version.

Thanks

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also believe that this feature deserves some docs in the \spring-kafka\spring-kafka-docs\src\main\antora\modules\ROOT\pages\retrytopic\ dir.

But yeah, that can be done in the end when we agree with all the changes from the code perspective.

Thanks

@@ -188,13 +196,15 @@ public Properties(Properties sourceProperties, String suffix, Type type) {
* @param shouldRetryOn the exception classifications.
* @param timeout the timeout.
* @param autoStartDltHandler whether or not to start the DLT handler.
* @param usedForExceptions the exceptions which destination is intended for
* @since 2.8
*/
public Properties(long delayMs, String suffix, Type type,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to have one more public ctor.
This way we won't have a breaking change in the point version.

* @return The {@link DestinationTopic} instance corresponding to the DLT.
*/
@Nullable
DestinationTopic getDltFor(String mainListenerId, String topicName);
DestinationTopic getDltFor(String mainListenerId, String topicName, Exception exc);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot we have another method to avoid a breaking change?
It also would be great to have it as a default to avoid a immediate upgrade compilation problem.
We may have the current method deprecated then.

*
* @return the configured suffix extension
*/
String suffix();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is really required and can be just empty by default...

Also: no blank lines in the method Javadocs: https://github.com/spring-projects/spring-framework/wiki/Code-Style#javadoc-formatting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my point of view having the suffix required makes this API harder to misuse by design. exceptions is required, but when someone will be allowed to specify exceptions without the suffix, then it means that the general purpose DLT will be used even for this exc. I'm not sure when it could be an intended usage

* @see ExceptionBasedDltDestination
* @since 3.2.0
*/
public @interface ExceptionBasedDltRouting {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this annotation if @RetryableTopic could just have a property like that below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed, I'll delete it

* @since 3.2.0
*/
public RetryTopicConfigurationBuilder dltRoutingRules(Map<String, Set<Class<? extends Throwable>>> dltRoutingRules) {
this.dltRoutingRules = dltRoutingRules;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not OK. Has to be a copy.
The point is to avoid internal mutation when that external map is edited.
Well, that dltRoutingRules could be final and here you could just use putAll()

@artembilan
Copy link
Member

Still, rebased is required for this PR.

Thanks

@breader124
Copy link
Contributor Author

Thanks for the review. I've just pushed the rebased code and will apply the changes you've suggested in the review over the weekend together with the docs changes suggestions

@breader124 breader124 force-pushed the GH-2800 branch 4 times, most recently from 1777968 to c5dd055 Compare January 25, 2024 17:27
@breader124
Copy link
Contributor Author

@artembilan, @sobychacko, I've just rebased the code on top of the most recent main and pushed it. All the suggestions you had are now introduced and I think that the code is ready for the final round of the review

@@ -96,6 +131,7 @@ public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuff
this.sameIntervalTopicReuseStrategy = sameIntervalTopicReuseStrategy;
this.timeout = timeout;
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
this.dltRoutingRules = dltRoutingRules;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not OK. Has to be a copy of the map.
This way we will avoid the problem when provided map is mutated somewhere outside.

Routing customization consists of the specification of the additional destinations.
Destinations in turn consist of two settings: the `suffix` and `exceptions`.
When the exception type specified in `exceptions` has been thrown, the DLT containing the `suffix` will be considered as the target topic for the message before the general purpose DLT is considered.
Take a look at the examples:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to have an official neutral language in docs where it is depersonalized.
So, no phrases like you, me, take a look etc.

But... That's my personal preferences and I know there are many like that throughout the doc.

Also: this need to me mentioned in the whats-new.adoc

@sobychacko
Copy link
Contributor

@breader124 There was a recent change made via another PR that modified DestinationTopicPropertiesFactory. Could you rebase your PR branch against the latest main?

@breader124
Copy link
Contributor Author

Sure, will do it and push tomorrow the latest

@breader124
Copy link
Contributor Author

@sobychacko, it's now rebased on top of the current main, all previously existing conflicts were solved

@sobychacko sobychacko merged commit c6d1d76 into spring-projects:main Feb 1, 2024
3 checks passed
@sobychacko
Copy link
Contributor

@breader124 Thank you for this excellent PR. It is now merged upstream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants