diff --git a/pom.xml b/pom.xml
index f4eb82b8bb..651bd7cb47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@

 	<groupId>org.springframework.data</groupId>
 	<artifactId>spring-data-mongodb-parent</artifactId>
-	<version>4.2.0-SNAPSHOT</version>
+	<version>4.2.0-GH-4543-SNAPSHOT</version>
 	<packaging>pom</packaging>

 	<name>Spring Data MongoDB</name>
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index 2de4b6b635..f2d6837771 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.2.0-SNAPSHOT</version>
+		<version>4.2.0-GH-4543-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index 41b81f9aa6..cb0ab5c56d 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.2.0-SNAPSHOT</version>
+		<version>4.2.0-GH-4543-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index d7a9ddaa63..3bf479528e 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>4.2.0-SNAPSHOT</version>
+		<version>4.2.0-GH-4543-SNAPSHOT</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
index 45d6709c91..8939589888 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java
@@ -216,7 +216,7 @@ private Mono bulkWriteTo(MongoCollection collection) {
 			collection = collection.withWriteConcern(defaultWriteConcern);
 		}

-		Flux concat = Flux.concat(models).flatMap(it -> {
+		Flux concat = Flux.concat(models).flatMapSequential(it -> {

 			if (it.model()instanceof InsertOneModel iom) {
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
index a0f4312cae..1e2c834a3a 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
@@ -1051,7 +1051,7 @@ private Flux aggregateAndMap(MongoCollection collection, List Flux> geoNear(NearQuery near, Class entityClass, S
 				.withOptions(optionsBuilder.build());

 		return aggregate($geoNear, collection, Document.class) //
-				.concatMap(callback::doWith);
+				.flatMapSequential(callback::doWith);
 	}

 	@Override
@@ -1324,7 +1324,7 @@ public Flux insertAll(Mono> batchToSave

 		Assert.notNull(batchToSave, "Batch to insert must not be null");

-		return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
+		return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
 	}

 	@Override
@@ -1392,7 +1392,7 @@ public Flux insertAll(Collection objectsToSave) {

 	@Override
 	public Flux insertAll(Mono> objectsToSave) {
-		return Flux.from(objectsToSave).flatMap(this::insertAll);
+		return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
 	}

 	protected Flux doInsertAll(Collection listToSave, MongoWriter writer) {
@@ -1443,7 +1443,7 @@ protected Flux doInsertBatch(String collectionName, Collection
-		return insertDocuments.flatMap(tuple -> {
+		return insertDocuments.flatMapSequential(tuple -> {

 			Document document = tuple.getT2();
 			Object id = MappedDocument.of(document).getId();
@@ -1600,7 +1600,7 @@ protected Flux insertDocumentList(String collectionName, List
-		}).flatMap(s -> {
+		}).flatMapSequential(s -> {

 			return Flux.fromStream(documents.stream() //
 					.map(MappedDocument::of) //
@@ -2187,7 +2187,7 @@ public Flux mapReduce(Query filterQuery, Class domainType, String inpu
 			publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);

 			return Flux.from(publisher)
-					.concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
+					.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
 		});
 	}

@@ -2255,7 +2255,7 @@ protected Flux doFindAndDelete(String collectionName, Query query, Class<

 		return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
 				.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
-						.flatMap(deleteResult -> Flux.fromIterable(list)));
+						.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
 	}

 	/**
@@ -2729,7 +2729,7 @@ private Flux executeFindMultiInternal(ReactiveCollectionQueryCallback {

 			return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
-					.concatMap(objectCallback::doWith);
+					.flatMapSequential(objectCallback::doWith);
 		});
 	}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
index 1c84e5fde3..7cf6de3525 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
@@ -121,7 +121,7 @@ public Flux saveAll(Publisher entityStream) {

 		Assert.notNull(entityStream, "The given Publisher of entities must not be null");

-		return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
+		return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
 				mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
 				mongoOperations.save(entity, entityInformation.getCollectionName()));
 	}
@@ -191,7 +191,7 @@ public Flux findAllById(Publisher ids) {
 		Assert.notNull(ids, "The given Publisher of Id's must not be null");

 		Optional readPreference = getReadPreference();
-		return Flux.from(ids).buffer().flatMap(listOfIds -> {
+		return Flux.from(ids).buffer().flatMapSequential(listOfIds -> {
 			Query query = getIdQuery(listOfIds);
 			readPreference.ifPresent(query::withReadPreference);
 			return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
@@ -345,7 +345,8 @@ public Flux insert(Publisher entities) {

 		Assert.notNull(entities, "The given Publisher of entities must not be null");

-		return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
+		return Flux.from(entities)
+				.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
 	}

 	// -------------------------------------------------------------------------
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
index 79fc1869f8..8e6173f5a4 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
@@ -34,6 +34,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;

 import org.assertj.core.api.Assertions;
@@ -697,6 +698,28 @@ void aggreateShouldUseReadReadPreference() {
 		verify(collection).withReadPreference(ReadPreference.primaryPreferred());
 	}

+	@Test // GH-4543
+	void aggregateDoesNotLimitBackpressure() {
+
+		reset(collection);
+
+		AtomicLong request = new AtomicLong();
+		Publisher realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet);
+
+		doAnswer(invocation -> {
+			Subscriber subscriber = invocation.getArgument(0);
+			realPublisher.subscribe(subscriber);
+			return null;
+		}).when(aggregatePublisher).subscribe(any());
+
+		when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
+		when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);
+
+		template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe();
+
+		assertThat(request).hasValueGreaterThan(128);
+	}
+
 	@Test // DATAMONGO-1854
 	void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {

@@ -1261,6 +1284,17 @@ void findShouldInvokeAfterConvertCallbacks() {
 		assertThat(results.get(0).id).isEqualTo("after-convert");
 	}

+	@Test // GH-4543
+	void findShouldNotLimitBackpressure() {
+
+		AtomicLong request = new AtomicLong();
+		stubFindSubscribe(new Document(), request);
+
+		template.find(new Query(), Person.class).subscribe();
+
+		assertThat(request).hasValueGreaterThan(128);
+	}
+
 	@Test // DATAMONGO-2479
 	void findByIdShouldInvokeAfterConvertCallbacks() {

@@ -1706,8 +1740,12 @@ public WriteConcern resolve(MongoAction action) {
 	}

 	private void stubFindSubscribe(Document document) {
+		stubFindSubscribe(document, new AtomicLong());
+	}
+
+	private void stubFindSubscribe(Document document, AtomicLong request) {

-		Publisher realPublisher = Flux.just(document);
+		Publisher realPublisher = Flux.just(document).doOnRequest(request::addAndGet);

 		doAnswer(invocation -> {
 			Subscriber subscriber = invocation.getArgument(0);
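
Note on the recurring change above: flatMapSequential subscribes to the mapped inner publishers eagerly like flatMap, but emits their results in the original source order like concatMap. Unlike concatMap, it does not cap upstream demand at the small concat prefetch (32 by default), which is what the new unit tests assert with hasValueGreaterThan(128). The following standalone Reactor sketch is not part of the patch and its class and variable names are illustrative only; it reproduces the demand difference for a short source, mirroring the single-document mock publishers in ReactiveMongoTemplateUnitTests.

import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Flux;

class PrefetchComparison {

	public static void main(String[] args) {

		// Typically prints 32 for concatMap and 256 (Queues.SMALL_BUFFER_SIZE) for
		// flatMapSequential, which is why the tests check for a request above 128.
		System.out.println("concatMap initial request: " + initialRequest(true));
		System.out.println("flatMapSequential initial request: " + initialRequest(false));
	}

	private static long initialRequest(boolean useConcatMap) {

		AtomicLong requested = new AtomicLong();

		// Short source, analogous to the single mocked document in the unit tests.
		Flux<String> source = Flux.just("doc").doOnRequest(requested::addAndGet);

		if (useConcatMap) {
			source.concatMap(Flux::just).subscribe();
		} else {
			source.flatMapSequential(Flux::just).subscribe();
		}

		// Flux.just is synchronous, so all upstream demand has been recorded by now.
		return requested.get();
	}
}

Both operators preserve ordering, so replacing concatMap and flatMap with flatMapSequential keeps result order stable while allowing larger requests toward the MongoDB driver's publishers.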