Acknowledge+commit always issued in AckMode.MANUAL #3695

Closed
@Ermintar

Description

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3.0

Worked fine with spring-kafka 3.1.2

Describe the bug

MessagingMessageListenerAdapter issues an acknowledge after the handler returns successfully, even if the consumer ack mode is set to MANUAL, causing offsets to be committed.

To Reproduce

We have a consumer that uses a PartitionFinder to consume the same messages on multiple pods without a groupId, based on this example: https://stackoverflow.com/questions/64022266/what-is-the-simplest-spring-kafka-kafkalistener-configuration-to-consume-all-re/
We can't set different groupIds for our consumers due to project restrictions, so we start from the latest offset at startup and never commit offsets.

Config files

@Configuration
public class CommonKafkaConfig {

    @Bean
    public KafkaListenerContainerFactory manualListenerContainerFactory(ConsumerFactory consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        //use default factory
        factory.setConsumerFactory(consumerFactory);
        var props = factory.getContainerProperties();
        props.setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public PartitionFinder сommonPartitionFinder(ConsumerFactory<String, Object> consumerFactory) {
        return new PartitionFinder(consumerFactory);
    }
}

Here's our listener; we do not acknowledge the message. (Passing an Acknowledgment object to receiverHandle doesn't change the behaviour.)

@Component
@KafkaListener(topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "${kafka-topics.my-event-topic}",
        partitions = "#{@сommonPartitionFinder.partitions('${kafka-topics.my-event-topic}')}"),
        containerFactory = "manualListenerContainerFactory",
        properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
        idIsGroup = false,
        autoStartup = "true")
public class MyEventListener implements ConsumerSeekAware {


    @KafkaHandler
    public void receiverHandle(@Payload(required = false) Object message,
                               Acknowledgment acknowledgment) {
        try {
            log.info("Hello message {}", message);
        } catch (Exception e) {
            log.error("error when process event", e);
        } finally {
            log.info("Done");
        }
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
        log.info("assign partitions with offset {}", assignments);
        ConsumerSeekAware.super.onPartitionsAssigned(assignments, callback);
    }
}

After upgrading to spring-kafka 3.3.0 we started receiving the following error:

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.InvalidGroupIdException's; no record information is available
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198) ~[spring-kafka-3.3.0.jar:3.3.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1984) ~[spring-kafka-3.3.0.jar:3.3.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1379) ~[spring-kafka-3.3.0.jar:3.3.0]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

The error would be valid if the message were acknowledged and we were trying to commit our offsets.

Brief comparison between 3.3.0 and spring-kafka 3.1.2 showed the following changes

Old:

KafkaMessageListenerContainer.buildCommits returns an empty list, because this.offsets.entrySet() is empty.

No acknowledge is issued anywhere further up the call stack.

New:

KafkaMessageListenerContainer.buildCommits returns a single record, because the offsets were acknowledged.
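
To make the old/new difference concrete, here is a toy model in plain Java. It is only a sketch: ToyContainer, its offsets map and the "partition@offset" string format are hypothetical and are not the spring-kafka implementation. The only thing it assumes is what is described above: pending commits are built from a map that gets filled when a record is acknowledged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, NOT spring-kafka classes.
final class ToyContainer {

    // Acknowledged offsets per topic-partition, waiting to be committed.
    private final Map<String, Long> offsets = new LinkedHashMap<>();

    // Called when a record is acknowledged; keeps the highest offset per partition.
    void acknowledge(String topicPartition, long offset) {
        offsets.merge(topicPartition, offset, Math::max);
    }

    // Builds the pending commits, one per acknowledged partition.
    // Kafka commits the position of the NEXT record to read, hence offset + 1.
    List<String> buildCommits() {
        List<String> commits = new ArrayList<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            commits.add(e.getKey() + "@" + (e.getValue() + 1));
        }
        offsets.clear();
        return commits;
    }

    public static void main(String[] args) {
        // "Old" behaviour: nothing was acknowledged, so there is nothing to commit.
        ToyContainer neverAcked = new ToyContainer();
        System.out.println(neverAcked.buildCommits());

        // "New" behaviour: something acknowledged the record, so a commit is produced.
        ToyContainer acked = new ToyContainer();
        acked.acknowledge("my-event-topic-0", 41L);
        System.out.println(acked.buildCommits());
    }
}
```

With no group.id configured, any non-empty result here corresponds to a commit attempt that fails with InvalidGroupIdException.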

I suspect the root cause is within this block

MessagingMessageListenerAdapter

		completableFutureResult.whenComplete((r, t) -> {
			try {
				if (t == null) {
					asyncSuccess(r, replyTopic, source, messageReturnType);
					acknowledge(acknowledgment);
				}
				else {
					Throwable cause = t instanceof CompletionException ? t.getCause() : t;
					observation.error(cause);
					asyncFailure(request, acknowledgment, consumer, cause, source);
				}
			}
			finally {
				observation.stop();
			}
		});
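
A stripped-down illustration of why a block of this shape would acknowledge without the listener ever touching the Acknowledgment, using only java.util.concurrent. AsyncAckSketch, Ack and onCompletion are hypothetical names for this sketch, not the adapter's API; reply handling, error handling and observation are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in: hypothetical names, not the spring-kafka adapter.
final class AsyncAckSketch {

    // Minimal acknowledgment abstraction for this sketch.
    interface Ack {
        void acknowledge();
    }

    // Same shape as the quoted block: acknowledge whenever the result completes normally.
    static void onCompletion(CompletableFuture<?> result, Ack ack) {
        result.whenComplete((r, t) -> {
            if (t == null) {
                ack.acknowledge();
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger realAcks = new AtomicInteger();
        Ack real = realAcks::incrementAndGet; // counts acknowledge calls
        Ack noOp = () -> { };                  // does nothing, so nothing is ever committed

        // Handler finished normally: the ack fires even though the handler never called it.
        onCompletion(CompletableFuture.completedFuture("ok"), real);
        // Handler failed: no ack.
        onCompletion(CompletableFuture.failedFuture(new IllegalStateException("boom")), real);
        // With a no-op ack the success path has no effect on offsets.
        onCompletion(CompletableFuture.completedFuture("ok"), noOp);

        System.out.println("real acks: " + realAcks.get());
    }
}
```

If the handler result is routed through such a block, the only ways to avoid a commit are to skip the acknowledge call or to pass in an acknowledgment that does nothing.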

Expected behavior

KafkaMessageListenerContainer.buildCommits should return an empty list, with no attempt to commit offsets.

A link to a GitHub repository with a minimal, reproducible, sample.

Thank you for any feedback or workaround

UPD: after browsing the source code, I found an interesting block that makes the acknowledge operation use a no-op acknowledgment (noOpAck):

	@Nullable
	protected Type determineInferredType(@Nullable Method method) { 
			//skipped code here
			
			boolean isAck = parameterIsType(parameterType, Acknowledgment.class);
			this.hasAckParameter |= isAck;
			if (isAck) {
				this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null;
			}
			//skipped here
	}
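
The annotation check in that snippet can be imitated with plain reflection. This is a sketch only: Ack and MarkedNonNull are hypothetical stand-ins for Acknowledgment and @NonNull, and it does not reproduce how the adapter resolves class-level versus method-level listeners.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

final class NoOpAckDetection {

    // Hypothetical stand-in for Acknowledgment.
    interface Ack {
        void acknowledge();
    }

    // Hypothetical stand-in for @NonNull; must be retained at runtime to be visible via reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface MarkedNonNull { }

    static final class Handlers {
        public void plain(Object message, Ack ack) { }
        public void marked(Object message, @MarkedNonNull Ack ack) { }
    }

    // True when the method has an Ack parameter that carries the marker annotation.
    static boolean wantsNoOpAck(Method method) {
        for (Parameter p : method.getParameters()) {
            boolean isAck = Ack.class.isAssignableFrom(p.getType());
            if (isAck && p.getAnnotation(MarkedNonNull.class) != null) {
                return true;
            }
        }
        return false;
    }

    // Looks up one of the two sample handlers by name.
    static Method handler(String name) {
        try {
            return Handlers.class.getDeclaredMethod(name, Object.class, Ack.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(wantsNoOpAck(handler("plain")));
        System.out.println(wantsNoOpAck(handler("marked")));
    }
}
```

Because the check works on a concrete Method, it only yields a result when that code path is given the handler method to inspect.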

Unfortunately, this setting only takes effect when the @KafkaListener annotation is placed on the method and the Acknowledgment parameter is annotated with @NonNull; it does not work when @KafkaListener is placed on the component class. (IMHO, annotations on the class and on the method should behave the same way.)
So, after the workaround, my source code looks like this:

@Component
public class MyEventListener implements ConsumerSeekAware {

    @KafkaListener(topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "${kafka-topics.my-event-topic}",
            partitions = "#{@сommonPartitionFinder.partitions('${kafka-topics.my-event-topic}')}"),
            containerFactory = "manualListenerContainerFactory",
            properties = {"enable.auto.commit:false", "auto.offset.reset:latest"},
            idIsGroup = false,
            autoStartup = "true")
    public void receiverHandle(@Payload(required = false) Object message,
                               @NonNull Acknowledgment acknowledgment) {
        try {
            log.info("Hello message {}", message);
        } catch (Exception e) {
            log.error("error when process event", e);
        } finally {
            log.info("Done");
        }
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
        log.info("assign partitions with offset {}", assignments);
        ConsumerSeekAware.super.onPartitionsAssigned(assignments, callback);
    }
}
