Consumer committed offsets not tracking LSO with FixTxOffsets and rollbacked transactions #2590
Comments
There is nothing we can do about this; the consumer has no knowledge that there is a rolled-back record in the log. The container can only "fix" the offset if the consumer reports the position properly and, obviously, in this case, it does not. As I have stated many times in many different forums, this bogus lag problem needs to be fixed in Kafka itself. There are other corner cases where … See https://issues.apache.org/jira/browse/KAFKA-10683. The Kafka folks seem to have no interest in fixing it, though; I suggest you comment there and vote it up.
Thanks for your answer. It seems to me that, in the case of this issue, the position is correctly reported by the Kafka consumer. When I patch Spring-Kafka to always commit the consumer position even when no records are returned by …
Hacky patch for v2.8.11:
```diff
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index a787e0f5..480ff2cc 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2022 the original author or authors.
+ * Copyright 2016-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1363,13 +1363,13 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
                 resumePartitionsIfNecessary();
             }

+            savePositionsIfNeeded();
             invokeIfHaveRecords(records);
         }

         private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
             if (records != null && records.count() > 0) {
                 this.receivedSome = true;
-                savePositionsIfNeeded(records);
                 notIdle();
                 notIdlePartitions(records.partitions());
                 invokeListener(records);
@@ -1441,29 +1441,31 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
             }
         }

-        private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
+        private void savePositionsIfNeeded() {
             if (this.fixTxOffsets) {
                 this.savedPositions.clear();
-                records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
+                this.consumer.assignment().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
             }
         }

         @SuppressWarnings("rawtypes")
         private void fixTxOffsetsIfNeeded() {
             if (this.fixTxOffsets) {
+
                 try {
                     Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
-                    this.lastCommits.forEach((tp, oamd) -> {
+                    this.savedPositions.forEach((tp, saved) -> {
                         long position = this.consumer.position(tp);
-                        Long saved = this.savedPositions.get(tp);
+                        OffsetAndMetadata comitted = this.lastCommits.get(tp);
                         if (saved != null && saved.longValue() != position) {
                             this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
                                     + "saved: " + this.savedPositions + ", "
-                                    + "comitted: " + oamd + ", "
+                                    + "comitted: " + comitted + ", "
                                     + "current: " + tp + "@" + position);
                             return;
                         }
-                        if (position > oamd.offset()) {
+
+                        if (comitted == null || position > comitted.offset()) {
                             toFix.put(tp, createOffsetAndMetadata(position));
                         }
                     });
```
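The difference between the two code paths in the patch can be illustrated with a small, broker-free model. This is a sketch under stated assumptions, not the container's actual code: partitions are plain strings, offsets are plain `long`s, the class and method names (`TxOffsetFixSim`, `poll`, `fixTxOffsets`, `replay`) are hypothetical, and the consumer is assumed to be already past each transaction's control marker when a poll completes. It replays the case from the bug description: a committed transaction (record at offset 0, marker at 1) followed by a rolled-back one (record at 2, marker at 3).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, broker-free model of the FixTxOffsets bookkeeping discussed in this issue.
// Partitions are strings, offsets are longs; none of this is the Spring for Apache Kafka API.
public class TxOffsetFixSim {

    final Map<String, Long> positions = new HashMap<>();      // stands in for consumer.position(tp)
    final Map<String, Long> lastCommits = new HashMap<>();    // offsets committed so far
    final Map<String, Long> savedPositions = new HashMap<>(); // positions recorded after poll()
    final boolean patched;

    TxOffsetFixSim(boolean patched) {
        this.patched = patched;
    }

    // One poll(): the position of tp moves to newPosition; hadRecords says whether records came back.
    void poll(String tp, long newPosition, boolean hadRecords) {
        positions.put(tp, newPosition);
        if (patched) {
            // Patched: save the positions of all assigned partitions after every poll.
            savedPositions.clear();
            savedPositions.putAll(positions);
        } else if (hadRecords) {
            // Original: save positions only for partitions that returned records.
            savedPositions.clear();
            savedPositions.put(tp, newPosition);
        }
    }

    // Correction step: commit the current position if it moved past the last commit
    // and no seek appears to have happened since the positions were saved.
    void fixTxOffsets() {
        Map<String, Long> toFix = new HashMap<>();
        // Original iterates already-committed partitions; patched iterates saved positions.
        Map<String, Long> source = patched ? savedPositions : lastCommits;
        for (String tp : source.keySet()) {
            long position = positions.get(tp);
            Long saved = savedPositions.get(tp);
            if (saved != null && saved.longValue() != position) {
                continue; // treated as a seek, so the correction is skipped
            }
            Long committed = lastCommits.get(tp);
            if (committed == null || position > committed) {
                toFix.put(tp, position);
            }
        }
        lastCommits.putAll(toFix);
    }

    // Committed transaction (record 0, marker 1), then an aborted one (record 2, marker 3).
    static Long replay(boolean patched) {
        TxOffsetFixSim sim = new TxOffsetFixSim(patched);
        sim.poll("topic-0", 2, true);       // record 0 delivered; position assumed past marker 1
        sim.lastCommits.put("topic-0", 1L); // listener commits last record offset + 1
        sim.fixTxOffsets();                 // 2 > 1, so 2 is committed
        sim.poll("topic-0", 4, false);      // aborted record filtered out, nothing delivered
        sim.fixTxOffsets();
        return sim.lastCommits.get("topic-0");
    }

    public static void main(String[] args) {
        System.out.println("original: " + replay(false)); // prints "original: 2" although the LSO is 4
        System.out.println("patched:  " + replay(true));  // prints "patched:  4"
    }
}
```

In the original path the second poll returns no records, so the saved position stays at 2 and the advance to 4 is mistaken for a seek; the patched path refreshes the saved positions after every poll, so the correction goes through.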
Interesting; I didn't test it; I just assumed this was another case where the position is incorrectly reported (as commented on that JIRA).
Note that 2.8.x is no longer supported as OSS.
Hi there, I work for a company that contributes to the open-source ecosystem, and I would like to help resolve this issue (#2590). However, despite the explanations provided in the issue, I am unable to reproduce the bug. @bgK, would it be possible to provide a code snippet that does not work (and your configuration, if you think it may interact with the bug) so that I can start investigating the issue and hopefully find a solution? Thanks!
Hi @thperchi, this is a minimal reproduction case for the issue: spring-kafka-2590-repro.zip.
Hi @bgK!
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.8.11
Describe the bug
With a `MessageListenerContainer` configured with `FixTxOffsets` set to `true`, the committed offsets do not always match the Last Stable Offsets (LSO) once all the available messages have been read. `MessageListenerContainer` commits the position of the consumer only when `poll()` returns records for a given partition. If a transaction is then rolled back on that partition and no other records are available, `MessageListenerContainer` does not commit the offsets consumed by the rolled-back transaction, resulting in observable "lag".
To Reproduce
Create and start a `MessageListenerContainer` with `FixTxOffsets` set to `true`, subscribed to a test topic with a single partition. Create a transactional producer, send a message, wait for the ack, and roll back the transaction. The committed offset will be 0; the last stable offset will be 2.
Expected behavior
When all the messages have been consumed by a `MessageListenerContainer` configured with `FixTxOffsets` set to `true`, the committed offsets should track the LSO, even when some transactions are rolled back.
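The numbers in the reproduction steps can be checked with a similar broker-free model of the partition log. It assumes a single partition, no open transaction, and a fully replicated log (so the LSO equals the log end offset); `AbortedTxLag` and its members are hypothetical names. The aborted record sits at offset 0 and its abort marker at offset 1, so the LSO is 2 while a `read_committed` consumer has nothing left to deliver:

```java
import java.util.List;

// Hypothetical, broker-free model of the log left by the reproduction steps:
// one record sent in a transaction that was then aborted.
public class AbortedTxLag {

    enum Kind { COMMITTED_DATA, ABORTED_DATA, CONTROL_MARKER }

    // List index == offset: 0 is the aborted record, 1 is the abort marker.
    static final List<Kind> LOG = List.of(Kind.ABORTED_DATA, Kind.CONTROL_MARKER);

    // Records a read_committed consumer would deliver from fromOffset on:
    // aborted data and control markers are skipped, only committed data counts.
    static long consumableFrom(long fromOffset) {
        return LOG.stream().skip(fromOffset).filter(k -> k == Kind.COMMITTED_DATA).count();
    }

    // Lag measured against the LSO (tools may use the log end offset; here they are equal).
    static long lag(long lso, long committed) {
        return lso - committed;
    }

    public static void main(String[] args) {
        long lso = LOG.size(); // assumes no open transaction and a fully replicated log
        long committed = 0;    // the committed offset observed in the reproduction
        System.out.println("lag = " + lag(lso, committed));              // prints "lag = 2"
        System.out.println("consumable = " + consumableFrom(committed)); // prints "consumable = 0"
    }
}
```

A lag of 2 with zero consumable records is exactly the bogus lag described above: committing the position (2) would bring it to zero.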