Fix for issue #2590 #2685
@thperchi Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions.
Thanks; but we can't look at your PR until you "sign" the CLA. Do you think it would be possible to create a test that exhibits the behavior without the patch and passes with it?
@thperchi Thank you for signing the Contributor License Agreement!
Hi!
Thank you for your effort, but we don't accept changes without tests in the project code that confirm them.
Your change doesn't look minor enough to be obviously correct. Plus, I see there is a failing test in the GitHub Actions build for this PR:
TransactionalContainerTests > testRollbackRecord() FAILED
org.opentest4j.AssertionFailedError:
expected: null
but was: OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at app//org.springframework.kafka.listener.TransactionalContainerTests.testRollbackRecord(TransactionalContainerTests.java:568)
Somehow it feels like this failure is related to your change, since both involve transactions.
Thanks for understanding!
To run the tests locally, use |
Thank you for the review, I'm going to look into how to solve this!
long position = this.consumer.position(tp);
Long saved = this.savedPositions.get(tp);
OffsetAndMetadata comitted = this.lastCommits.get(tp);
this.lastCommits only contains offsets for TopicPartitions for which records were obtained during the previous poll() call. this.lastCommits is cleared between polls.
This means:
polls returning no records will always trigger a commit for the complete assignment.
polls returning records for some partitions only will trigger commits for the other partitions even when the LSO did not change since the last commit.
Should the fix try to avoid unnecessary commits or is it good enough as is?
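To make the two consequences above concrete, here is a minimal sketch that uses plain Java maps instead of the Kafka client types. The class and method names are hypothetical, and the commit condition (commit when lastCommits has no entry for the partition, or the entry differs from the current position) is an assumption about the patch, not its actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PerPollCommitSketch {

    /**
     * Hypothetical stand-in for the check under review. lastCommits is assumed
     * to hold only the offsets committed for partitions that returned records
     * in the previous poll, and to be cleared between polls.
     */
    public static Map<String, Long> partitionsToCommit(Map<String, Long> positions,
            Map<String, Long> lastCommits) {
        Map<String, Long> toCommit = new LinkedHashMap<>();
        positions.forEach((tp, position) -> {
            Long committed = lastCommits.get(tp);
            // Assumed condition: a missing entry is treated like a changed position.
            if (committed == null || !committed.equals(position)) {
                toCommit.put(tp, position);
            }
        });
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("topic-0", 10L);
        positions.put("topic-1", 20L);

        // Empty poll: lastCommits is empty, so the whole assignment is committed.
        System.out.println(partitionsToCommit(positions, Map.of()));

        // Records for topic-0 only: topic-1 is committed even though nothing moved.
        System.out.println(partitionsToCommit(positions, Map.of("topic-0", 10L)));
    }
}
```

Under these assumptions, every partition missing from lastCommits is committed again on each poll, which is the redundancy the question is about.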
@bgK I guess there is a situation where a producer might keep rolling back transactions and the lag will increase until a successful transaction appears in the log.
I must admit that I am not comfortable doing a commit for each empty poll but if the above is true, then there is probably no choice.
I really wish the kafka folks would fix the underlying problem of reporting this bogus lag.
> I must admit that I am not comfortable doing a commit for each empty poll but if the above is true, then there is probably no choice.
Me neither. What I was hinting at with this comment is whether Spring-Kafka should remember the last committed offsets across polls to avoid redundant commits.
Right, but as I described, I believe there are cases where the lag can still increase unless we do.
I believe the proper fix is for Kafka to not report a lag if it's a bogus lag due to uncommitted, or rolled-back, records.
> Right, but as I described, I believe there are cases where the lag can still increase unless we do.
I agree Spring-Kafka should commit the offsets when the consumer position has changed after empty polls, to account for rolled-back transactions. What's unclear to me is what committing the same offsets multiple times would accomplish when the position does not change after empty polls. Do you have any insight?
> I believe the proper fix is for Kafka to not report a lag if it's a bogus lag due to uncommitted, or rolled-back, records.
As far as I know, at this point, Kafka does not provide any other mechanism to report the consumer lag accurately when using transactional producers. Shouldn't Spring-Kafka make sure the situations where Kafka reports the lag properly work when using the Spring abstractions?
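The alternative raised in this thread, remembering the last committed offsets across polls and committing only when the position has moved, could look roughly like the sketch below. It uses plain Java maps rather than the Kafka client types, and CommittedOffsetTracker and offsetsNeedingCommit are hypothetical names, not part of Spring-Kafka.

```java
import java.util.HashMap;
import java.util.Map;

public class CommittedOffsetTracker {

    // Survives across polls, unlike a per-poll map that is cleared each time.
    private final Map<String, Long> lastCommitted = new HashMap<>();

    /**
     * Returns only the positions that differ from what was last committed and
     * records them as committed. An empty result means no commit is needed.
     */
    public Map<String, Long> offsetsNeedingCommit(Map<String, Long> positions) {
        Map<String, Long> changed = new HashMap<>();
        positions.forEach((tp, position) -> {
            Long previous = this.lastCommitted.get(tp);
            if (previous == null || !previous.equals(position)) {
                changed.put(tp, position);
            }
        });
        this.lastCommitted.putAll(changed);
        return changed;
    }

    public static void main(String[] args) {
        CommittedOffsetTracker tracker = new CommittedOffsetTracker();
        // First empty poll: position is unknown to the tracker, so commit.
        System.out.println(tracker.offsetsNeedingCommit(Map.of("topic-0", 5L)));
        // Second empty poll, same position: nothing to commit.
        System.out.println(tracker.offsetsNeedingCommit(Map.of("topic-0", 5L)));
        // Position advanced past a rolled-back transaction: commit again.
        System.out.println(tracker.offsetsNeedingCommit(Map.of("topic-0", 7L)));
    }
}
```

A real implementation would presumably also need to drop entries for revoked partitions on rebalance and record an offset only after the commit actually succeeds. Both are left out of this sketch.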
> Shouldn't Spring-Kafka make sure the situations where Kafka reports the lag properly
My point is that the so-called "lag" is not really a lag, and shouldn't be reported as such.
Hi @garyrussell |
efa1472 to 2b4b570
Yes; that is the issue that I raised.
Dear @garyrussell, I hope this finds you well. I wanted to update you on the known bug we discussed on the Kafka developers' mailing list. After interacting with the community, waiting for additional input, and reviewing the code, it appears that this is a known bug without a feasible solution due to the underlying handling logic. The issue is similar to what happens with other processes, such as log compaction. Besides, the community is currently exploring a proposal to refactor the KafkaConsumer. I was wondering if you have a preferred channel where we can discuss this further. I would greatly appreciate your insights and expertise in finding ways to improve the code and document workarounds for this issue. Please let me know your thoughts and whether there is a suitable platform or forum where we can continue this discussion. I look forward to collaborating with you to overcome this challenge. Thank you for your attention to this matter. Best regards
@alograg I am not sure I can add any value to such a discussion, but feel free to use the Discussions tab above.
Hello,
I hope this message finds you well. I am submitting a pull request to the spring-kafka project to fix issue #2590.
Here are the details of the pull request:
Repository: https://github.com/INTM-Group/spring-kafka
Issue: Consumer committed offsets not tracking LSO with FixTxOffsets and rollbacked transactions #2590
The change resolves the issue and adheres to the existing codebase's style and best practices.
I kindly request you to review and consider merging this pull request into the main branch. I am open to any feedback or suggestions you may have, and I am committed to addressing them promptly.
Thank you for your time and consideration. I appreciate the opportunity to contribute to the spring-kafka project.
Thibault Perchicot