KafkaHeaders.DELIVERY_ATTEMPT is not added for batch listeners #3407

Closed
lm231290 opened this issue Aug 5, 2024 · 9 comments · Fixed by #3539

Comments

@lm231290

lm231290 commented Aug 5, 2024

Version 3.3.2

When using blocking retries with a batch listener, the attempt number is not set in the DELIVERY_ATTEMPT header, even after calling setDeliveryAttemptHeader(true).

The method KafkaMessageListenerContainer.ListenerConsumer.internalHeaders is called only in
ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg), which is used for a record listener. It is not called in
ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg), which is used for a batch listener.
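
To make the reported gap concrete, here is a minimal plain-Java model of the two code paths. It is not Spring Kafka code: `DeliveryAttemptModel`, `Rec`, `interceptRecord` and `interceptBatch` are hypothetical names chosen for illustration, and the header value is assumed to match the string behind `KafkaHeaders.DELIVERY_ATTEMPT`. The only behavior taken from the report is that the single-record path adds the header while the batch path does not.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeliveryAttemptModel {

    // Assumed to match the string value of KafkaHeaders.DELIVERY_ATTEMPT.
    static final String DELIVERY_ATTEMPT = "kafka_deliveryAttempt";

    /** Hypothetical stand-in for a ConsumerRecord: an offset plus mutable headers. */
    static final class Rec {
        final long offset;
        final Map<String, Integer> headers = new HashMap<>();

        Rec(long offset) {
            this.offset = offset;
        }
    }

    /** Models the single-record path: the header is added before the listener runs. */
    static Rec interceptRecord(Rec rec, int attempt) {
        rec.headers.put(DELIVERY_ATTEMPT, attempt);
        return rec;
    }

    /** Models the batch path as reported: records pass through without the header. */
    static List<Rec> interceptBatch(List<Rec> batch, int attempt) {
        return batch; // 'attempt' is deliberately unused: this is the reported gap
    }

    public static void main(String[] args) {
        Rec single = interceptRecord(new Rec(0L), 2);
        List<Rec> batch = interceptBatch(List.of(new Rec(1L), new Rec(2L)), 2);
        System.out.println("record listener sees: " + single.headers.get(DELIVERY_ATTEMPT));
        System.out.println("batch listener sees:  " + batch.get(0).headers.get(DELIVERY_ATTEMPT));
    }
}
```

On the second attempt, the record listener in this model sees `2`, while the batch listener sees `null` because nothing on its path writes the header.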

@lm231290
Author

lm231290 commented Aug 6, 2024

If the whole batch fails, the retry behavior would be pretty much the same as if the first record in the batch fails. Is my assumption correct?
So if the header is added for BatchListenerFailedException, I think it can also be added for any other (whole-batch) error, treating it like a BatchListenerFailedException with index = 0.

@artembilan
Member

I'm not sure what you mean by the BatchListenerFailedException analogy, but I'd be happy to review a contribution to better understand how this request could be addressed.
Thanks

@artembilan
Member

The logic around this header is not as simple as it sounds.
See FailedRecordTracker for details.

@lm231290
Author

It seems my analogy with BatchListenerFailedException was incorrect.

So my suggestion would be to call
KafkaMessageListenerContainer.internalHeaders or
KafkaMessageListenerContainer.checkEarlyIntercept from
KafkaMessageListenerContainer.doInvokeBatchListener and
KafkaMessageListenerContainer.invokeBatchListenerInTx.

Currently KafkaMessageListenerContainer.checkEarlyIntercept is called only from
KafkaMessageListenerContainer.invokeRecordListenerInTx and
KafkaMessageListenerContainer.doInvokeWithRecords
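
One way to picture the suggestion is a hook that runs just before the batch listener is invoked, counts the delivery, and writes the count into every record of the batch. The sketch below is a plain-Java model under stated assumptions, not the container's actual logic: `BatchAttemptSketch`, `Rec`, `beforeBatchListener` and `onSuccess` are hypothetical names, and counting attempts per topic-partition and offset is an assumption made for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchAttemptSketch {

    // Assumed to match the string value of KafkaHeaders.DELIVERY_ATTEMPT.
    static final String DELIVERY_ATTEMPT = "kafka_deliveryAttempt";

    /** Hypothetical stand-in for a ConsumerRecord. */
    static final class Rec {
        final String topicPartition;
        final long offset;
        final Map<String, Integer> headers = new HashMap<>();

        Rec(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }

        String key() {
            return topicPartition + "@" + offset;
        }
    }

    // Deliveries seen so far, keyed by "topicPartition@offset" (an assumption of this sketch).
    private final Map<String, Integer> attempts = new HashMap<>();

    /** Would run where the batch listener is invoked: count the delivery, then add the header. */
    List<Rec> beforeBatchListener(List<Rec> batch) {
        for (Rec rec : batch) {
            int attempt = attempts.merge(rec.key(), 1, Integer::sum);
            rec.headers.put(DELIVERY_ATTEMPT, attempt);
        }
        return batch;
    }

    /** Clears the counters once the listener succeeds, so the map does not grow without bound. */
    void onSuccess(List<Rec> batch) {
        for (Rec rec : batch) {
            attempts.remove(rec.key());
        }
    }

    public static void main(String[] args) {
        BatchAttemptSketch sketch = new BatchAttemptSketch();
        List<Rec> batch = List.of(new Rec("topic-0", 5L), new Rec("topic-0", 6L));
        sketch.beforeBatchListener(batch); // first delivery
        sketch.beforeBatchListener(batch); // redelivery after a simulated failure
        System.out.println("attempt on redelivery: " + batch.get(0).headers.get(DELIVERY_ATTEMPT));
    }
}
```

Keying by record rather than by batch sidesteps the question of what identifies "the same batch" across redeliveries, since a retried poll is not guaranteed to return exactly the same set of records. Whether that is the right trade-off for the real container is an open design question.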

@artembilan
Member

That's correct. But the problem with the rest of the missing API is that we deal with the whole batch.
Therefore, it is not clear how to inject the KafkaHeaders.DELIVERY_ATTEMPT header and handle it accordingly.

If you have anything in mind, feel free to open a pull request and we will gladly review it.

@lm231290
Author

I am not that deep into the code. Are there any contributors who could take care of it? Maybe the issue should be promoted to a feature request?

@artembilan
Member

Sure! Anyone can take an issue for contribution.
Personally, I don't see a quick and easy solution, and with limited time on my side, I am not willing to dive deep into a problem that was not raised before anyway.

@sobychacko
Contributor

@lm231290 We will keep this in the backlog for the time being. If there is an urgent need for anyone in the community, they can design a solution and contribute a PR. If not, we will take a look at this as time permits.

@chickenchickenlove
Contributor

Hi, @artembilan, @sobychacko!
I created a PR to solve this issue.

When you have time, please take a look 🙇‍♂️
