diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBoundElementFactory.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBoundElementFactory.java index d484a5ba7e..7b739dd7ff 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBoundElementFactory.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBoundElementFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,14 +38,14 @@ * @author Marius Bogoevici * @author Soby Chacko */ -class KStreamBoundElementFactory extends AbstractBindingTargetFactory { +public class KStreamBoundElementFactory extends AbstractBindingTargetFactory { private final BindingServiceProperties bindingServiceProperties; private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue; private final EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler; - KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties, + public KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties, KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) { super(KStream.class); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBoundElementFactory.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBoundElementFactory.java index 2dc7a90a7e..a71dafe589 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBoundElementFactory.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBoundElementFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ * * @author Soby Chacko */ -class KTableBoundElementFactory extends AbstractBindingTargetFactory { +public class KTableBoundElementFactory extends AbstractBindingTargetFactory { private final BindingServiceProperties bindingServiceProperties; private final EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler; diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/aot/KafkaStreamsBinderRuntimeHints.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/aot/KafkaStreamsBinderRuntimeHints.java new file mode 100644 index 0000000000..d64877e494 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/aot/KafkaStreamsBinderRuntimeHints.java @@ -0,0 +1,90 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.streams.aot; + +import java.util.stream.Stream; + +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.rocksdb.RocksDBException; +import org.rocksdb.Status; + +import org.springframework.aop.framework.AopProxyUtils; +import org.springframework.aot.hint.MemberCategory; +import org.springframework.aot.hint.ProxyHints; +import org.springframework.aot.hint.ReflectionHints; +import org.springframework.aot.hint.RuntimeHints; +import org.springframework.aot.hint.RuntimeHintsRegistrar; +import org.springframework.aot.hint.TypeReference; +import org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBoundElementFactory; +import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory; +import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory; +import org.springframework.lang.Nullable; + + +/** + * {@link RuntimeHintsRegistrar} for the Kafka Streams binder in Spring Cloud Stream. + * + * @author Soby Chacko + * @since 4.1.0 + */ +public class KafkaStreamsBinderRuntimeHints implements RuntimeHintsRegistrar { + + @Override + public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) { + // The following Kafka Streams specific (3rd party) hints will be removed + // once these are added to https://github.com/oracle/graalvm-reachability-metadata + registerKafkaStreamsReflectionHints(hints); + registerKafkaStreamsJniHints(hints); + hints.resources().registerPattern("*/kafka-streams-version.properties"); + + // Binder specific hints + ProxyHints proxyHints = hints.proxies(); + registerSpringJdkProxy(proxyHints, KStreamBoundElementFactory.KStreamWrapper.class, KStream.class); + registerSpringJdkProxy(proxyHints, KTableBoundElementFactory.KTableWrapper.class, KTable.class); + registerSpringJdkProxy(proxyHints, GlobalKTableBoundElementFactory.GlobalKTableWrapper.class, GlobalKTable.class); + } + + private static void registerKafkaStreamsJniHints(RuntimeHints hints) { + hints.jni().registerType(RocksDBException.class, MemberCategory.DECLARED_FIELDS, MemberCategory.INVOKE_DECLARED_METHODS); + hints.jni().registerType(Status.class, MemberCategory.DECLARED_FIELDS, MemberCategory.INVOKE_DECLARED_METHODS); + + hints.resources().registerPattern("librocksdbjni-*"); + + } + + private static void registerKafkaStreamsReflectionHints(RuntimeHints hints) { + ReflectionHints reflectionHints = hints.reflection(); + + Stream.of( + org.apache.kafka.streams.errors.DefaultProductionExceptionHandler.class, + org.apache.kafka.streams.errors.LogAndFailExceptionHandler.class, + org.apache.kafka.streams.processor.FailOnInvalidTimestamp.class, + org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.class, + org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.class) + .forEach(type -> reflectionHints.registerType(type, + builder -> builder.withMembers(MemberCategory.INVOKE_DECLARED_METHODS))); + + reflectionHints.registerType(TypeReference.of("org.apache.kafka.streams.processor.internals.StateDirectory$StateDirectoryProcessFile"), + MemberCategory.INVOKE_DECLARED_METHODS, MemberCategory.DECLARED_FIELDS, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS, MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS); + } + + private static void registerSpringJdkProxy(ProxyHints proxyHints, Class... proxiedInterfaces) { + proxyHints.registerJdkProxy(AopProxyUtils.completeJdkProxyInterfaces(proxiedInterfaces)); + } +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java index 1abe70ee28..dd52863034 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBindableProxyFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,6 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.cloud.stream.binding.AbstractBindableProxyFactory; @@ -70,8 +69,7 @@ */ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFactory implements InitializingBean, BeanFactoryAware { - @Autowired - private StreamFunctionProperties streamFunctionProperties; + private final StreamFunctionProperties streamFunctionProperties; private final ResolvableType[] types; @@ -81,11 +79,12 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto private BeanFactory beanFactory; - public KafkaStreamsBindableProxyFactory(ResolvableType[] types, String functionName, Method method) { + public KafkaStreamsBindableProxyFactory(ResolvableType[] types, String functionName, Method method, StreamFunctionProperties streamFunctionProperties) { super(types[0].getType().getClass()); this.types = types; this.functionName = functionName; this.method = method; + this.streamFunctionProperties = streamFunctionProperties; } @Override @@ -286,8 +285,5 @@ public Map getOutputHolders() { return outputHolders; } - public void setStreamFunctionProperties(StreamFunctionProperties streamFunctionProperties) { - this.streamFunctionProperties = streamFunctionProperties; - } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java index 50537f21e5..724df628f2 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionBeanPostProcessor.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -50,6 +51,10 @@ import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils; import org.springframework.cloud.stream.function.StreamFunctionProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.ResolvableType; import org.springframework.core.type.StandardClassMetadata; import org.springframework.core.type.StandardMethodMetadata; @@ -61,7 +66,7 @@ * * @since 2.2.0 */ -public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware { +public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware, ApplicationContextAware { private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionBeanPostProcessor.class); @@ -76,6 +81,7 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, private final Map kafkaStreamsOnlyResolvableTypes = new HashMap<>(); private final Map kafakStreamsOnlyMethods = new HashMap<>(); + private ConfigurableApplicationContext applicationContext; public KafkaStreamsFunctionBeanPostProcessor(StreamFunctionProperties streamFunctionProperties) { this.streamFunctionProperties = streamFunctionProperties; @@ -167,13 +173,13 @@ public void afterPropertiesSet() { } private void registerKakaStreamsProxyFactory(BeanDefinitionRegistry registry, String s, ResolvableType[] resolvableTypes, RootBeanDefinition rootBeanDefinition) { - rootBeanDefinition.getConstructorArgumentValues() - .addGenericArgumentValue(resolvableTypes); - rootBeanDefinition.getConstructorArgumentValues() - .addGenericArgumentValue(s); - rootBeanDefinition.getConstructorArgumentValues() - .addGenericArgumentValue(getMethods().get(s)); - registry.registerBeanDefinition("kafkaStreamsBindableProxyFactory-" + s, rootBeanDefinition); + AtomicReference proxyFactory = new AtomicReference<>(); + Method method = getMethods().get(s); + KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory = + new KafkaStreamsBindableProxyFactory(resolvableTypes, s, method, this.streamFunctionProperties); + proxyFactory.set(kafkaStreamsBindableProxyFactory); + ((GenericApplicationContext) this.applicationContext).registerBean("kafkaStreamsBindableProxyFactory-" + s, + KafkaStreamsBindableProxyFactory.class, proxyFactory::get); } private void extractResolvableTypes(String key) { @@ -276,4 +282,9 @@ private boolean isKafkaStreamsTypeFound(Method method) { public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException { this.beanFactory = (ConfigurableListableBeanFactory) beanFactory; } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring/aot.factories b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring/aot.factories new file mode 100644 index 0000000000..dabebbcd59 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/resources/META-INF/spring/aot.factories @@ -0,0 +1 @@ +org.springframework.aot.hint.RuntimeHintsRegistrar=org.springframework.cloud.stream.binder.kafka.streams.aot.KafkaStreamsBinderRuntimeHints diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java index 4abb426143..1a287f0fed 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/DefaultBinderFactory.java @@ -183,7 +183,12 @@ private Binder doGetBinder(String private Binder doGetBinderAOT(String name, Class bindingTargetType) { // If neither name nor default given - return single or fail when > 1 if (!StringUtils.hasText(name) && !StringUtils.hasText(this.defaultBinder)) { - if (this.binderChildContextInitializers.size() == 1) { + boolean kafkaStreamsType = isKafkaStreamsType(bindingTargetType); + if (this.binderChildContextInitializers.size() == 1 || kafkaStreamsType) { + if (kafkaStreamsType) { + String kafkaStreamsBinderSimpleName = getKafkaStreamsBinderSimpleName(bindingTargetType); + return this.getBinderInstance(kafkaStreamsBinderSimpleName); + } String configurationName = this.binderChildContextInitializers.keySet().iterator().next(); this.logger.debug("No specific name or default given - using single available child initializer '" + configurationName + "'"); return this.getBinderInstance(configurationName); @@ -206,6 +211,15 @@ private Binder doGetBinderAOT(Str this.binderChildContextInitializers.keySet()); } + private String getKafkaStreamsBinderSimpleName(Class bindingTargetType) { + return bindingTargetType.getSimpleName().toLowerCase(); + } + + private boolean isKafkaStreamsType(Class bindingTargetType) { + String className = bindingTargetType.getName(); + return className.contains("KStream") || className.contains("KTable"); + } + private Binder doGetBinderConventional(String name, Class bindingTargetType) {