Unexpected behavior consuming messages after rebalance from async+manual consumer #2575
-
This is a PoC for an unexpected behavior we are seeing in spring-kafka 3.0.2 Consumer is configured with Our assumption comes from this part in the docs "The consumer will be paused (no new records delivered) until all the offsets for the previous poll have been committed.", but is this assumption correct? The (unexpected) behaviorWe are seeing that when a poll batch has been processed, but not been fully acknowledge and there is a rebalance event for some reason the instance re-consumes everything and more (beyond the 500 batch size) This is our setup for the PoCtests:
Test 1
Test 2
CodeYou can find the full code here https://github.com/AminArria/consumer-poc What I imagine are the important parts ConsumerConfig.java @Bean(LISTENER_CONTAINER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> ListenerContainerFactory(ConsumerFactory<byte[], byte[]> consumerFactory) {
final ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setAsyncAcks(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.TRACE);
return factory;
}
@Bean
public ConsumerFactory<byte[], byte[]> consumerFactory() {
final Map<String, Object> configs = kafkaProperties.buildConsumerProperties();
configs.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
configs.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
configs.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, List.of(RoundRobinAssignor.class));
configs.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("localhost:19092"));
return new DefaultKafkaConsumerFactory<>(configs);
} KafkaListenerService.java @KafkaListener(id = "listener-destination",
topics = "destination",
autoStartup = "true",
containerFactory = LISTENER_CONTAINER_FACTORY)
public void onDestination(final ConsumerRecord<byte[], byte[]> record,
final Acknowledgment acknowledgment) {
accept(record, acknowledgment);
}
private void accept(final ConsumerRecord<byte[], byte[]> record,
final Acknowledgment acknowledgment) {
if (record.offset() > 0 && record.offset() % 10 == 0) {
// no ack
} else {
System.out.println("ACK " + record.offset());
acknowledgment.acknowledge();
}
} |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 9 replies
-
The reported behavior is exactly as I would have expected; why would you expect something different? |
Beta Was this translation helpful? Give feedback.
The reported behavior is exactly as I would have expected; why would you expect something different?