Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-1189: Asynchronous server-side processing in a request/reply scenario #2996

Merged
merged 11 commits into from
Jan 31, 2024

Conversation

Wzy19930507
Copy link
Contributor

@Wzy19930507 Wzy19930507 commented Jan 18, 2024

Resolves #1189

  • support Mono and Future
  • support kotlin suspend
  • Auto detect async reply
  • @SendTo for @KafkaHandler after error is handled

Refactor MessagingMessageListenerAdapter

  • move BatchMessagingMessageListenerAdapter#invoke and RecordMessagingMessageListenerAdapter#invoke to MessagingMessageListenerAdapter
  • move KafkaListenerErrorHandler to MessagingMessageListenerAdapter
  • add @Nullable to KafkaListenerErrorHandler

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great, but looks like it require more work to make it perfect.

Also see the warning in your new Kotlin test:

 w: file:///home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt:100:14 Unsafe use of a nullable receiver of type ConsumerRecord<String!, String!>?

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very close to the final solution.
Please, address those my final "at a glance" reviews and I'll pull it locally to play with and possible merge.

Thanks

@Wzy19930507

This comment was marked as resolved.

@artembilan
Copy link
Member

Probably some race condition.
Since coroutine is an async you might have some thread barrier to wait for the reply to be fulfilled.

catch (Exception ex) {
}
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we ack positively in case of error?
Spring AMQP logic is like this:

	protected void asyncFailure(Message request, Channel channel, Throwable t, Object source) {
		this.logger.error("Future, Mono, or suspend function was completed with an exception for " + request, t);
		try {
			channel.basicNack(request.getMessageProperties().getDeliveryTag(), false,
					ContainerUtils.shouldRequeue(this.defaultRequeueRejected, t, this.logger));
		}
		catch (IOException e) {
			this.logger.error("Failed to nack message", e);
		}
	}

Don't we have something like that in Spring Kafka as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having some trouble, function Acknowlege.nack not support out-of-order commit now.
Another Similar functions is recoverer(in particular DLT), but not sure how to implement it yet.

Could we temporary use KafkaListenerErrorHandler?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I see you do use it in the handleException():

Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment);

Then it goes to the:

handleResult(Objects.requireNonNullElse(result, errorResult),
							records, acknowledgment, consumer, message);

So, probably it is too late over here to make some assumption and do this unconditional acknowledge(acknowledgment).

I mean it feel like this call has to be removed from here

@Wzy19930507
Copy link
Contributor Author

Probably some race condition.
Since coroutine is an async you might have some thread barrier to wait for the reply to be fulfilled.

After kafka client 2.4 producer use sticky partition, it random chose partition and topic default partitions is 2.

@artembilan
Copy link
Member

After kafka client 2.4 producer use sticky partition, it random chose partition and topic default partitions is 2.

So, consider to configure that @EmbeddedKafka to provide just one partition per topic.

@Wzy19930507
Copy link
Contributor Author

Wzy19930507 commented Jan 19, 2024

So, consider to configure that @EmbeddedKafka to provide just one partition per topic.

Thanks, nice idea.

catch (Exception ex) {
}
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I see you do use it in the handleException():

Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment);

Then it goes to the:

handleResult(Objects.requireNonNullElse(result, errorResult),
							records, acknowledgment, consumer, message);

So, probably it is too late over here to make some assumption and do this unconditional acknowledge(acknowledgment).

I mean it feel like this call has to be removed from here

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

if (this.errorHandler != null) {
Copy link
Contributor Author

@Wzy19930507 Wzy19930507 Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it feel like this call has to be removed from here

When this.errorHandler != null, need acknowledgment skip error record.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. The idea is to ack the record we have just handled via errorHandler.
Isn't that this error handler responsibility then?
I'm not sure though what we should do otherwise, when this.errorHandler == null...

Copy link
Contributor Author

@Wzy19930507 Wzy19930507 Jan 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this.errorHandler != null, need acknowledgment skip error record.

Sorry, misdescription, When this.errorHandler == null will throw origin exception or errorHandler threw exception, need acknowledgment skip error record. at out of order commit mode, if not ack, will pause the consumer.

Zhiyang.Wang1 and others added 9 commits January 30, 2024 10:55
* move `BatchMessagingMessageListenerAdapter#invoke` and `RecordMessagingMessageListenerAdapter#invoke` to `MessagingMessageListenerAdapter`
* move `KafkaListenerErrorHandler` to `MessagingMessageListenerAdapter`
* add `@Nullable` to `KafkaListenerErrorHandler`
* Support `Mono` and `Future`
* Support auto ack at async return scenario when manual commit
* Support `KafkaListenerErrorHandler`
* Add warn log if the container is not configured for out-of-order manual commit
* Add async return test in `BatchMessagingMessageListenerAdapterTests`
  and `MessagingMessageListenerAdapterTests`
* Add unit test async listener with `@SendTo` in `AsyncListenerTests`
* Add `async-returns.adoc` and `whats-new.adoc`
* Support kotlin suspend
* Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo`
* Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter`
* Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
* auto-detect async reply than coerce the out-of-order manual commit.
* add new interface `HandlerMethodDetect` to detect handler args and return type.
* add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene.
* modify async-returns.adoc
…handled

Sending the result from a `KafkaListenerErrorHandler` was broken
for `@KafkaHandler` because the send to expression
was lost.
* move class from package `annotation` to package `adapter`
* re name bar,baz in BatchMessagingMessageListenerAdapterTests
* poblish unit test `MessagingMessageListenerAdapterTests` and `EnableKafkaKotlinCoroutinesTests`
* poblish doc async-returns.adoc and nav.adoc
* rename `HandlerMethodDetect` to `AsyncRepliesAware`
* fix javadoc in `ContinuationHandlerMethodArgumentResolver`
After kafka client 2.4 producer uses sticky partition, its randomly chose partition and topic default partitions is 2, configure that @embeddedkafka to provide just one partition per topic.

* javadoc in `AsyncRepliesAware`
* fix test in EnableKafkaKotlinCoroutinesTests
* polish adoc
* polish `HandlerAdapter`
* change `InvocationResult` to record
* Optimization `MessagingMessageListenerAdapter.asyncFailure`
@Wzy19930507
Copy link
Contributor Author

Sorry i do some wrong when git rebase action, should i reopen RP to keep the RP clean?

@artembilan
Copy link
Member

I believe you can just do git rebase main if your main is up today.

if that doesn’t help, no worries: we will rebase and squash on merge.

we didn’t forget about you: just busy with some other urgent stuff.

thank you for updates anyway !

@Wzy19930507
Copy link
Contributor Author

Wzy19930507 commented Jan 30, 2024

I solved this problem and thanks for your patient answer.

@artembilan artembilan enabled auto-merge (squash) January 31, 2024 18:26
@artembilan artembilan merged commit b44d742 into spring-projects:main Jan 31, 2024
3 checks passed
@Wzy19930507 Wzy19930507 deleted the GH-1189 branch February 1, 2024 02:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Asynchronous server-side processing in a request/reply scenario
2 participants