Skip to content

Commit

Permalink
AOT/Native related changes in Kafka Streams binder
Browse files Browse the repository at this point in the history
 - Add KafkaStreams related native hints (Note: Once we add the
   Kafka Streams specific 3rd party hints to the reachability
   metadata repository, we will remove those hints from the binder)
 - Related changes in DefaultBinderFactory
 - KafkaStreamsBindableProxyFactory changes triggered by AOT/native

Resolves #2323

Addressing PR review comments
Resolves #2832
  • Loading branch information
sobychacko authored and olegz committed Oct 20, 2023
1 parent 6af3206 commit e8431e4
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,14 +38,14 @@
* @author Marius Bogoevici
* @author Soby Chacko
*/
class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KStream> {
public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KStream> {

private final BindingServiceProperties bindingServiceProperties;

private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler;

KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties,
public KStreamBoundElementFactory(BindingServiceProperties bindingServiceProperties,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler) {
super(KStream.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,7 @@
*
* @author Soby Chacko
*/
class KTableBoundElementFactory extends AbstractBindingTargetFactory<KTable> {
public class KTableBoundElementFactory extends AbstractBindingTargetFactory<KTable> {

private final BindingServiceProperties bindingServiceProperties;
private final EncodingDecodingBindAdviceHandler encodingDecodingBindAdviceHandler;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.streams.aot;

import java.util.stream.Stream;

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status;

import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aot.hint.MemberCategory;
import org.springframework.aot.hint.ProxyHints;
import org.springframework.aot.hint.ReflectionHints;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.aot.hint.TypeReference;
import org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory;
import org.springframework.lang.Nullable;


/**
* {@link RuntimeHintsRegistrar} for the Kafka Streams binder in Spring Cloud Stream.
*
* @author Soby Chacko
* @since 4.1.0
*/
public class KafkaStreamsBinderRuntimeHints implements RuntimeHintsRegistrar {

@Override
public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
// The following Kafka Streams specific (3rd party) hints will be removed
// once these are added to https://github.com/oracle/graalvm-reachability-metadata
registerKafkaStreamsReflectionHints(hints);
registerKafkaStreamsJniHints(hints);
hints.resources().registerPattern("*/kafka-streams-version.properties");

// Binder specific hints
ProxyHints proxyHints = hints.proxies();
registerSpringJdkProxy(proxyHints, KStreamBoundElementFactory.KStreamWrapper.class, KStream.class);
registerSpringJdkProxy(proxyHints, KTableBoundElementFactory.KTableWrapper.class, KTable.class);
registerSpringJdkProxy(proxyHints, GlobalKTableBoundElementFactory.GlobalKTableWrapper.class, GlobalKTable.class);
}

private static void registerKafkaStreamsJniHints(RuntimeHints hints) {
hints.jni().registerType(RocksDBException.class, MemberCategory.DECLARED_FIELDS, MemberCategory.INVOKE_DECLARED_METHODS);
hints.jni().registerType(Status.class, MemberCategory.DECLARED_FIELDS, MemberCategory.INVOKE_DECLARED_METHODS);

hints.resources().registerPattern("librocksdbjni-*");

}

private static void registerKafkaStreamsReflectionHints(RuntimeHints hints) {
ReflectionHints reflectionHints = hints.reflection();

Stream.of(
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler.class,
org.apache.kafka.streams.errors.LogAndFailExceptionHandler.class,
org.apache.kafka.streams.processor.FailOnInvalidTimestamp.class,
org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.class,
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.class)
.forEach(type -> reflectionHints.registerType(type,
builder -> builder.withMembers(MemberCategory.INVOKE_DECLARED_METHODS)));

reflectionHints.registerType(TypeReference.of("org.apache.kafka.streams.processor.internals.StateDirectory$StateDirectoryProcessFile"),
MemberCategory.INVOKE_DECLARED_METHODS, MemberCategory.DECLARED_FIELDS, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS, MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS);
}

private static void registerSpringJdkProxy(ProxyHints proxyHints, Class<?>... proxiedInterfaces) {
proxyHints.registerJdkProxy(AopProxyUtils.completeJdkProxyInterfaces(proxiedInterfaces));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,6 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.stream.binding.AbstractBindableProxyFactory;
Expand Down Expand Up @@ -70,8 +69,7 @@
*/
public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFactory implements InitializingBean, BeanFactoryAware {

@Autowired
private StreamFunctionProperties streamFunctionProperties;
private final StreamFunctionProperties streamFunctionProperties;

private final ResolvableType[] types;

Expand All @@ -81,11 +79,12 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto

private BeanFactory beanFactory;

public KafkaStreamsBindableProxyFactory(ResolvableType[] types, String functionName, Method method) {
public KafkaStreamsBindableProxyFactory(ResolvableType[] types, String functionName, Method method, StreamFunctionProperties streamFunctionProperties) {
super(types[0].getType().getClass());
this.types = types;
this.functionName = functionName;
this.method = method;
this.streamFunctionProperties = streamFunctionProperties;
}

@Override
Expand Down Expand Up @@ -286,8 +285,5 @@ public Map<String, BoundTargetHolder> getOutputHolders() {
return outputHolders;
}

public void setStreamFunctionProperties(StreamFunctionProperties streamFunctionProperties) {
this.streamFunctionProperties = streamFunctionProperties;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -50,6 +51,10 @@
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.type.StandardClassMetadata;
import org.springframework.core.type.StandardMethodMetadata;
Expand All @@ -61,7 +66,7 @@
*
* @since 2.2.0
*/
public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware {
public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean, BeanFactoryAware, ApplicationContextAware {

private static final Log LOG = LogFactory.getLog(KafkaStreamsFunctionBeanPostProcessor.class);

Expand All @@ -76,6 +81,7 @@ public class KafkaStreamsFunctionBeanPostProcessor implements InitializingBean,

private final Map<String, ResolvableType> kafkaStreamsOnlyResolvableTypes = new HashMap<>();
private final Map<String, Method> kafakStreamsOnlyMethods = new HashMap<>();
private ConfigurableApplicationContext applicationContext;

public KafkaStreamsFunctionBeanPostProcessor(StreamFunctionProperties streamFunctionProperties) {
this.streamFunctionProperties = streamFunctionProperties;
Expand Down Expand Up @@ -167,13 +173,13 @@ public void afterPropertiesSet() {
}

private void registerKakaStreamsProxyFactory(BeanDefinitionRegistry registry, String s, ResolvableType[] resolvableTypes, RootBeanDefinition rootBeanDefinition) {
rootBeanDefinition.getConstructorArgumentValues()
.addGenericArgumentValue(resolvableTypes);
rootBeanDefinition.getConstructorArgumentValues()
.addGenericArgumentValue(s);
rootBeanDefinition.getConstructorArgumentValues()
.addGenericArgumentValue(getMethods().get(s));
registry.registerBeanDefinition("kafkaStreamsBindableProxyFactory-" + s, rootBeanDefinition);
AtomicReference<KafkaStreamsBindableProxyFactory> proxyFactory = new AtomicReference<>();
Method method = getMethods().get(s);
KafkaStreamsBindableProxyFactory kafkaStreamsBindableProxyFactory =
new KafkaStreamsBindableProxyFactory(resolvableTypes, s, method, this.streamFunctionProperties);
proxyFactory.set(kafkaStreamsBindableProxyFactory);
((GenericApplicationContext) this.applicationContext).registerBean("kafkaStreamsBindableProxyFactory-" + s,
KafkaStreamsBindableProxyFactory.class, proxyFactory::get);
}

private void extractResolvableTypes(String key) {
Expand Down Expand Up @@ -276,4 +282,9 @@ private boolean isKafkaStreamsTypeFound(Method method) {
public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.aot.hint.RuntimeHintsRegistrar=org.springframework.cloud.stream.binder.kafka.streams.aot.KafkaStreamsBinderRuntimeHints
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,12 @@ private <T> Binder<T, ConsumerProperties, ProducerProperties> doGetBinder(String
private <T> Binder<T, ConsumerProperties, ProducerProperties> doGetBinderAOT(String name, Class<? extends T> bindingTargetType) {
// If neither name nor default given - return single or fail when > 1
if (!StringUtils.hasText(name) && !StringUtils.hasText(this.defaultBinder)) {
if (this.binderChildContextInitializers.size() == 1) {
boolean kafkaStreamsType = isKafkaStreamsType(bindingTargetType);
if (this.binderChildContextInitializers.size() == 1 || kafkaStreamsType) {
if (kafkaStreamsType) {
String kafkaStreamsBinderSimpleName = getKafkaStreamsBinderSimpleName(bindingTargetType);
return this.getBinderInstance(kafkaStreamsBinderSimpleName);
}
String configurationName = this.binderChildContextInitializers.keySet().iterator().next();
this.logger.debug("No specific name or default given - using single available child initializer '" + configurationName + "'");
return this.getBinderInstance(configurationName);
Expand All @@ -206,6 +211,15 @@ private <T> Binder<T, ConsumerProperties, ProducerProperties> doGetBinderAOT(Str
this.binderChildContextInitializers.keySet());
}

private <T> String getKafkaStreamsBinderSimpleName(Class<? extends T> bindingTargetType) {
return bindingTargetType.getSimpleName().toLowerCase();
}

private <T> boolean isKafkaStreamsType(Class<? extends T> bindingTargetType) {
String className = bindingTargetType.getName();
return className.contains("KStream") || className.contains("KTable");
}

private <T> Binder<T, ConsumerProperties, ProducerProperties> doGetBinderConventional(String name,
Class<? extends T> bindingTargetType) {

Expand Down

0 comments on commit e8431e4

Please sign in to comment.