diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index f18559968a..05a598f4b9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3090,25 +3090,20 @@ private void processSeeks() { traceSeek(offset); try { SeekPosition position = offset.getPosition(); + TopicPartition topicPartition = offset.getTopicPartition(); Long whereTo = offset.getOffset(); if (position == null) { if (offset.isRelativeToCurrent()) { - whereTo += this.consumer.position(offset.getTopicPartition()); + whereTo += this.consumer.position(topicPartition); whereTo = Math.max(whereTo, 0); } - this.consumer.seek(offset.getTopicPartition(), whereTo); + this.consumer.seek(topicPartition, whereTo); } - else if (position.equals(SeekPosition.BEGINNING)) { - this.consumer.seekToBeginning(Collections.singletonList(offset.getTopicPartition())); - if (whereTo != null) { - this.consumer.seek(offset.getTopicPartition(), whereTo); - } - } - else if (position.equals(SeekPosition.TIMESTAMP)) { + else if (SeekPosition.TIMESTAMP.equals(position)) { // possible late addition since the grouped processing above Map offsetsForTimes = this.consumer .offsetsForTimes( - Collections.singletonMap(offset.getTopicPartition(), offset.getOffset())); + Collections.singletonMap(topicPartition, offset.getOffset())); offsetsForTimes.forEach((tp, ot) -> { if (ot != null) { this.consumer.seek(tp, ot.offset()); @@ -3116,10 +3111,15 @@ else if (position.equals(SeekPosition.TIMESTAMP)) { }); } else { - this.consumer.seekToEnd(Collections.singletonList(offset.getTopicPartition())); + if (SeekPosition.BEGINNING.equals(position)) { + this.consumer.seekToBeginning(Collections.singletonList(topicPartition)); + } + else { + this.consumer.seekToEnd(Collections.singletonList(topicPartition)); + } if (whereTo != null) { - whereTo += this.consumer.position(offset.getTopicPartition()); - this.consumer.seek(offset.getTopicPartition(), whereTo); + whereTo += this.consumer.position(topicPartition); + this.consumer.seek(topicPartition, whereTo); } } } @@ -3353,7 +3353,7 @@ public void seekToEnd(Collection partitions) { @Override public void seekRelative(String topic, int partition, long offset, boolean toCurrent) { if (toCurrent) { - this.seeks.add(new TopicPartitionOffset(topic, partition, offset, toCurrent)); + this.seeks.add(new TopicPartitionOffset(topic, partition, offset, true)); } else if (offset >= 0) { this.seeks.add(new TopicPartitionOffset(topic, partition, offset, SeekPosition.BEGINNING)); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 2b82126b81..aabb8c4c35 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,8 @@ /** * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.2.4 * */ @@ -343,7 +345,7 @@ void testAsyncRelativeSeeks() throws InterruptedException { verify(consumer).seekToEnd(Collections.singletonList(tp2)); verify(consumer).seek(tp2, 70L); // position - 30 (seekToEnd ignored by mock) verify(consumer).seekToBeginning(Collections.singletonList(tp3)); - verify(consumer).seek(tp3, 30L); + verify(consumer).seek(tp3, 130L); // position + 30 (seekToBeginning ignored by mock) container.stop(); }