Support kotlin coroutines in consumer #2653
Replies: 6 comments 2 replies
-
It's not really the I am not that familiar with Kotlin Coroutines, but my understanding is that the listener container would have to be written in Kotlin to support them (a new listener container would be required). This will require a huge effort. Given that project Loom (and virtual threads) is on the horizon, it is not clear that it worth the effort because supporting Loom is trivial (I already have tested it). |
Beta Was this translation helpful? Give feedback.
-
I believe that it's not necessary write in kotlin. When in Kotlin we have a suspend function in Java it's a method with a one more parameter of type You could create a implementation of import java.util.concurrent.Executor;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.Dispatchers;
public class JavaContinuation<T> implements Continuation<T> {
private final Executor executor;
private Continuation<? super T> continuation;
public JavaContinuation(Executor executor) {
this.executor = executor;
}
@Override
public CoroutineContext getContext() {
return Dispatchers.from(executor);
}
@Override
public void resumeWith(Result<T> result) {
continuation.resumeWith(result);
}
public void setContinuation(Continuation<? super T> continuation) {
this.continuation = continuation;
}
} ExecutorService executor = Executors.newSingleThreadExecutor();
JavaContinuation<Integer> javaContinuation = new JavaContinuation<>(executor);
CoroutineScope scope = CoroutineScopeKt.newCoroutineScope(Dispatchers.getDefault());
scope.launch(javaContinuation.getContext(), () -> {
Object result = consume(...);
javaContinuation.resumeWith(Result.Companion.success(result));
});
executor.shutdown(); But for this you should have Kotlin coroutine dependencies. I don't know so much about the project. There are implementations of Flux with this project ? |
Beta Was this translation helpful? Give feedback.
-
It is not clear what that would buy you, the listener container will still have a thread running in the background and the record's offset will be committed when handed off (by default). Exactly what are you trying to achieve by using coroutines?
This project does not support reactive workloads.; see
Google? |
Beta Was this translation helpful? Give feedback.
-
There is just no listener method async return types handling in the framework yet. I don't think, Gary, that virtual threads would do anything replacing Kotlin's |
Beta Was this translation helpful? Give feedback.
-
It is much simpler there because individual messages are acknowledged; with Kafka, all we have is a committed offset. so any implementation here would be limited to using the out of order commit deferral capability. Use of multi-threaded consumers is discouraged because of these complications. |
Beta Was this translation helpful? Give feedback.
-
In my first example it can be seen that use
Ok, I will check this project.
jajaja, you're right.
It's a good perspective, but how to work |
Beta Was this translation helpful? Give feedback.
-
Hi everyone, I'm working in a project with spring-kafka and Kotlin. I have a service class with suspend functions and from the consume invoke those functions whereby the consume must be suspend function, but this not working with spring-kafka.
Service class example:
Consume class example:
Apparently in InvocableHandlerMethod of spring-kafka not ready to work with Kotlin Coroutines, it's is correct ?
Beta Was this translation helpful? Give feedback.
All reactions