Skip to content

Commit

Permalink
Fix bug in ConsumerSeekAware.seekRelative
Browse files Browse the repository at this point in the history
Seek relative to the beginning of the partition.

(cherry picked from commit 02c6383)
  • Loading branch information
Wzy19930507 authored and spring-builds committed Feb 20, 2024
1 parent 6cc3eee commit a1f4e26
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3090,36 +3090,36 @@ private void processSeeks() {
traceSeek(offset);
try {
SeekPosition position = offset.getPosition();
TopicPartition topicPartition = offset.getTopicPartition();
Long whereTo = offset.getOffset();
if (position == null) {
if (offset.isRelativeToCurrent()) {
whereTo += this.consumer.position(offset.getTopicPartition());
whereTo += this.consumer.position(topicPartition);
whereTo = Math.max(whereTo, 0);
}
this.consumer.seek(offset.getTopicPartition(), whereTo);
this.consumer.seek(topicPartition, whereTo);
}
else if (position.equals(SeekPosition.BEGINNING)) {
this.consumer.seekToBeginning(Collections.singletonList(offset.getTopicPartition()));
if (whereTo != null) {
this.consumer.seek(offset.getTopicPartition(), whereTo);
}
}
else if (position.equals(SeekPosition.TIMESTAMP)) {
else if (SeekPosition.TIMESTAMP.equals(position)) {
// possible late addition since the grouped processing above
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
.offsetsForTimes(
Collections.singletonMap(offset.getTopicPartition(), offset.getOffset()));
Collections.singletonMap(topicPartition, offset.getOffset()));
offsetsForTimes.forEach((tp, ot) -> {
if (ot != null) {
this.consumer.seek(tp, ot.offset());
}
});
}
else {
this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition()));
if (SeekPosition.BEGINNING.equals(position)) {
this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
}
else {
this.consumer.seekToEnd(Collections.singletonList(topicPartition));
}
if (whereTo != null) {
whereTo += this.consumer.position(offset.getTopicPartition());
this.consumer.seek(offset.getTopicPartition(), whereTo);
whereTo += this.consumer.position(topicPartition);
this.consumer.seek(topicPartition, whereTo);
}
}
}
Expand Down Expand Up @@ -3353,7 +3353,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
@Override
public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
if (toCurrent) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, toCurrent));
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, true));
}
else if (offset >= 0) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offset, SeekPosition.BEGINNING));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,6 +78,8 @@

/**
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.2.4
*
*/
Expand Down Expand Up @@ -343,7 +345,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
verify(consumer).seekToEnd(Collections.singletonList(tp2));
verify(consumer).seek(tp2, 70L); // position - 30 (seekToEnd ignored by mock)
verify(consumer).seekToBeginning(Collections.singletonList(tp3));
verify(consumer).seek(tp3, 30L);
verify(consumer).seek(tp3, 130L); // position + 30 (seekToBeginning ignored by mock)
container.stop();
}

Expand Down

0 comments on commit a1f4e26

Please sign in to comment.