From 29f8d18c81349bda7ab200ff862fba5c3c4531a6 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 5 Mar 2024 15:06:26 -0500 Subject: [PATCH] GH-3078: Compute Seek Position from Current Offset Fixes: #3078 * Provide a new API method in `ConsumerSeekCallback` to seek to an offset based on the current offset. This is accomplished by a user-defined function where the user can make decision on the offset to seek to based on the current offset which is available via the function's input. * Adding tests, docs. * Addressing PR review --- .../antora/modules/ROOT/pages/kafka/seek.adoc | 7 + .../antora/modules/ROOT/pages/whats-new.adoc | 6 + .../kafka/listener/ConsumerSeekAware.java | 21 ++- .../KafkaMessageListenerContainer.java | 13 ++ ...rentMessageListenerContainerMockTests.java | 177 ++++++++++++++++++ 5 files changed, 223 insertions(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index 243521445d..42653c51d6 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -34,6 +34,8 @@ The callback has the following methods: ---- void seek(String topic, int partition, long offset); +void seek(String topic, int partition, Function offsetComputeFunction); + void seekToBeginning(String topic, int partition); void seekToBeginning(Collection partitions); @@ -49,6 +51,11 @@ void seekToTimestamp(String topic, int partition, long timestamp); void seekToTimestamp(Collection topicPartitions, long timestamp); ---- +The two different variants of the `seek` methods provide a way to seek to an arbitrary offset. +The method that takes a `Function` as an argument to compute the offset was added in version 3.2 of the framework. +This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched). +The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition. + `seekRelative` was added in version 2.3, to perform relative seeks. * `offset` negative and `toCurrent` `false` - seek relative to the end of the partition. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 9eafe650e4..56d6394f70 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -51,3 +51,9 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc === Change @RetryableTopic SameIntervalTopicReuseStrategy default value Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`. See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay]. + +[[x32-seek-offset-compute-fn]] +=== New API method to seek to an offset based on a user provided function +`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument. +See xref:kafka/seek.adoc#seek[Seek API Docs] for more details. + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index 2b48e09778..ae4c33cc6e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.Map; +import java.util.function.Function; import org.apache.kafka.common.TopicPartition; @@ -27,6 +28,7 @@ * seek operation. * * @author Gary Russell + * @author Soby Chacko * @since 1.1 * */ @@ -105,6 +107,23 @@ interface ConsumerSeekCallback { */ void seek(String topic, int partition, long offset); + /** + * Perform a seek operation based on the given function to compute the offset to seek to. + * The function provides the user with access to the current offset in the consumer which + * is the current position, i.e, the next offset to be fetched. + * When called from {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)} + * or from {@link ConsumerSeekAware#onIdleContainer(Map, ConsumerSeekCallback)} + * perform the seek immediately on the consumer. When called from elsewhere, + * queue the seek operation to the consumer. The queued seek will occur after any + * pending offset commits. The consumer must be currently assigned the specified + * partition. + * @param topic the topic. + * @param partition the partition. + * @param offsetComputeFunction function to compute the absolute offset to seek to. + * @since 3.2.0 + */ + void seek(String topic, int partition, Function offsetComputeFunction); + /** * Perform a seek to beginning operation. When called from * {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)} or diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index f6be9d2bb7..b455a1bbaf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3324,6 +3324,12 @@ public void seek(String topic, int partition, long offset) { this.seeks.add(new TopicPartitionOffset(topic, partition, offset)); } + @Override + public void seek(String topic, int partition, Function offsetComputeFunction) { + this.seeks.add(new TopicPartitionOffset(topic, partition, offsetComputeFunction.apply( + this.consumer.position(new TopicPartition(topic, partition))))); + } + @Override public void seekToBeginning(String topic, int partition) { this.seeks.add(new TopicPartitionOffset(topic, partition, SeekPosition.BEGINNING)); @@ -3768,6 +3774,13 @@ public void seek(String topic, int partition, long offset) { ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offset); } + @Override + public void seek(String topic, int partition, Function offsetComputeFunction) { + ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), + offsetComputeFunction.apply( + ListenerConsumer.this.consumer.position(new TopicPartition(topic, partition)))); + } + @Override public void seekToBeginning(String topic, int partition) { ListenerConsumer.this.consumer.seekToBeginning( diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index a48a15f711..7b573bf2aa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -291,6 +291,106 @@ void testSyncRelativeSeeks() throws InterruptedException { container.stop(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void seekOffsetFromComputeFnOnInitAssignmentAndIdleContainer() throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = mock(Consumer.class); + TestMessageListener3 listener = new TestMessageListener3(); + ConsumerRecords empty = new ConsumerRecords<>(Collections.emptyMap()); + willAnswer(invocation -> { + Thread.sleep(10); + return empty; + }).given(consumer).poll(any()); + TopicPartition tp0 = new TopicPartition("test-topic", 0); + TopicPartition tp1 = new TopicPartition("test-topic", 1); + TopicPartition tp2 = new TopicPartition("test-topic", 2); + TopicPartition tp3 = new TopicPartition("test-topic", 3); + List assignments = List.of(tp0, tp1, tp2, tp3); + willAnswer(invocation -> { + ((ConsumerRebalanceListener) invocation.getArgument(1)) + .onPartitionsAssigned(assignments); + return null; + }).given(consumer).subscribe(any(Collection.class), any()); + given(consumer.position(any())).willReturn(30L); // current offset position is always 30 + given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setGroupId("grp"); + containerProperties.setMessageListener(listener); + containerProperties.setIdleEventInterval(10L); + containerProperties.setMissingTopicsFatal(false); + ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory, + containerProperties); + container.start(); + assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(consumer).seek(tp0, 20L); + verify(consumer).seek(tp1, 21L); + verify(consumer).seek(tp2, 22L); + verify(consumer).seek(tp3, 23L); + + verify(consumer).seek(tp0, 30L); + verify(consumer).seek(tp1, 30L); + verify(consumer).seek(tp2, 30L); + verify(consumer).seek(tp3, 30L); + container.stop(); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void seekOffsetFromComputeFnFromActiveListener() throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = mock(Consumer.class); + TestMessageListener4 listener = new TestMessageListener4(); + CountDownLatch latch = new CountDownLatch(2); + TopicPartition tp0 = new TopicPartition("test-topic", 0); + TopicPartition tp1 = new TopicPartition("test-topic", 1); + TopicPartition tp2 = new TopicPartition("test-topic", 2); + TopicPartition tp3 = new TopicPartition("test-topic", 3); + List assignments = List.of(tp0, tp1, tp2, tp3); + Map>> recordMap = new HashMap<>(); + recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("test-topic", 0, 0, null, "test-data"))); + recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("test-topic", 1, 0, null, "test-data"))); + recordMap.put(tp2, Collections.singletonList(new ConsumerRecord("test-topic", 2, 0, null, "test-data"))); + recordMap.put(tp3, Collections.singletonList(new ConsumerRecord("test-topic", 3, 0, null, "test-data"))); + ConsumerRecords records = new ConsumerRecords<>(recordMap); + willAnswer(invocation -> { + Thread.sleep(10); + if (listener.latch.getCount() <= 0) { + latch.countDown(); + } + return records; + }).given(consumer).poll(any()); + willAnswer(invocation -> { + ((ConsumerRebalanceListener) invocation.getArgument(1)) + .onPartitionsAssigned(assignments); + return null; + }).given(consumer).subscribe(any(Collection.class), any()); + given(consumer.position(tp0)).willReturn(30L); // current offset 30, target 20 (see hard-coded in onMessage) + given(consumer.position(tp1)).willReturn(10L); // current 10, target 21 + given(consumer.position(tp2)).willReturn(22L); // current 22, target 22 + given(consumer.position(tp3)).willReturn(22L); // current 22, target 23 + given(consumer.beginningOffsets(any())).willReturn(assignments.stream() + .collect(Collectors.toMap(tp -> tp, tp -> 0L))); + given(consumer.endOffsets(any())).willReturn(assignments.stream() + .collect(Collectors.toMap(tp -> tp, tp -> 100L))); + given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setGroupId("grp"); + containerProperties.setMessageListener(listener); + containerProperties.setMissingTopicsFatal(false); + ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory, + containerProperties); + container.start(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(consumer).seek(tp0, 20L); + verify(consumer).seek(tp1, 10L); + verify(consumer).seek(tp2, 22L); + verify(consumer).seek(tp3, 22L); + container.stop(); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test @DisplayName("Seek from activeListener") @@ -1282,4 +1382,81 @@ public void onIdleContainer(Map assignments, ConsumerSeekC } + public static class TestMessageListener3 implements MessageListener, ConsumerSeekAware { + + private static final ThreadLocal callbacks = new ThreadLocal<>(); + + CountDownLatch latch = new CountDownLatch(2); + + @Override + public void onMessage(ConsumerRecord data) { + + } + + @Override + public void registerSeekCallback(ConsumerSeekCallback callback) { + callbacks.set(callback); + } + + @Override + public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { + if (latch.getCount() > 0) { + int absoluteTarget1 = 20; + int absoluteTarget2 = 21; + int absoluteTarget3 = 22; + int absoluteTarget4 = 23; + callback.seek("test-topic", 0, current -> current > absoluteTarget1 ? absoluteTarget1 : current); + callback.seek("test-topic", 1, current -> current > absoluteTarget2 ? absoluteTarget2 : current); + callback.seek("test-topic", 2, current -> current > absoluteTarget3 ? absoluteTarget3 : current); + callback.seek("test-topic", 3, current -> current > absoluteTarget4 ? absoluteTarget4 : current); + } + this.latch.countDown(); + } + + + @Override + public void onIdleContainer(Map assignments, ConsumerSeekCallback callback) { + if (latch.getCount() > 0) { + int absoluteTarget = 31; + callback.seek("test-topic", 0, current -> current > absoluteTarget ? absoluteTarget : current); + callback.seek("test-topic", 1, current -> current > absoluteTarget ? absoluteTarget : current); + callback.seek("test-topic", 2, current -> current > absoluteTarget ? absoluteTarget : current); + callback.seek("test-topic", 3, current -> current > absoluteTarget ? absoluteTarget : current); + } + this.latch.countDown(); + } + + } + + public static class TestMessageListener4 implements MessageListener, ConsumerSeekAware { + + private static final ThreadLocal callbacks = new ThreadLocal<>(); + + CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onMessage(ConsumerRecord data) { + ConsumerSeekCallback callback = callbacks.get(); + if (latch.getCount() > 0) { + + int absoluteTarget1 = 20; + int absoluteTarget2 = 21; + int absoluteTarget3 = 22; + int absoluteTarget4 = 23; + + callback.seek("test-topic", 0, current -> current > absoluteTarget1 ? absoluteTarget1 : current); + callback.seek("test-topic", 1, current -> current > absoluteTarget2 ? absoluteTarget2 : current); + callback.seek("test-topic", 2, current -> current > absoluteTarget3 ? absoluteTarget3 : current); + callback.seek("test-topic", 3, current -> current > absoluteTarget4 ? absoluteTarget4 : current); + } + this.latch.countDown(); + } + + @Override + public void registerSeekCallback(ConsumerSeekCallback callback) { + callbacks.set(callback); + } + + } + }