ack-mode = MANUAL_IMMEDIATE, enable.auto.commit = false and @RetryableTopic not working as expected #2495
-
Hi all! I am using spring-kafka 2.9.0 and currently testing it with @embeddedkafka. @Test
public void testFailuresDontBlockListener() throws Exception {
final Request badRequest = randomRequest(); // create random request
processor.failOn(badRequest); // tell mock processor to always fail on this request
final ListenableFuture<SendResult<String, GetRequestResponseDto>> future = kafkaTemplate.send(new ProducerRecord<>(properties.processRequestTopic(), badRequest)); // send bad request to kafka
future.get(); // wait until it is acknowledged
final Request okRequest = randomRequest(); // create random request (processor will not fail while processing it)
kafkaTemplate.send(new ProducerRecord<>(properties.processRequestTopic(), okRequest)); // send it
Thread.sleep(1000); // wait until things gets done
verify(port, times(1)).sendResponse(any()); // verify that badRequest did not block okRequest from processing
} This test case actually passes, but I can't understand why. @RetryableTopic(
attempts = "100",
backoff = @Backoff(
delayExpression = "20",
multiplierExpression = "2"
),
dltStrategy = DltStrategy.NO_DLT,
kafkaTemplate = "myKafkaTemplate"
)
@KafkaListener(
topics = "myTopic",
groupId = "myGroupId",
concurrency = "1",
containerFactory = "myContainerFactory"
)
public void receive(final ConsumerRecord<?, Request> rec, final Acknowledgment acknowledgment) {
logger.debug("Message received {}", rec);
final Request value = rec.value();
// mocked processor which will fail for bad request
// and acknowledgement will not be called
final Response response = processor.process(value);
final ListenableFuture<SendResult<String, Response>> future = kafkaTemplate.send(
new ProducerRecord<>(
properties.sendResponseTopic(),
response
)
);
future.addCallback(
new ListenableFutureCallback<SendResult<String, Response>>() {
@Override
public void onFailure(final @NotNull Throwable ex) {
logger.error("Exception while sending {} to sendResponseTopic", rec, ex);
}
@Override
public void onSuccess(final SendResult<String, Response> result) {
acknowledgment.acknowledge();
logger.debug("Successfully processed {}", rec);
}
}
);
}
So as you see, when listener receives the first Also I should note, that if I comment out @RetryableTopic -- everything works as expected and listener does not process Am I missing something here? Does it mean, that ack-mode does not matter when we use @RetryableTopic? |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 4 replies
-
That's the part that doesn't make any sense to me. Receiving okRequest after badRequest is exactly what I would expect. Kafka maintains 2 pointers, the current position and the committed offset. When the poll returns badrequest, the position is updated to 1 (assuming Whether or not the offset was committed, the next record returned will be the one at offset 1 (assuming When you comment out the retrable topic, the default error handler will kick in, which will re-seek the position to 0 and replay the bad record (10 times by default). |
Beta Was this translation helpful? Give feedback.
That's the part that doesn't make any sense to me.
Receiving okRequest after badRequest is exactly what I would expect.
Kafka maintains 2 pointers, the current position and the committed offset.
When the poll returns badrequest, the position is updated to 1 (assuming
max.poll.records=1
.Whether or not the offset was committed, the next record returned will be the one at offset 1 (assuming
max.poll.records=1
- otherwise the okRequest would likely have been returned by the first poll, and the position would have been updated to 2, the container simply passing …