Skip to content

Commit

Permalink
AOT required changes in Kafka Streams binder
Browse files Browse the repository at this point in the history
  • Loading branch information
sobychacko committed Oct 13, 2023
1 parent 4b53c83 commit 04e27d1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,5 +285,9 @@ public String getFunctionName() {
public Map<String, BoundTargetHolder> getOutputHolders() {
return outputHolders;
}

public void setStreamFunctionProperties(StreamFunctionProperties streamFunctionProperties) {
this.streamFunctionProperties = streamFunctionProperties;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils;
import org.springframework.cloud.stream.function.StreamFunctionProperties;
import org.springframework.core.ResolvableType;
import org.springframework.core.type.StandardClassMetadata;
import org.springframework.core.type.StandardMethodMetadata;
import org.springframework.lang.NonNull;
import org.springframework.util.ClassUtils;

/**
* @author Soby Chacko
Expand Down Expand Up @@ -122,6 +123,7 @@ public void afterPropertiesSet() {
ResolvableType[] resolvableTypes = new ResolvableType[]{getResolvableTypes().get(s)};
RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(
KafkaStreamsBindableProxyFactory.class);
rootBeanDefinition.getPropertyValues().add("streamFunctionProperties", this.streamFunctionProperties);
registerKakaStreamsProxyFactory(registry, s, resolvableTypes, rootBeanDefinition);
}
}
Expand All @@ -146,6 +148,7 @@ public void afterPropertiesSet() {
if (!nonKafkaStreamsFunctionsFound) {
RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(
KafkaStreamsBindableProxyFactory.class);
rootBeanDefinition.getPropertyValues().add("streamFunctionProperties", this.streamFunctionProperties);
registerKakaStreamsProxyFactory(registry, derivedNameFromComposed, resolvableTypes, rootBeanDefinition);
}
}
Expand All @@ -155,6 +158,7 @@ public void afterPropertiesSet() {
ResolvableType[] resolvableTypes = new ResolvableType[]{getResolvableTypes().get(functionUnit)};
RootBeanDefinition rootBeanDefinition = new RootBeanDefinition(
KafkaStreamsBindableProxyFactory.class);
rootBeanDefinition.getPropertyValues().add("streamFunctionProperties", this.streamFunctionProperties);
registerKakaStreamsProxyFactory(registry, functionUnit, resolvableTypes, rootBeanDefinition);
}
}
Expand All @@ -173,55 +177,50 @@ private void registerKakaStreamsProxyFactory(BeanDefinitionRegistry registry, St
}

private void extractResolvableTypes(String key) {
final Class<?> classObj = ClassUtils.resolveClassName(((AnnotatedBeanDefinition)
this.beanFactory.getBeanDefinition(key))
.getMetadata().getClassName(),
ClassUtils.getDefaultClassLoader());
BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key);
ResolvableType resolvableType = null;
Class<?> rawClass = null;
try {
Method[] methods = classObj.getDeclaredMethods();
Optional<Method> functionalBeanMethods = KafkaStreamsBinderUtils.findMethodWithName(key, methods);
if (functionalBeanMethods.isEmpty()) {
methods = classObj.getMethods(); // check the inherited methods
functionalBeanMethods = KafkaStreamsBinderUtils.findMethodWithName(key, methods);
}
if (functionalBeanMethods.isEmpty()) {
final BeanDefinition beanDefinition = this.beanFactory.getBeanDefinition(key);
final String factoryMethodName = beanDefinition.getFactoryMethodName();
functionalBeanMethods = KafkaStreamsBinderUtils.findMethodWithName(factoryMethodName, methods);
}

if (functionalBeanMethods.isPresent()) {
Method method = functionalBeanMethods.get();
ResolvableType resolvableType = ResolvableType.forMethodReturnType(method, classObj);
final Class<?> rawClass = resolvableType.getGeneric(0).getRawClass();
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
if (onlySingleFunction) {
resolvableTypeMap.put(key, resolvableType);
}
else {
discoverOnlyKafkaStreamsResolvableTypes(key, resolvableType);
}
if (beanDefinition instanceof AnnotatedBeanDefinition annotatedBeanDefinition) {
StandardMethodMetadata factoryMethodMetadata = (StandardMethodMetadata) annotatedBeanDefinition.getFactoryMethodMetadata();
if (factoryMethodMetadata != null) {
Method introspectedMethod = factoryMethodMetadata.getIntrospectedMethod();
resolvableType = ResolvableType.forMethodReturnType(introspectedMethod);
rawClass = resolvableType.getGeneric(0).getRawClass();
}
}
else {
Optional<Method> componentBeanMethods = Arrays.stream(methods)
else {
Class<?> introspectedClass = ((StandardClassMetadata) annotatedBeanDefinition.getMetadata()).getIntrospectedClass();
Method[] methods = introspectedClass.getDeclaredMethods();

Optional<Method> componentBeanMethods = Arrays.stream(methods)
.filter(m -> m.getName().equals("apply") && isKafkaStreamsTypeFound(m) ||
m.getName().equals("accept") && isKafkaStreamsTypeFound(m)).findFirst();
if (componentBeanMethods.isPresent()) {
Method method = componentBeanMethods.get();
final ResolvableType resolvableType = ResolvableType.forMethodParameter(method, 0);
final Class<?> rawClass = resolvableType.getRawClass();
if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
m.getName().equals("accept") && isKafkaStreamsTypeFound(m)).findFirst();
if (componentBeanMethods.isPresent()) {
Method method = componentBeanMethods.get();
resolvableType = ResolvableType.forMethodParameter(method, 0);
rawClass = resolvableType.getRawClass();
if (onlySingleFunction) {
resolvableTypeMap.put(key, resolvableType);
this.methods.put(key, method);
}
else {
discoverOnlyKafkaStreamsResolvableTypesAndMethods(key, resolvableType, method);
kafakStreamsOnlyMethods.put(key, method);
}
}
}
}
else {
resolvableType = beanDefinition.getResolvableType();
rawClass = resolvableType.getGeneric(0).getRawClass();
}

if (rawClass == KStream.class || rawClass == KTable.class || rawClass == GlobalKTable.class) {
if (onlySingleFunction) {
resolvableTypeMap.put(key, resolvableType);
}
else {
discoverOnlyKafkaStreamsResolvableTypes(key, resolvableType);
}
}
}
catch (Exception e) {
LOG.error("Function activation issues while mapping the function: " + key, e);
Expand Down

0 comments on commit 04e27d1

Please sign in to comment.