GH-3078: Compute Seek Position from Current Offset
Fixes: #3078

 * Provide a new API method in `ConsumerSeekCallback` to seek to an offset
   based on the current offset. This is accomplished through a user-defined function
   that receives the current offset as its input and returns the offset to seek to.
 * Add tests and docs.

* Address PR review comments
sobychacko authored Mar 5, 2024
1 parent f4c17c8 commit 29f8d18
Showing 5 changed files with 223 additions and 1 deletion.
@@ -34,6 +34,8 @@ The callback has the following methods:
----
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartition> partitions);
@@ -49,6 +51,11 @@ void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
----

The two variants of the `seek` method provide a way to seek to an arbitrary offset.
The variant that takes a `Function` to compute the offset was added in version 3.2 of the framework.
The function receives the current offset (the consumer's current position, which is the next offset to be fetched) and returns the offset to seek to.
This lets you decide where to seek based on the consumer's current position, as shown in the sketch below.
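
The following is a minimal sketch (with a hypothetical topic, partition, and target offset) of a `ConsumerSeekAware` listener using the `Function` variant to rewind to offset 100 only when the consumer is already past it:

[source, java]
----
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    // Hypothetical values: rewind partition 0 of "my-topic" to offset 100,
    // but only if the current position is already beyond that offset.
    callback.seek("my-topic", 0, current -> current > 100L ? 100L : current);
}
----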

`seekRelative` was added in version 2.3, to perform relative seeks.

* `offset` negative and `toCurrent` `false` - seek relative to the end of the partition.
@@ -51,3 +51,9 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor]
=== Change @RetryableTopic SameIntervalTopicReuseStrategy default value
Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`.
See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay].

[[x32-seek-offset-compute-fn]]
=== New API method to seek to an offset based on a user-provided function
`ConsumerSeekCallback` provides a new API to seek to an offset based on a user-defined function, which receives the current offset in the consumer as an argument.
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.common.TopicPartition;

@@ -27,6 +28,7 @@
* seek operation.
*
* @author Gary Russell
* @author Soby Chacko
* @since 1.1
*
*/
@@ -105,6 +107,23 @@ interface ConsumerSeekCallback {
*/
void seek(String topic, int partition, long offset);

/**
* Perform a seek operation based on the given function to compute the offset to seek to.
* The function provides access to the current offset in the consumer, which is the
* current position, i.e., the next offset to be fetched.
* When called from {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)}
* or from {@link ConsumerSeekAware#onIdleContainer(Map, ConsumerSeekCallback)}
* perform the seek immediately on the consumer. When called from elsewhere,
* queue the seek operation to the consumer. The queued seek will occur after any
* pending offset commits. The consumer must be currently assigned the specified
* partition.
* @param topic the topic.
* @param partition the partition.
* @param offsetComputeFunction function to compute the absolute offset to seek to.
* @since 3.2.0
*/
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
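// Illustrative usage, assuming a hypothetical topic and target offset:
//   callback.seek("my-topic", 0, current -> Math.min(current, 100L));
// rewinds to offset 100 when the current position is past it, and otherwise keeps the current position.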

/**
* Perform a seek to beginning operation. When called from
* {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)} or
@@ -3324,6 +3324,12 @@ public void seek(String topic, int partition, long offset) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
}

@Override
public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offsetComputeFunction.apply(
this.consumer.position(new TopicPartition(topic, partition)))));
}

@Override
public void seekToBeginning(String topic, int partition) {
this.seeks.add(new TopicPartitionOffset(topic, partition, SeekPosition.BEGINNING));
@@ -3768,6 +3774,13 @@ public void seek(String topic, int partition, long offset) {
ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offset);
}

@Override
public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition),
offsetComputeFunction.apply(
ListenerConsumer.this.consumer.position(new TopicPartition(topic, partition))));
}

@Override
public void seekToBeginning(String topic, int partition) {
ListenerConsumer.this.consumer.seekToBeginning(
@@ -291,6 +291,106 @@ void testSyncRelativeSeeks() throws InterruptedException {
container.stop();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void seekOffsetFromComputeFnOnInitAssignmentAndIdleContainer() throws InterruptedException {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = mock(Consumer.class);
TestMessageListener3 listener = new TestMessageListener3();
ConsumerRecords empty = new ConsumerRecords<>(Collections.emptyMap());
willAnswer(invocation -> {
Thread.sleep(10);
return empty;
}).given(consumer).poll(any());
TopicPartition tp0 = new TopicPartition("test-topic", 0);
TopicPartition tp1 = new TopicPartition("test-topic", 1);
TopicPartition tp2 = new TopicPartition("test-topic", 2);
TopicPartition tp3 = new TopicPartition("test-topic", 3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
return null;
}).given(consumer).subscribe(any(Collection.class), any());
given(consumer.position(any())).willReturn(30L); // current offset position is always 30
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
ContainerProperties containerProperties = new ContainerProperties("test-topic");
containerProperties.setGroupId("grp");
containerProperties.setMessageListener(listener);
containerProperties.setIdleEventInterval(10L);
containerProperties.setMissingTopicsFatal(false);
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
containerProperties);
container.start();
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(consumer).seek(tp0, 20L);
verify(consumer).seek(tp1, 21L);
verify(consumer).seek(tp2, 22L);
verify(consumer).seek(tp3, 23L);

verify(consumer).seek(tp0, 30L);
verify(consumer).seek(tp1, 30L);
verify(consumer).seek(tp2, 30L);
verify(consumer).seek(tp3, 30L);
container.stop();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void seekOffsetFromComputeFnFromActiveListener() throws InterruptedException {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = mock(Consumer.class);
TestMessageListener4 listener = new TestMessageListener4();
CountDownLatch latch = new CountDownLatch(2);
TopicPartition tp0 = new TopicPartition("test-topic", 0);
TopicPartition tp1 = new TopicPartition("test-topic", 1);
TopicPartition tp2 = new TopicPartition("test-topic", 2);
TopicPartition tp3 = new TopicPartition("test-topic", 3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("test-topic", 0, 0, null, "test-data")));
recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("test-topic", 1, 0, null, "test-data")));
recordMap.put(tp2, Collections.singletonList(new ConsumerRecord("test-topic", 2, 0, null, "test-data")));
recordMap.put(tp3, Collections.singletonList(new ConsumerRecord("test-topic", 3, 0, null, "test-data")));
ConsumerRecords records = new ConsumerRecords<>(recordMap);
willAnswer(invocation -> {
Thread.sleep(10);
if (listener.latch.getCount() <= 0) {
latch.countDown();
}
return records;
}).given(consumer).poll(any());
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
return null;
}).given(consumer).subscribe(any(Collection.class), any());
given(consumer.position(tp0)).willReturn(30L); // current offset 30, target 20 (targets are hard-coded in onMessage)
given(consumer.position(tp1)).willReturn(10L); // current 10, target 21
given(consumer.position(tp2)).willReturn(22L); // current 22, target 22
given(consumer.position(tp3)).willReturn(22L); // current 22, target 23
given(consumer.beginningOffsets(any())).willReturn(assignments.stream()
.collect(Collectors.toMap(tp -> tp, tp -> 0L)));
given(consumer.endOffsets(any())).willReturn(assignments.stream()
.collect(Collectors.toMap(tp -> tp, tp -> 100L)));
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
ContainerProperties containerProperties = new ContainerProperties("test-topic");
containerProperties.setGroupId("grp");
containerProperties.setMessageListener(listener);
containerProperties.setMissingTopicsFatal(false);
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
containerProperties);
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
verify(consumer).seek(tp0, 20L);
verify(consumer).seek(tp1, 10L);
verify(consumer).seek(tp2, 22L);
verify(consumer).seek(tp3, 22L);
container.stop();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
@DisplayName("Seek from activeListener")
@@ -1282,4 +1382,81 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

}

public static class TestMessageListener3 implements MessageListener<String, String>, ConsumerSeekAware {

private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

CountDownLatch latch = new CountDownLatch(2);

@Override
public void onMessage(ConsumerRecord<String, String> data) {

}

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
callbacks.set(callback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (latch.getCount() > 0) {
int absoluteTarget1 = 20;
int absoluteTarget2 = 21;
int absoluteTarget3 = 22;
int absoluteTarget4 = 23;
callback.seek("test-topic", 0, current -> current > absoluteTarget1 ? absoluteTarget1 : current);
callback.seek("test-topic", 1, current -> current > absoluteTarget2 ? absoluteTarget2 : current);
callback.seek("test-topic", 2, current -> current > absoluteTarget3 ? absoluteTarget3 : current);
callback.seek("test-topic", 3, current -> current > absoluteTarget4 ? absoluteTarget4 : current);
}
this.latch.countDown();
}


@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (latch.getCount() > 0) {
int absoluteTarget = 31;
callback.seek("test-topic", 0, current -> current > absoluteTarget ? absoluteTarget : current);
callback.seek("test-topic", 1, current -> current > absoluteTarget ? absoluteTarget : current);
callback.seek("test-topic", 2, current -> current > absoluteTarget ? absoluteTarget : current);
callback.seek("test-topic", 3, current -> current > absoluteTarget ? absoluteTarget : current);
}
this.latch.countDown();
}

}

public static class TestMessageListener4 implements MessageListener<String, String>, ConsumerSeekAware {

private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

CountDownLatch latch = new CountDownLatch(1);

@Override
public void onMessage(ConsumerRecord<String, String> data) {
ConsumerSeekCallback callback = callbacks.get();
if (latch.getCount() > 0) {

int absoluteTarget1 = 20;
int absoluteTarget2 = 21;
int absoluteTarget3 = 22;
int absoluteTarget4 = 23;

callback.seek("test-topic", 0, current -> current > absoluteTarget1 ? absoluteTarget1 : current);
callback.seek("test-topic", 1, current -> current > absoluteTarget2 ? absoluteTarget2 : current);
callback.seek("test-topic", 2, current -> current > absoluteTarget3 ? absoluteTarget3 : current);
callback.seek("test-topic", 3, current -> current > absoluteTarget4 ? absoluteTarget4 : current);
}
this.latch.countDown();
}

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
callbacks.set(callback);
}

}

}
