-
My example is R2dbc + Kotlin Coroutines, the complete codes is here, https://github.com/hantsy/micronaut-sandbox/tree/master/data-r2dbc-kotlin-co The following test was working well, but failed after upgrade to the latest Micronaut. "find by title" {
val sql = "insert into posts(title, content, status) values ($1, $2, $3)";
Mono
.from(template.withTransaction { status: ReactiveTransactionStatus<Connection> ->
Mono.from(
status.connection.createStatement(sql)
.bind(0, "test title")
.bind(1, "test content")
.bind(2, "DRAFT")
.execute()
).flatMap { Mono.from(it.rowsUpdated) }
})
.`as` { StepVerifier.create(it) }
.consumeNextWith { it shouldBeEqualComparingTo 1 }
.verifyComplete()
runBlocking {
val all = posts.findAll(Specifications.titleLike("test")).toList()
log.debug("all posts size:{}", all.size)
all shouldHaveSize 1
val all2 = posts.findAll(Specifications.titleLike("test2")).toList()
log.debug("all2 posts size:{}", all2.size)
all2 shouldHaveSize 0
}
} I got the exception when running the tests. 23:28:31.364 [reactor-tcp-nio-3] DEBUG i.m.d.r.o.DefaultR2dbcRepositoryOperations - Transaction Begin for DataSource: default
23:28:31.373 [reactor-tcp-nio-3] DEBUG i.m.d.r.o.DefaultR2dbcRepositoryOperations - Committing transaction for DataSource default.
23:28:31.373 [reactor-tcp-nio-3] DEBUG i.m.d.r.o.DefaultR2dbcRepositoryOperations - Closing Connection for DataSource: default
23:28:31.374 [reactor-tcp-nio-3] WARN reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Connection closed
at io.r2dbc.postgresql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:100)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.close(ReactorNettyClient.java:919)
at io.r2dbc.postgresql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:513)
at io.r2dbc.postgresql.client.ReactorNettyClient.lambda$close$7(ReactorNettyClient.java:186)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.cancel(FluxUsingWhen.java:330)
at reactor.core.publisher.Operators$DeferredSubscription.cancel(Operators.java:1633)
at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.cancel(FluxUsingWhen.java:248)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:81)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:184)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:213)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:229)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:213)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:620)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:885)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:761)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:668)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:119)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
23:28:31.378 [reactor-tcp-nio-3] WARN i.r.p.client.ReactorNettyClient - Connection Error
reactor.netty.channel.AbortedException: Connection has been closed BEFORE send operation
at reactor.netty.channel.AbortedException.beforeSend(AbortedException.java:59)
at reactor.netty.channel.ChannelOperations.send(ChannelOperations.java:283)
at reactor.netty.NettyOutbound.send(NettyOutbound.java:86)
at io.r2dbc.postgresql.client.ReactorNettyClient.lambda$new$3(ReactorNettyClient.java:168)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:887)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230)
at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:495)
at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:380)
at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:470)
at reactor.core.publisher.UnicastProcessor.tryEmitComplete(UnicastProcessor.java:264)
at reactor.core.publisher.UnicastProcessor.onComplete(UnicastProcessor.java:248)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at io.r2dbc.postgresql.ExtendedFlowDelegate$ExtendedFlowOperator.close(ExtendedFlowDelegate.java:342)
at io.r2dbc.postgresql.ExtendedFlowDelegate.lambda$fetchAll$3(ExtendedFlowDelegate.java:146)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:213)
at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:620)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:885)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:761)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:668)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:119)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833) The |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
Using |
Beta Was this translation helpful? Give feedback.
-
It works after changing as suggested. But this test class also includes other tests, other tests also use |
Beta Was this translation helpful? Give feedback.
Using
Mono#from
might be problematic for R2dbc as you shouldn't close the upstream, try to useMono#fromDirect
.