-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Polish KafkaMessageListenerContainer #3090
Polish KafkaMessageListenerContainer #3090
Conversation
…ontainerProperties.ackTime` have init value. * change `ListenerConsumer.offsets` type to Map<TopicPartition, Long>. * replace `ListenerConsumer.setupSubBatchPerPartition` to `ContainerProperties.isSubBatchPerPartition`. * add method `commitOffsets` and `commitOffsetsInTransactions` * remove properties this.commitRecovered. * refactor method `processCommits`.
* minor modify `ackCurrent`
@Wzy19930507 There are test failures in the PR: https://github.com/spring-projects/spring-kafka/actions/runs/8115263868/job/22182761295?pr=3090. Could you take a look? |
Introduced a unit test bug from #2287, sorry for it. When |
AckMode ackMode = this.containerProperties.getAckMode(); | ||
if (!this.isManualImmediateAck) { | ||
if (!this.isManualAck) { | ||
updatePendingOffsets(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did this code (updatePendingOffsets
) move to? Do we not need this any longer? I see that you removed the method, but where is this logic now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updatePendingOffsets()
feel like duplicate in processAck()
Lines 2024 to 2034 in 5beb8fe
if (this.isManualImmediateAck) { | |
try { | |
ackImmediate(cRecord); | |
} | |
catch (@SuppressWarnings(UNUSED) WakeupException e) { | |
// ignore - not polling | |
} | |
} | |
else { | |
addOffset(cRecord); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But processAck()
is not called from the hierarchy of processCommits
where updatePendingOffsets()
was originally called. I see that there are some duplicates, but still wondering whether it changes the logic a little bit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Wzy19930507 Any updates on this? This is the only pending item that needs an explanation before merging the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for late.
I see that there are some duplicates, but still wondering whether it changes the logic a little bit?
handleAcks
has three scenarios and include updatePendingOffsets
logic:
!Thread.currentThread().equals(this.consumerThread)
--> keepisManualImmediateAck
thread safe.isManualImmediateAck
--> callackImmediate
- not
isManualImmediateAck
--> calladdOffset
updatePendingOffsets
logic same as scenarios three and see:
if (!this.isManualImmediateAck) {
if (!this.isManualAck) {
updatePendingOffsets();
}
private void updatePendingOffsets() {
ConsumerRecord<K, V> cRecord = this.acks.poll();
while (cRecord != null) {
addOffset(cRecord);
cRecord = this.acks.poll();
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarification. One last question on this. In the processCommits()
method, we have this logic:
if (!this.isManualImmediateAck) {
if (!this.isManualAck) {
updatePendingOffsets();
}
So updatePendingOffsets()
is only called when we have both !manualImmediateAck
and !manualAck
. Whereas in the processAck
, we have this:
else {
if (this.isManualImmediateAck) {
try {
ackImmediate(cRecord);
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ignore - not polling
}
}
else {
addOffset(cRecord);
}
}
which means that we always call addOffset
as long as !imanualImmediateAck
including when ackManual
is true. The way I understand this is that, we were always calling addOffset
if the ack mode was not manual-immediate via the first call in handleAcks
. But then, after this is done, in the processCommits
method, we have the updatePendingOffset
call, which checks for !manualAck
in addition to !manualImmediateAck
. But by now, the addOffset is already called via the handleAcks
method. So, you are just cleaning up this second unnecessary call. Is that understanding correct? I want to ensure we are not introducing any regression by removing this logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, you are just cleaning up this second unnecessary call. Is that understanding correct?
Yes! Exactly what I was want to say and more clear!
checkAckMode
becauseContainerProperties.ackCount
andContainerProperties.ackTime
have init value.ListenerConsumer.offsets
type to Map<TopicPartition, Long>.ListenerConsumer.setupSubBatchPerPartition
toContainerProperties.isSubBatchPerPartition
.ListenerConsumer.commitOffsets
andListenerConsumer.commitOffsetsInTransactions
ListenerConsumer.this.commitRecovered
.ListenerConsumer.processCommits
.ListenerConsumer.ackCurrent
.