Skip to content

Commit

Permalink
Fixes #3178
Browse files Browse the repository at this point in the history
* add adoc whats-new.adoc or listener-annotation.adoc.
* align `@PartitionOffset` to `TopicPartitionOffset`.
* add unit test for `@PartitionOffset.SeekPosition`.
* add unit for SpEL partitions to Integer[] and Integer.
  • Loading branch information
Wzy19930507 committed Apr 10, 2024
1 parent df8b7a1 commit e6f5304
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,25 @@ public void listen(ConsumerRecord<?, ?> record) {

The initial offset will be applied to all 6 partitions.

Since 3.2, `@PartitionOffset` support `SeekPosition.END`, `SeekPosition.BEGINNING`, `SeekPosition.TIMESTAMP`, `seekPosition` match `SeekPosition` enum name:

[source, java]
----
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
----

If seekPosition set `END` or `BEGINNING` will ignore `initialOffset` and `relativeToCurrent`.
If seekPosition set `TIMESTAMP`, `initialOffset` means timestamp.

[[manual-acknowledgment]]
== Manual Acknowledgment

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ Provides new class `EndpointHandlerMultiMethod` to handler multi method for retr
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.

[[x32-annotation-partition-offset-seek-position]]
=== @PartitionOffset support SeekPosition
`@PartitionOffset` support `TopicPartitionOffset.SeekPosition`.
`@PartitionOffset` add new properties `seekPosition`.
See xref:kafka/receiving-messages/listener-annotation.adoc#manual-assignment[manual-assignment] for more details.

[[x32-topic-partition-offset-constructor]]
=== New constructor in TopicPartitionOffset that accepts a function to compute the offset to seek to
`TopicPartitionOffset` has a new constructor that takes a user-provided function to compute the offset to seek to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
Expand Down Expand Up @@ -827,10 +828,8 @@ private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String
private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
List<TopicPartitionOffset> result = new ArrayList<>();
if (topicPartitions.length > 0) {
for (TopicPartition topicPartition : topicPartitions) {
result.addAll(resolveTopicPartitionsList(topicPartition));
}
for (TopicPartition topicPartition : topicPartitions) {
result.addAll(resolveTopicPartitionsList(topicPartition));
}
return result.toArray(new TopicPartitionOffset[0]);
}
Expand Down Expand Up @@ -877,7 +876,7 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
() -> "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
List<TopicPartitionOffset> result = new ArrayList<>();
for (String partition : partitions) {
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
}
if (partitionOffsets.length == 1 && resolveExpression(partitionOffsets[0].partition()).equals("*")) {
result.forEach(tpo -> {
Expand All @@ -890,7 +889,8 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
resolvePartitionAsInteger((String) topic, resolveExpression(partitionOffset.partition()), result,
resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true);
resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true,
resolveExpression(partitionOffset.seekPosition()));
}
}
Assert.isTrue(!result.isEmpty(), () -> "At least one partition required for " + topic);
Expand All @@ -899,11 +899,11 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top

private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
Long initialOffset;
long initialOffset;
if (initialOffsetValue instanceof String str) {
Assert.state(StringUtils.hasText(str),
() -> "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
initialOffset = Long.valueOf(str);
initialOffset = Long.parseLong(str);
}
else if (initialOffsetValue instanceof Long lng) {
initialOffset = lng;
Expand Down Expand Up @@ -954,20 +954,33 @@ else if (resolvedValue instanceof Iterable) {
}
}

private void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result) {
resolvePartitionAsInteger(topic, resolvedValue, result, null, false, false, null);
}

@SuppressWarnings(UNCHECKED)
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
List<TopicPartitionOffset> result, @Nullable Long offset, boolean isRelative, boolean checkDups) {
private void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result,
@Nullable Long offset, boolean isRelative, boolean checkDups, @Nullable Object seekPosition) {

if (resolvedValue instanceof String[] strArr) {
for (Object object : strArr) {
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups, seekPosition);
}
return;
}
else if (resolvedValue instanceof String str) {
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups, seekPosition);
}
return;
}

