Skip to content

Commit

Permalink
GH-2862: Add Option to Log Recovery to DLPR (#2869)
Browse files Browse the repository at this point in the history
* GH-2862: Add Option to Log Recovery to DLPR

Resolves #2862

* add option logRecoveryRecord in `annotation-error-handling.adoc`

Fix Javadoc for CommonErrorHandler

Delete unused code in SerializationUtils

* Fix since.

---------

Co-authored-by: Zhiyang.Wang1 <[email protected]>
Co-authored-by: Gary Russell <[email protected]>
  • Loading branch information
3 people authored Oct 31, 2023
1 parent 6166da3 commit 9794c58
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ Starting with version 2.7, the recoverer checks that the partition selected by t
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
You can disable this check by setting the `verifyPartition` property to `false`.

Starting with version 3.1, setting the `logRecoveryRecord` property to `true` will log the recovery record and exception.

[[dlpr-headers]]
== Managing Dead Letter Record Headers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ default boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record
}

/**
* Handle the exception for a record listener when {@link #remainingRecords()} returns
* Handle the exception for a record listener when {@link #seeksAfterHandling()} returns
* true. The failed record and all the remaining records from the poll are passed in.
* Usually used when the error handler performs seeks so that the remaining records
* will be redelivered on the next poll.
* @param thrownException the exception.
* @param records the remaining records including the one that failed.
* @param consumer the consumer.
* @param container the container.
* @see #remainingRecords()
* @see #seeksAfterHandling()
*/
default void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private boolean skipSameTopicFatalExceptions = true;

private boolean logRecoveryRecord = false;

private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders;

private Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
Expand Down Expand Up @@ -400,6 +402,15 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions
this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions;
}

/**
* Set to true if you want to log recovery record and exception.
* @param logRecoveryRecord true to log record and exception.
* @since 3.1
*/
public void setLogRecoveryRecord(boolean logRecoveryRecord) {
this.logRecoveryRecord = logRecoveryRecord;
}

/**
* Set a {@link ExceptionHeadersCreator} implementation to completely take over
* setting the exception headers in the output record. Disables all headers that are
Expand Down Expand Up @@ -503,6 +514,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
+ " and the destination resolver routed back to the same topic");
return;
}
if (this.logRecoveryRecord) {
this.logger.info(exception, () -> "Recovery record " + KafkaUtils.format(record));
}
if (consumer != null && this.verifyPartition) {
tp = checkPartition(tp, consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public DeadLetterPublishingRecoverer create(String mainListenerId) {
recoverer.setAppendOriginalHeaders(false);
recoverer.setThrowIfNoDestinationReturned(false);
recoverer.setSkipSameTopicFatalExceptions(false);
recoverer.setLogRecoveryRecord(false);
this.recovererCustomizer.accept(recoverer);
this.fatalExceptions.forEach(recoverer::addNotRetryableExceptions);
this.nonFatalExceptions.forEach(recoverer::removeClassification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
return null;
}
if (header != null) {
byte[] value = header.value();
DeserializationException exception = byteArrayToDeserializationException(logger, header);
if (exception != null) {
Headers headers = new RecordHeaders(record.headers().toArray());
Expand Down

0 comments on commit 9794c58

Please sign in to comment.