Skip to content

Commit

Permalink
GH-2994, GH-2986 Add documentation explaining batch type conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
olegz committed Sep 23, 2024
1 parent 03dc940 commit 5d881b2
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,53 @@ public Function<List<Person>, Person> findFirstPerson() {
}
----

==== Batch Type Conversion
Similarly to type conversion for a single message consumer, batching requires every message in the batch to be converted to the requested type. For instance, in the previous example, this type is `Person`.

It is also important to understand that the headers for each message in a batch are provided separately in the `MessageHeaders` of the Message representing the entire batch. These Messages and their corresponding batched headers are created by the respective binders, and their structure may differ. Therefore, you should refer to the binder documentation to understand the structure of batched headers. For Kafka and Rabbit, you can search for `amqp_batchedHeaders` and `kafka_batchConvertedHeaders`, respectively.

In short, if you have a message representing a batch with 5 payloads, the same message will contain a set of headers where each header corresponds to a payload with the same index.

However, what happens if a particular payload fails to convert? In a single message scenario, we simply return null and invoke your method with the unconverted message, which either results in an exception or allows you to handle the raw message, depending on your function's signature.

In the case of batch processing, things are a bit more complex. Returning null for an unconverted payload effectively reduces the batch size. For example, if the original batch contained 5 messages, and 2 failed to convert, the converted batch will only contain 3 messages. This might be acceptable, but what about the corresponding batched headers? There will still be 5 headers, as they were created when the binder formed the initial batch. This discrepancy makes it difficult to correlate the headers with their corresponding payloads.

To address this issue, we provide the MessageConverterHelper interface.

[source, java]
----
public interface MessageConverterHelper {
/**
* This method will be called by the framework in cases when a message failed to convert.
* It allows you to signal to the framework if such failure should be considered fatal or not.
*
* @param message failed message
* @return true if conversion failure must be considered fatal.
*/
default boolean shouldFailIfCantConvert(Message<?> message) {
return false;
}
/**
* This method will be called by the framework in cases when a single message within batch of messages failed to convert.
* It provides a place for providing post-processing logic before message converter returns.
*
* @param message failed message.
* @param index index of failed message within the batch
*/
default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
}
}
----

If implemented, this interface is called by the framework’s message converter logic to perform post-processing on the batched message when a particular payload cannot be converted.

The default implementations for Kafka and Rabbit automatically remove the corresponding batch headers to maintain the correlation between batched payloads and their headers. However, you can provide your own implementation and register it as a bean if you need to add custom behavior for such cases.

Additionally, the interface offers a method that allows for more deterministic handling of conversion failures. By default, this method returns `false`, but you can customize the implementation if you prefer to fail the entire process when a conversion error occurs.


[[batch-producers]]
=== Batch Producers

Expand Down

0 comments on commit 5d881b2

Please sign in to comment.