TopicPartitionOffset.SeekPosition tpoSp = resloveTopicPartitionOffsetSeekPosition(seekPosition);
if (resolvedValue instanceof String str) {
Assert.state(StringUtils.hasText(str),
() -> "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
List<TopicPartitionOffset> collected = parsePartitions(str)
.map(part -> new TopicPartitionOffset(topic, part, offset, isRelative))
.map(part -> createTopicPartitionOffset(topic, part, offset, isRelative, tpoSp))
.toList();
if (checkDups) {
collected.forEach(tpo -> {
Expand All @@ -980,23 +993,46 @@ else if (resolvedValue instanceof String str) {
}
else if (resolvedValue instanceof Integer[] intArr) {
for (Integer partition : intArr) {
result.add(new TopicPartitionOffset(topic, partition));
result.add(createTopicPartitionOffset(topic, partition, offset, isRelative, tpoSp));
}
}
else if (resolvedValue instanceof Integer intgr) {
result.add(new TopicPartitionOffset(topic, intgr));
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
}
result.add(createTopicPartitionOffset(topic, intgr, offset, isRelative, tpoSp));
}
else {
throw new IllegalArgumentException(String.format(
"@KafKaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
}
}

@Nullable
private TopicPartitionOffset.SeekPosition resloveTopicPartitionOffsetSeekPosition(@Nullable Object seekPosition) {
TopicPartitionOffset.SeekPosition resloveTpoSp = null;
if (seekPosition instanceof String string) {
if (SeekPosition.BEGINNING.name().equals(string.toUpperCase())) {
resloveTpoSp = SeekPosition.BEGINNING;
}
else if (SeekPosition.END.name().equals(string.toUpperCase())) {
resloveTpoSp = SeekPosition.END;
}
else if (SeekPosition.TIMESTAMP.name().equals(string.toUpperCase())) {
resloveTpoSp = SeekPosition.TIMESTAMP;
}
}
return resloveTpoSp;
}

private TopicPartitionOffset createTopicPartitionOffset(String topic, int partition, @Nullable Long offset,
boolean isRelative, @Nullable SeekPosition seekPosition) {

if (seekPosition != null) {
return new TopicPartitionOffset(topic, partition, offset, seekPosition);
}
else {
return new TopicPartitionOffset(topic, partition, offset, isRelative);
}
}

private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String str) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,11 +20,14 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;

/**
* Used to add partition/initial offset information to a {@code KafkaListener}.
*
* @author Artem Bilan
* @author Gary Russell
* @author Wang Zhiyang
*/
@Target({})
@Retention(RetentionPolicy.RUNTIME)
Expand Down Expand Up @@ -60,4 +63,16 @@
*/
String relativeToCurrent() default "false";

/**
* Providers seek position strategy, by default, seek by offset.
* Upper case {@link SeekPosition} seek position enum name to match "special" seeks.
* If seekPosition set 'BEGINNING' or 'END', ignore {@code relativeToCurrent}
* and {@code initialOffset}.
* If seekPosition set 'TIMESTAMP', initialOffset means time stamp, ignore {@code relativeToCurrent}.
* @return special seeks.
* @since 3.2
* @see SeekPosition
*/
String seekPosition() default "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -3103,6 +3103,7 @@ private void initPartitionsIfNeeded() {
.filter(e -> SeekPosition.TIMESTAMP.equals(e.getValue().seekPosition))
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().offset));
if (!times.isEmpty()) {
times.forEach((key, value) -> partitions.remove(key));
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(times);
offsetsForTimes.forEach((tp, off) -> {
if (off == null) {
Expand All @@ -3117,7 +3118,7 @@ private void initPartitionsIfNeeded() {
if (this.consumerSeekAwareListener != null) {
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
this.seekCallback);
}
}
Expand Down
Loading

0 comments on commit e6f5304

Please sign in to comment.