Skip to content

Commit

Permalink
GH-2823: Messages routed to the wrong consumer
Browse files Browse the repository at this point in the history
 - When event-type routing is enabled in Kafka Streams binder
   and conurrency > 1 is used, messages are occasionally getting
   dispatched to the wrong consumer causing CCE. This is due
   to a race condition caused by a shared resource across threads.
   Fixing the issue by introducing a ThreadLocal variable.

Resolves #2823
  • Loading branch information
sobychacko committed Sep 29, 2023
1 parent db5b717 commit 5ea391b
Showing 1 changed file with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -107,6 +106,8 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application

protected ConfigurableApplicationContext applicationContext;

private static final ThreadLocal<Boolean> matchedRecordThreadLocal = ThreadLocal.withInitial(() -> false);

public AbstractKafkaStreamsBinderProcessor(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
Expand Down Expand Up @@ -446,13 +447,18 @@ protected Serde<?> getValueSerde(String inboundName, KafkaStreamsConsumerPropert
//Check to see if event type based routing is enabled.
//See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matchedRecordThreadLocal, topicObject, headersObject));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
final KStream<?, ?>[] branch = stream.branch((key, value) -> {
if (matchedRecordThreadLocal.get()) {
matchedRecordThreadLocal.set(false);
return true;
}
return false;
});
// Deserialize if we have a branch from above.
final KStream<?, Object> deserializedKStream = !kafkaStreamsConsumerProperties.isUseConfiguredSerdeWhenRoutingEvents() ?
branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
Expand Down Expand Up @@ -579,17 +585,22 @@ private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(
: streamsBuilder.table(bindingDestination,
consumed);
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();

final KStream<?, ?> stream = kTable.toStream();

// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matchedRecordThreadLocal, topicObject, headersObject));
// Branching based on event type match.
final Map<String, ? extends KStream<?, ?>> stringKStreamMap = stream.split()
.branch((key, value) -> matched.getAndSet(false))
.branch((key, value) -> {
if (matchedRecordThreadLocal.get()) {
matchedRecordThreadLocal.set(false);
return true;
}
return false;
})
.noDefaultBranch();
final KStream<?, ?>[] branch = stringKStreamMap.values().toArray(new KStream[0]);
// Deserialize if we have a branch from above.
Expand Down Expand Up @@ -621,7 +632,7 @@ private <K, V> Consumed<K, V> getConsumed(KafkaStreamsConsumerProperties kafkaSt
}

private <K, V> Processor<K, V, Void, Void> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
AtomicBoolean matched, AtomicReference<String> topicObject, AtomicReference<Headers> headersObject) {
ThreadLocal<Boolean> matchedValHolder, AtomicReference<String> topicObject, AtomicReference<Headers> headersObject) {
return new Processor<>() {

org.apache.kafka.streams.processor.api.ProcessorContext<?, ?> context;
Expand All @@ -647,7 +658,7 @@ public void process(Record<K, V> record) {
final String[] eventTypesFromBinding = StringUtils.commaDelimitedListToStringArray(kafkaStreamsConsumerProperties.getEventTypes());
for (String eventTypeFromBinding : eventTypesFromBinding) {
if (eventTypeFromHeader.equals(eventTypeFromBinding)) {
matched.set(true);
matchedValHolder.set(true);
break;
}
}
Expand Down

0 comments on commit 5ea391b

Please sign in to comment.