Skip to content

Commit

Permalink
Docs changes for the DltAwareProcessor hierarchy
Browse files Browse the repository at this point in the history
  • Loading branch information
sobychacko committed Oct 17, 2023
1 parent a74ea9f commit 7f89964
Showing 1 changed file with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ However, you still can configure production exception handlers using the `Stream

When it comes to handling errors from application code, i.e. from the business logic execution, it is usually up to the application to handle that.
Because, the Kafka Streams binder does not have a way to interfere with the application code.
However, to make things a bit easier for the application, the binder provides a convenient `DltAwareProcessor`, using which, you can dictate how you want to handle the application level errors.
However, to make things a bit easier for the application, the binder provides a convenient `RecordRecoverableProcessor`, using which, you can dictate how you want to handle the application level errors.

Consider the following code.

Expand All @@ -150,8 +150,10 @@ public java.util.function.Function<KStream<String, String>, KStream<String, Stri
```

If the business code inside your `map` call above throws an exception, it is your responsibility to handle that error.
This is where `DltAwareProcessor` becomes handy.
This is where `RecordRecoverableProcessor` becomes handy.
By default, `RecordRecoverableProcessor`, will simply log the error and let the application move on.
Let's say that you want to publish the failed record to a DLT, rather than handling it within the application.
In that case, you must use a custom implementation of `RecordRecoverableProcessor` called `DltAwareProcessor`.
Here is how you can do that.

```
Expand All @@ -166,29 +168,30 @@ public java.util.function.Function<KStream<String, String>, KStream<String, Stri

The business logic code from the original `map` call now has been moved as part of `KStream#process` method call, which takes a `ProcessorSupplier`.
We, then, pass in the custom `DltAwareProcessor,` which is capable to publishing to a DLT.
The constructor for `DltAwareProcessor` above takes three parameters - a `Function` that takes the input record and then the business logic operation as part of the `Function` body, the DLT topic, and finally a `DltPublishingContext`. When the `Function`'s lambda expression throws an exception, the `DltAwareProcessor` will send the input record to a DLT. The `DltPublishingContext` provides `DltAwareProcessor` the necessary publishing infrastructure beans.
The constructor for `DltAwareProcessor` above takes three parameters - a `Function` that takes the input record and then the business logic operation as part of the `Function` body, the DLT topic, and finally a `DltPublishingContext`.
When the `Function`'s lambda expression throws an exception, the `DltAwareProcessor` will send the input record to a DLT.
The `DltPublishingContext` provides `DltAwareProcessor` the necessary publishing infrastructure beans.
The `DltPublishingContext` is autoconfigured by the binder, so that you can inject directly this into the application.

If you do not want the binder to publish failed records to a DLT, then you can provide your own recoverer as a `BiConsumer` that takes the input `Record` and the exception as arguments.
If you do not want the binder to publish failed records to a DLT, then you must use the `RecordRecoverableProcessor` directly instead of the `DltAwareProcessor`.
You can provide your own recoverer as a `BiConsumer` that takes the input `Record` and the exception as arguments.
Assume a scenario, in which you do not want to send the record to the DLT, but simply log the message and move on.
It is convenient, if we can override the default recovery mechanism provided by the `DltAwareProcessor`.

Here is an example.
Below an example of how you can accomplish that.

```
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// log the message
// Handle the record
}));
}
```

In this case, when the record fails, the `DltAwareProcessor`, instead of using its built-in recoverer which publishes to a DLT, uses the user provided recoverer which is a `BiConsumer` that takes the failed record and the exception thrown as arguments.
In this case, when the record fails, the `RecordRecoverableProcessor`, uses the user provided recoverer which is a `BiConsumer` that takes the failed record and the exception thrown as arguments.

=== Handling Record Keys in DltAwareProcessor

Expand Down

0 comments on commit 7f89964

Please sign in to comment.