Skip to content

Commit 8eda292

Browse files
mp911dechristophstrobl
authored andcommitted
Switch to Flux.flatMapSequential(…) to prevent backpressure shaping.
We now use Flux.flatMapSequential(…) instead of concatMap as concatMap reduces the request size to 1. The change in backpressure/request size reduces parallelism and impacts the batch size by fetching 2 documents instead of considering the actual backpressure. flatMapSequential doesn't tamper the requested amount while retaining the sequence order. Closes: #4543 Original Pull Request: #4550
1 parent a429ff8 commit 8eda292

File tree

3 files changed

+52
-13
lines changed

3 files changed

+52
-13
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<D
952952
return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
953953
}
954954

955-
return Flux.from(cursor).concatMap(readCallback::doWith);
955+
return Flux.from(cursor).flatMapSequential(readCallback::doWith);
956956
}
957957

958958
@Override
@@ -988,7 +988,7 @@ protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, S
988988
.withOptions(AggregationOptions.builder().collation(near.getCollation()).build());
989989

990990
return aggregate($geoNear, collection, Document.class) //
991-
.concatMap(callback::doWith);
991+
.flatMapSequential(callback::doWith);
992992
}
993993

994994
@Override
@@ -1212,7 +1212,7 @@ public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave
12121212

12131213
Assert.notNull(batchToSave, "Batch to insert must not be null");
12141214

1215-
return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
1215+
return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
12161216
}
12171217

12181218
@Override
@@ -1280,7 +1280,7 @@ public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {
12801280

12811281
@Override
12821282
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
1283-
return Flux.from(objectsToSave).flatMap(this::insertAll);
1283+
return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
12841284
}
12851285

12861286
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
@@ -1331,7 +1331,7 @@ protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends
13311331
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
13321332
});
13331333

1334-
return insertDocuments.flatMap(tuple -> {
1334+
return insertDocuments.flatMapSequential(tuple -> {
13351335

13361336
Document document = tuple.getT2();
13371337
Object id = MappedDocument.of(document).getId();
@@ -1488,7 +1488,7 @@ protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document
14881488

14891489
return collectionToUse.insertMany(documents);
14901490

1491-
}).flatMap(s -> {
1491+
}).flatMapSequential(s -> {
14921492

14931493
return Flux.fromStream(documents.stream() //
14941494
.map(MappedDocument::of) //
@@ -2038,7 +2038,7 @@ public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inpu
20382038
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
20392039

20402040
return Flux.from(publisher)
2041-
.concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
2041+
.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
20422042
});
20432043
}
20442044

@@ -2106,7 +2106,7 @@ protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<
21062106

21072107
return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
21082108
.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
2109-
.flatMap(deleteResult -> Flux.fromIterable(list)));
2109+
.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
21102110
}
21112111

21122112
/**
@@ -2545,7 +2545,7 @@ private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Doc
25452545

25462546
return createFlux(collectionName, collection -> {
25472547
return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
2548-
.concatMap(objectCallback::doWith);
2548+
.flatMapSequential(objectCallback::doWith);
25492549
});
25502550
}
25512551

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
107107

108108
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
109109

110-
return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
110+
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
111111
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
112112
mongoOperations.save(entity, entityInformation.getCollectionName()));
113113
}
@@ -165,7 +165,7 @@ public Flux<T> findAllById(Publisher<ID> ids) {
165165

166166
Assert.notNull(ids, "The given Publisher of Id's must not be null");
167167

168-
return Flux.from(ids).buffer().flatMap(this::findAllById);
168+
return Flux.from(ids).buffer().flatMapSequential(this::findAllById);
169169
}
170170

171171
@Override
@@ -295,7 +295,8 @@ public <S extends T> Flux<S> insert(Publisher<S> entities) {
295295

296296
Assert.notNull(entities, "The given Publisher of entities must not be null");
297297

298-
return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
298+
return Flux.from(entities)
299+
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
299300
}
300301

301302
// -------------------------------------------------------------------------

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicLong;
3940
import java.util.stream.Collectors;
4041

4142
import org.assertj.core.api.Assertions;
@@ -632,6 +633,28 @@ void aggreateShouldUseDefaultCollationIfPresent() {
632633
verify(aggregatePublisher).collation(eq(com.mongodb.client.model.Collation.builder().locale("de_AT").build()));
633634
}
634635

636+
@Test // GH-4543
637+
void aggregateDoesNotLimitBackpressure() {
638+
639+
reset(collection);
640+
641+
AtomicLong request = new AtomicLong();
642+
Publisher<Document> realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet);
643+
644+
doAnswer(invocation -> {
645+
Subscriber<Document> subscriber = invocation.getArgument(0);
646+
realPublisher.subscribe(subscriber);
647+
return null;
648+
}).when(aggregatePublisher).subscribe(any());
649+
650+
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
651+
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);
652+
653+
template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe();
654+
655+
assertThat(request).hasValueGreaterThan(128);
656+
}
657+
635658
@Test // DATAMONGO-1854
636659
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {
637660

@@ -1185,6 +1208,17 @@ void findShouldInvokeAfterConvertCallbacks() {
11851208
assertThat(results.get(0).id).isEqualTo("after-convert");
11861209
}
11871210

1211+
@Test // GH-4543
1212+
void findShouldNotLimitBackpressure() {
1213+
1214+
AtomicLong request = new AtomicLong();
1215+
stubFindSubscribe(new Document(), request);
1216+
1217+
template.find(new Query(), Person.class).subscribe();
1218+
1219+
assertThat(request).hasValueGreaterThan(128);
1220+
}
1221+
11881222
@Test // DATAMONGO-2479
11891223
void findByIdShouldInvokeAfterConvertCallbacks() {
11901224

@@ -1549,8 +1583,12 @@ void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {
15491583
}
15501584

15511585
private void stubFindSubscribe(Document document) {
1586+
stubFindSubscribe(document, new AtomicLong());
1587+
}
1588+
1589+
private void stubFindSubscribe(Document document, AtomicLong request) {
15521590

1553-
Publisher<Document> realPublisher = Flux.just(document);
1591+
Publisher<Document> realPublisher = Flux.just(document).doOnRequest(request::addAndGet);
15541592

15551593
doAnswer(invocation -> {
15561594
Subscriber<Document> subscriber = invocation.getArgument(0);

0 commit comments

Comments
 (0)