Skip to content

Commit

Permalink
Merge pull request #46248 from franz1981/mongo_netty
Browse files Browse the repository at this point in the history
Mongo reactive client should use Netty transport
  • Loading branch information
geoand authored Feb 13, 2025
2 parents 22d8b35 + ee18cb3 commit cda43af
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,28 @@ public interface MongoClientConfig {
*/
Optional<UuidRepresentation> uuidRepresentation();

enum TransportConfig {
/**
* Uses a Netty-based transport re-using the existing Netty event loops.
*/
NETTY,
/**
* With a reactive driver it uses an async transport backed by a driver-managed thread pool,
* while with a blocking driver it uses a blocking transport.
*/
MONGO
}

/**
* Configures the reactive transport.
*/
@WithDefault("netty")
TransportConfig reactiveTransport();

/**
* Configures the blocking transport.
*/
@WithDefault("mongo")
TransportConfig blockingTransport();

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@
import com.mongodb.connection.ServerSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.event.CommandListener;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;

import io.netty.channel.socket.nio.NioSocketChannel;
import io.quarkus.credentials.CredentialsProvider;
import io.quarkus.credentials.runtime.CredentialsProviderFinder;
import io.quarkus.mongodb.MongoClientName;
import io.quarkus.mongodb.impl.ReactiveMongoClientImpl;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.impl.VertxByteBufAllocator;

/**
* This class is sort of a producer for {@link MongoClient} and {@link ReactiveMongoClient}.
Expand All @@ -86,20 +90,23 @@ public class MongoClients {
private final Map<String, ReactiveMongoClient> reactiveMongoClients = new HashMap<>();
private final Instance<ReactiveContextProvider> reactiveContextProviders;
private final Instance<MongoClientCustomizer> customizers;
private final Vertx vertx;

public MongoClients(MongodbConfig mongodbConfig, MongoClientSupport mongoClientSupport,
Instance<CodecProvider> codecProviders,
Instance<PropertyCodecProvider> propertyCodecProviders,
Instance<CommandListener> commandListeners,
Instance<ReactiveContextProvider> reactiveContextProviders,
@Any Instance<MongoClientCustomizer> customizers) {
@Any Instance<MongoClientCustomizer> customizers,
Vertx vertx) {
this.mongodbConfig = mongodbConfig;
this.mongoClientSupport = mongoClientSupport;
this.codecProviders = codecProviders;
this.propertyCodecProviders = propertyCodecProviders;
this.commandListeners = commandListeners;
this.reactiveContextProviders = reactiveContextProviders;
this.customizers = customizers;
this.vertx = vertx;

try {
//JDK bug workaround
Expand Down Expand Up @@ -254,6 +261,18 @@ private MongoClientSettings createMongoConfiguration(String name, MongoClientCon

MongoClientSettings.Builder settings = MongoClientSettings.builder();

switch (config.reactiveTransport()) {
case NETTY:
// we supports just NIO for now
if (!vertx.isNativeTransportEnabled()) {
configureNettyTransport(settings);
}
break;
case MONGO:
// no-op since this is the default behaviour
break;
}

if (isReactive) {
reactiveContextProviders.stream().findAny().ifPresent(settings::contextProvider);
}
Expand Down Expand Up @@ -329,6 +348,14 @@ private MongoClientSettings createMongoConfiguration(String name, MongoClientCon
return settings.build();
}

private void configureNettyTransport(MongoClientSettings.Builder settings) {
var nettyStreaming = TransportSettings.nettyBuilder()
.allocator(VertxByteBufAllocator.POOLED_ALLOCATOR)
.eventLoopGroup(vertx.nettyEventLoopGroup())
.socketChannelClass(NioSocketChannel.class).build();
settings.transportSettings(nettyStreaming);
}

private boolean doesNotHaveClientNameQualifier(Bean<?> bean) {
for (Annotation qualifier : bean.getQualifiers()) {
if (qualifier.annotationType().equals(MongoClientName.class)) {
Expand Down

0 comments on commit cda43af

Please sign in to comment.