Skip to content

Commit

Permalink
Fix NPE in MessagingMessageListenerAdapter
Browse files Browse the repository at this point in the history
Related to #1189

* Some other code clean up including deprecation warning
  • Loading branch information
artembilan committed Feb 12, 2024
1 parent c4e9d32 commit e6affce
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private final KafkaListenerErrorHandler errorHandler;

@Nullable
private HandlerAdapter handlerMethod;

private boolean isConsumerRecordList;
Expand Down Expand Up @@ -216,7 +217,7 @@ public void setMessagingConverter(SmartMessageConverter messageConverter) {
}

/**
* Returns the inferred type for conversion or, if null, the
* Return the inferred type for conversion or, if null, the
* {@link #setFallbackType(Class) fallbackType}.
* @return the type.
*/
Expand Down Expand Up @@ -245,7 +246,7 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
}

public boolean isAsyncReplies() {
return this.handlerMethod.isAsyncReplies();
return this.handlerMethod != null && this.handlerMethod.isAsyncReplies();
}

protected boolean isConsumerRecordList() {
Expand All @@ -262,7 +263,7 @@ public boolean isConversionNeeded() {

/**
* Set the topic to which to send any result from the method invocation.
* May be a SpEL expression {@code !{...}} evaluated at runtime.
* Maybe a SpEL expression {@code !{...}} evaluated at runtime.
* @param replyTopicParam the topic or expression.
* @since 2.0
*/
Expand Down Expand Up @@ -334,7 +335,7 @@ protected boolean isSplitIterables() {
}

/**
* Set to false to disable splitting {@link Iterable} reply values into separate
* Set to {@code false} to disable splitting {@link Iterable} reply values into separate
* records.
* @param splitIterables false to disable; default true.
* @since 2.3.5
Expand Down Expand Up @@ -406,6 +407,7 @@ protected final Object invokeHandler(Object data, @Nullable Acknowledgment ackno
if (ack == null && this.noOpAck) {
ack = NO_OP_ACK;
}
Assert.notNull(this.handlerMethod, "the 'handlerMethod' must not be null");
try {
if (data instanceof List && !this.isConsumerRecordList) {
return this.handlerMethod.invoke(message, ack, consumer);
Expand Down Expand Up @@ -545,7 +547,9 @@ private String evaluateTopic(Object request, Object source, Object result, @Null
* @since 2.1.3
*/
@SuppressWarnings("unchecked")
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean returnTypeMessage) {
protected void sendResponse(Object result, @Nullable String topic, @Nullable Object source,
boolean returnTypeMessage) {

if (!returnTypeMessage && topic == null) {
this.logger.debug(() -> "No replyTopic to handle the reply: " + result);
}
Expand Down Expand Up @@ -622,7 +626,7 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
})
.filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (headersToCopy.size() > 0) {
if (!headersToCopy.isEmpty()) {
builder.copyHeaders(headersToCopy);
}
headersToCopy = this.replyHeadersConfigurer.additionalHeaders();
Expand All @@ -637,7 +641,9 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
this.replyTemplate.send(builder.build());
}

protected void asyncSuccess(@Nullable Object result, String replyTopic, Message<?> source, boolean returnTypeMessage) {
protected void asyncSuccess(@Nullable Object result, String replyTopic, Message<?> source,
boolean returnTypeMessage) {

if (result == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Async result is null, ignoring");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ public DestinationTopic getDestinationTopicByName(String mainListenerId, String
() -> "No DestinationTopic found for " + mainListenerId + ":" + topic).getSourceDestination();
}

@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName) {
return getDltFor(mainListenerId, topicName, null);
}

@Nullable
@Override
public DestinationTopic getDltFor(String mainListenerId, String topicName, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public interface DestinationTopicContainer {
*/
@Nullable
@Deprecated(since = "3.2", forRemoval = true)
DestinationTopic getDltFor(String mainListenerId, String topicName);
default DestinationTopic getDltFor(String mainListenerId, String topicName) {
return getDltFor(mainListenerId, topicName, null);
}

/**
* Returns the {@link DestinationTopic} instance registered as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void shouldGetNextDestinationTopic() {
@Test
void shouldGetGeneralPurposeDltWhenExceptionIsNotKnown() {
assertThat(defaultDestinationTopicContainer
.getDltFor("id", mainDestinationTopic.getDestinationName()))
.getDltFor("id", mainDestinationTopic.getDestinationName(), null))
.isEqualTo(dltDestinationTopic);
}

Expand Down

0 comments on commit e6affce

Please sign in to comment.