From 5d881b2adb732b16e50c4dbe83876b2a18a83af2 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 23 Sep 2024 16:47:36 +0200 Subject: [PATCH] GH-2994, GH-2986 Add documentation explaining batch type conversion --- .../producing-and-consuming-messages.adoc | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/docs/modules/ROOT/pages/spring-cloud-stream/producing-and-consuming-messages.adoc b/docs/modules/ROOT/pages/spring-cloud-stream/producing-and-consuming-messages.adoc index 584c16f81..d7eab0934 100644 --- a/docs/modules/ROOT/pages/spring-cloud-stream/producing-and-consuming-messages.adoc +++ b/docs/modules/ROOT/pages/spring-cloud-stream/producing-and-consuming-messages.adoc @@ -795,6 +795,53 @@ public Function, Person> findFirstPerson() { } ---- +==== Batch Type Conversion +Similarly to type conversion for a single message consumer, batching requires every message in the batch to be converted to the requested type. For instance, in the previous example, this type is `Person`. + +It is also important to understand that the headers for each message in a batch are provided separately in the `MessageHeaders` of the Message representing the entire batch. These Messages and their corresponding batched headers are created by the respective binders, and their structure may differ. Therefore, you should refer to the binder documentation to understand the structure of batched headers. For Kafka and Rabbit, you can search for `amqp_batchedHeaders` and `kafka_batchConvertedHeaders`, respectively. + +In short, if you have a message representing a batch with 5 payloads, the same message will contain a set of headers where each header corresponds to a payload with the same index. + +However, what happens if a particular payload fails to convert? In a single message scenario, we simply return null and invoke your method with the unconverted message, which either results in an exception or allows you to handle the raw message, depending on your function's signature. + +In the case of batch processing, things are a bit more complex. Returning null for an unconverted payload effectively reduces the batch size. For example, if the original batch contained 5 messages, and 2 failed to convert, the converted batch will only contain 3 messages. This might be acceptable, but what about the corresponding batched headers? There will still be 5 headers, as they were created when the binder formed the initial batch. This discrepancy makes it difficult to correlate the headers with their corresponding payloads. + +To address this issue, we provide the MessageConverterHelper interface. + +[source, java] +---- +public interface MessageConverterHelper { + + /** + * This method will be called by the framework in cases when a message failed to convert. + * It allows you to signal to the framework if such failure should be considered fatal or not. + * + * @param message failed message + * @return true if conversion failure must be considered fatal. + */ + default boolean shouldFailIfCantConvert(Message message) { + return false; + } + + /** + * This method will be called by the framework in cases when a single message within batch of messages failed to convert. + * It provides a place for providing post-processing logic before message converter returns. + * + * @param message failed message. + * @param index index of failed message within the batch + */ + default void postProcessBatchMessageOnFailure(Message message, int index) { + } +} +---- + +If implemented, this interface is called by the framework’s message converter logic to perform post-processing on the batched message when a particular payload cannot be converted. + +The default implementations for Kafka and Rabbit automatically remove the corresponding batch headers to maintain the correlation between batched payloads and their headers. However, you can provide your own implementation and register it as a bean if you need to add custom behavior for such cases. + +Additionally, the interface offers a method that allows for more deterministic handling of conversion failures. By default, this method returns `false`, but you can customize the implementation if you prefer to fail the entire process when a conversion error occurs. + + [[batch-producers]] === Batch Producers