From 550e6b11a73eae31e5e3b15791a388bd5acc4b35 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 17 Jan 2024 11:11:43 -0500 Subject: [PATCH] GH-2981: Suppress abortTransaction exception Fixes: #2981 When `producer.abortTransaction()` fails, the original exception is lost in `KafkaTemplate`. * Catch an exception on `producer.abortTransaction()` and `ex.addSuppressed(abortException)` **Cherry-pick to `3.0.x`** --- .../kafka/core/KafkaTemplate.java | 11 +++++--- .../core/KafkaTemplateTransactionTests.java | 27 ++++++++++++++++++- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 219f5c71e0..9fe9e75574 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -657,9 +657,14 @@ public T executeInTransaction(OperationsCallback callback) { catch (SkipAbortException e) { // NOSONAR - exception flow control throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace } - catch (Exception e) { - producer.abortTransaction(); - throw e; + catch (Exception ex) { + try { + producer.abortTransaction(); + } + catch (Exception abortException) { + ex.addSuppressed(abortException); + } + throw ex; } finally { this.producers.remove(currentThread); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index b28fae476d..7155145826 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -506,6 +506,31 @@ public void testAbort() { verify(producer, never()).commitTransaction(); } + @Test + public void abortFiledOriginalExceptionRethrown() { + MockProducer producer = spy(new MockProducer<>()); + producer.initTransactions(); + producer.abortTransactionException = new RuntimeException("abort failed"); + + ProducerFactory pf = new MockProducerFactory<>((tx, id) -> producer, null); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> + template.executeInTransaction(t -> { + throw new RuntimeException("intentional"); + })) + .withMessage("intentional") + .withStackTraceContaining("abort failed"); + + assertThat(producer.transactionCommitted()).isFalse(); + assertThat(producer.transactionAborted()).isFalse(); + assertThat(producer.closed()).isTrue(); + verify(producer, never()).commitTransaction(); + } + @Test public void testExecuteInTransactionNewInnerTx() { @SuppressWarnings("unchecked")