Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor improvement DelegatingInvocableHandler and AKLE relate #2976

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null);
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
this.resolver = configurableListableBeanFactory.getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext(configurableListableBeanFactory, null);
}
this.beanResolver = new BeanFactoryResolver(beanFactory);
}
Expand Down Expand Up @@ -275,7 +275,7 @@ public void setGroup(String group) {
* @since 1.1
*/
public boolean isBatchListener() {
return this.batchListener == null ? false : this.batchListener;
return this.batchListener != null && this.batchListener;
}

/**
Expand Down Expand Up @@ -530,11 +530,10 @@ private void setupMessageListener(MessageListenerContainer container,
.acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName);
adapter.setSplitIterables(this.splitIterables);
Object messageListener = adapter;
boolean isBatchListener = isBatchListener();
Assert.state(messageListener != null,
() -> "Endpoint [" + this + "] must provide a non null message listener");
if (this.recordFilterStrategy != null) {
if (isBatchListener) {
if (isBatchListener()) {
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords' instead of a List"
+ (this.id != null ? " id: " + this.id : ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,22 +207,22 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(

MessagingMessageListenerAdapter<K, V> listener;
if (isBatchListener()) {
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<>(
this.bean, this.method, this.errorHandler);
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();
if (batchToRecordAdapter != null) {
messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
}
if (messageConverter instanceof BatchMessageConverter) {
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
if (messageConverter instanceof BatchMessageConverter batchMessageConverter) {
messageListener.setBatchMessageConverter(batchMessageConverter);
}
listener = messageListener;
}
else {
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<>(
this.bean, this.method, this.errorHandler);
if (messageConverter instanceof RecordMessageConverter) {
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
messageListener.setMessageConverter(recordMessageConverter);
}
listener = messageListener;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,7 +71,7 @@ public void setValidator(Validator validator) {

@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<>();
InvocableHandlerMethod defaultHandler = null;
for (Method method : this.methods) {
InvocableHandlerMethod handler = getMessageHandlerMethodFactory()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,10 +67,9 @@ public static Object buildConsumerRecordMetadataFromArray(Object... data) {
*/
@Nullable
public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
if (!(data instanceof ConsumerRecord)) {
if (!(data instanceof ConsumerRecord<?, ?> record)) {
return null;
}
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) data;
return new ConsumerRecordMetadata(new RecordMetadata(new TopicPartition(record.topic(), record.partition()),
record.offset(), 0, record.timestamp(), record.serializedKeySize(),
record.serializedValueSize()), record.timestampType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
this.bean = bean;
this.resolver = beanExpressionResolver;
this.beanExpressionContext = beanExpressionContext;
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory
? (ConfigurableListableBeanFactory) beanFactory
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory
? configurableListableBeanFactory
: null;
this.validator = validator == null ? null : new PayloadValidator(validator);
}
Expand All @@ -124,7 +124,7 @@ private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
}
Parameter[] parameters = handler.getMethod().getParameters();
for (Parameter parameter : parameters) {
if (parameter.getType().equals(ConsumerRecordMetadata.class)) {
if (ConsumerRecordMetadata.class.equals(parameter.getType())) {
this.handlerMetadataAware.put(handler, true);
return;
}
Expand All @@ -148,7 +148,7 @@ public Object getBean() {
* or the method raised an exception.
*/
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
Class<? extends Object> payloadClass = message.getPayload().getClass();
Class<?> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
if (this.validator != null && this.defaultHandler != null) {
MethodParameter parameter = this.payloadMethodParameters.get(handler);
Expand All @@ -175,7 +175,7 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
* @param payloadClass the payload class.
* @return the handler.
*/
protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> payloadClass) {
protected InvocableHandlerMethod getHandlerForPayload(Class<?> payloadClass) {
InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass);
if (handler == null) {
handler = findHandlerForPayload(payloadClass);
Expand Down Expand Up @@ -246,36 +246,32 @@ protected InvocableHandlerMethod findHandlerForPayload(Class<? extends Object> p
InvocableHandlerMethod result = null;
for (InvocableHandlerMethod handler : this.handlers) {
if (matchHandlerMethod(payloadClass, handler)) {
if (result != null) {
boolean resultIsDefault = result.equals(this.defaultHandler);
if (!handler.equals(this.defaultHandler) && !resultIsDefault) {
if (result != null && !result.equals(this.defaultHandler)) {
if (!handler.equals(this.defaultHandler)) {
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
result.getMethod().getName() + " and " + handler.getMethod().getName());
}
if (!resultIsDefault) {
continue; // otherwise replace the result with the actual match
}
continue; // otherwise replace the result with the actual match
}
result = handler;
}
}
return result != null ? result : this.defaultHandler;
}

protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, InvocableHandlerMethod handler) {
protected boolean matchHandlerMethod(Class<?> payloadClass, InvocableHandlerMethod handler) {
Method method = handler.getMethod();
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
// Single param; no annotation or not @Header
if (parameterAnnotations.length == 1) {
MethodParameter methodParameter = new MethodParameter(method, 0);
if ((methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
boolean isPayload = assignPayload(methodParameter, payloadClass);
if (isPayload) {
if (this.validator != null) {
this.payloadMethodParameters.put(handler, methodParameter);
}
return true;
}
return isPayload;
Copy link
Contributor Author

@Wzy19930507 Wzy19930507 Dec 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When isPayload is false, will call findCandidate then return null.

}

MethodParameter foundCandidate = findCandidate(payloadClass, method, parameterAnnotations);
Expand All @@ -285,14 +281,12 @@ protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, Invoc
return foundCandidate != null;
}

private MethodParameter findCandidate(Class<? extends Object> payloadClass, Method method,
Annotation[][] parameterAnnotations) {
@Nullable
private MethodParameter findCandidate(Class<?> payloadClass, Method method, Annotation[][] parameterAnnotations) {
MethodParameter foundCandidate = null;
for (int i = 0; i < parameterAnnotations.length; i++) {
MethodParameter methodParameter = new MethodParameter(method, i);
if ((methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
if (assignPayload(methodParameter, payloadClass)) {
if (foundCandidate != null) {
throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString());
}
Expand All @@ -316,15 +310,20 @@ public boolean hasDefaultHandler() {
return this.defaultHandler != null;
}

private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadClass) {
return (methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass);
}

private static final class PayloadValidator extends PayloadMethodArgumentResolver {

PayloadValidator(Validator validator) {
super(new MessageConverter() { // Required but never used

@Override
@Nullable
public Message<?> toMessage(Object payload, @Nullable
MessageHeaders headers) {
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
return null;
}

Expand Down