diff --git a/src/test/spec/command_monitoring/operation.rs b/src/test/spec/command_monitoring/operation.rs index a6d58c30d..b52b658be 100644 --- a/src/test/spec/command_monitoring/operation.rs +++ b/src/test/spec/command_monitoring/operation.rs @@ -1,7 +1,6 @@ use std::{ops::Deref, time::Duration}; -use async_trait::async_trait; -use futures::stream::StreamExt; +use futures::{future::BoxFuture, stream::StreamExt, FutureExt}; use serde::{ de::{self, Deserializer}, Deserialize, @@ -16,12 +15,11 @@ use crate::{ Collection, }; -#[async_trait] pub(super) trait TestOperation { /// The command names to monitor as part of this test. fn command_names(&self) -> &[&str]; - async fn execute(&self, collection: Collection) -> Result<()>; + fn execute(&self, collection: Collection) -> BoxFuture>; } pub(super) struct AnyTestOperation { @@ -75,17 +73,19 @@ pub(super) struct DeleteMany { filter: Document, } -#[async_trait] impl TestOperation for DeleteMany { fn command_names(&self) -> &[&str] { &["delete"] } - async fn execute(&self, collection: Collection) -> Result<()> { - collection - .delete_many(self.filter.clone(), None) - .await - .map(|_| ()) + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + collection + .delete_many(self.filter.clone(), None) + .await + .map(|_| ()) + } + .boxed() } } @@ -94,17 +94,19 @@ pub(super) struct DeleteOne { filter: Document, } -#[async_trait] impl TestOperation for DeleteOne { fn command_names(&self) -> &[&str] { &["delete"] } - async fn execute(&self, collection: Collection) -> Result<()> { - collection - .delete_one(self.filter.clone(), None) - .await - .map(|_| ()) + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + collection + .delete_one(self.filter.clone(), None) + .await + .map(|_| ()) + } + .boxed() } } @@ -159,31 +161,33 @@ pub(super) struct Find { modifiers: Option, } -#[async_trait] impl TestOperation for Find { fn command_names(&self) -> &[&str] { &["find", "getMore"] } - async fn execute(&self, collection: Collection) -> Result<()> { - let mut options = FindOptions { - sort: self.sort.clone(), - skip: self.skip, - batch_size: self.batch_size.map(|i| i as u32), - limit: self.limit, - ..Default::default() - }; - - if let Some(ref modifiers) = self.modifiers { - modifiers.update_options(&mut options); - } - - let mut cursor = collection.find(self.filter.clone(), options).await?; - - while let Some(result) = cursor.next().await { - result?; + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + let mut options = FindOptions { + sort: self.sort.clone(), + skip: self.skip, + batch_size: self.batch_size.map(|i| i as u32), + limit: self.limit, + ..Default::default() + }; + + if let Some(ref modifiers) = self.modifiers { + modifiers.update_options(&mut options); + } + + let mut cursor = collection.find(self.filter.clone(), options).await?; + + while let Some(result) = cursor.next().await { + result?; + } + Ok(()) } - Ok(()) + .boxed() } } @@ -194,17 +198,19 @@ pub(super) struct InsertMany { options: Option, } -#[async_trait] impl TestOperation for InsertMany { fn command_names(&self) -> &[&str] { &["insert"] } - async fn execute(&self, collection: Collection) -> Result<()> { - collection - .insert_many(self.documents.clone(), self.options.clone()) - .await - .map(|_| ()) + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + collection + .insert_many(self.documents.clone(), self.options.clone()) + .await + .map(|_| ()) + } + .boxed() } } @@ -213,17 +219,19 @@ pub(super) struct InsertOne { document: Document, } -#[async_trait] impl TestOperation for InsertOne { fn command_names(&self) -> &[&str] { &["insert"] } - async fn execute(&self, collection: Collection) -> Result<()> { - collection - .insert_one(self.document.clone(), None) - .await - .map(|_| ()) + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + collection + .insert_one(self.document.clone(), None) + .await + .map(|_| ()) + } + .boxed() } } @@ -233,17 +241,19 @@ pub(super) struct UpdateMany { update: Document, } -#[async_trait] impl TestOperation for UpdateMany { fn command_names(&self) -> &[&str] { &["update"] } - async fn execute(&self, collection: Collection) -> Result<()> { - collection - .update_many(self.filter.clone(), self.update.clone(), None) - .await - .map(|_| ()) + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + collection + .update_many(self.filter.clone(), self.update.clone(), None) + .await + .map(|_| ()) + } + .boxed() } } @@ -255,21 +265,23 @@ pub(super) struct UpdateOne { upsert: Option, } -#[async_trait] impl TestOperation for UpdateOne { fn command_names(&self) -> &[&str] { &["update"] } - async fn execute(&self, collection: Collection) -> Result<()> { - let options = self.upsert.map(|b| UpdateOptions { - upsert: Some(b), - ..Default::default() - }); - collection - .update_one(self.filter.clone(), self.update.clone(), options) - .await - .map(|_| ()) + fn execute(&self, collection: Collection) -> BoxFuture> { + async move { + let options = self.upsert.map(|b| UpdateOptions { + upsert: Some(b), + ..Default::default() + }); + collection + .update_one(self.filter.clone(), self.update.clone(), options) + .await + .map(|_| ()) + } + .boxed() } } diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 278d62add..c93532f7d 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, convert::TryInto, fmt::Debug, ops::Deref, time::Duration}; -use async_trait::async_trait; -use futures::stream::TryStreamExt; +use futures::{future::BoxFuture, stream::TryStreamExt, FutureExt}; use serde::{de::Deserializer, Deserialize}; use super::{Entity, ExpectError, TestRunner}; @@ -37,15 +36,21 @@ use crate::{ RUNTIME, }; -#[async_trait] pub trait TestOperation: Debug { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner); + fn execute_test_runner_operation<'a>( + &'a self, + _test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + todo!() + } - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result>; + fn execute_entity_operation<'a>( + &'a self, + _id: &'a str, + _test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + todo!() + } /// Whether or not this operation returns an array of root documents. This information is /// necessary to determine how the return value of an operation should be compared to the @@ -249,23 +254,21 @@ pub(super) struct DeleteMany { options: DeleteOptions, } -#[async_trait] impl TestOperation for DeleteMany { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .delete_many(self.filter.clone(), self.options.clone()) - .await?; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .delete_many(self.filter.clone(), self.options.clone()) + .await?; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -278,33 +281,31 @@ pub(super) struct DeleteOne { options: DeleteOptions, } -#[async_trait] impl TestOperation for DeleteOne { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - collection - .delete_one_with_session(self.filter.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .delete_one(self.filter.clone(), self.options.clone()) - .await? - } - }; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .delete_one_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => { + collection + .delete_one(self.filter.clone(), self.options.clone()) + .await? + } + }; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -317,42 +318,40 @@ pub(super) struct Find { options: FindOptions, } -#[async_trait] impl TestOperation for Find { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - let mut cursor = collection - .find_with_session(self.filter.clone(), self.options.clone(), session) - .await?; - cursor - .stream(session) - .try_collect::>() - .await? - } - None => { - let cursor = collection - .find(self.filter.clone(), self.options.clone()) - .await?; - cursor.try_collect::>().await? - } - }; - Ok(Some(Bson::from(result).into())) + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + let mut cursor = collection + .find_with_session(self.filter.clone(), self.options.clone(), session) + .await?; + cursor + .stream(session) + .try_collect::>() + .await? + } + None => { + let cursor = collection + .find(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; + Ok(Some(Bson::from(result).into())) + } + .boxed() } fn returns_root_documents(&self) -> bool { false } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } } #[derive(Debug, Deserialize)] @@ -364,38 +363,40 @@ pub(super) struct InsertMany { options: InsertManyOptions, } -#[async_trait] impl TestOperation for InsertMany { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - collection - .insert_many_with_session(self.documents.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .insert_many(self.documents.clone(), self.options.clone()) - .await? - } - }; - let ids: HashMap = result - .inserted_ids - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(); - let ids = to_bson(&ids)?; - Ok(Some(Bson::from(doc! { "insertedIds": ids }).into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .insert_many_with_session( + self.documents.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .insert_many(self.documents.clone(), self.options.clone()) + .await? + } + }; + let ids: HashMap = result + .inserted_ids + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + let ids = to_bson(&ids)?; + Ok(Some(Bson::from(doc! { "insertedIds": ids }).into())) + } + .boxed() } } @@ -408,36 +409,34 @@ pub(super) struct InsertOne { options: InsertOneOptions, } -#[async_trait] impl TestOperation for InsertOne { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - collection - .insert_one_with_session( - self.document.clone(), - self.options.clone(), - test_runner.get_mut_session(session_id), - ) - .await? - } - None => { - collection - .insert_one(self.document.clone(), self.options.clone()) - .await? - } - }; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + collection + .insert_one_with_session( + self.document.clone(), + self.options.clone(), + test_runner.get_mut_session(session_id), + ) + .await? + } + None => { + collection + .insert_one(self.document.clone(), self.options.clone()) + .await? + } + }; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -450,27 +449,25 @@ pub(super) struct UpdateMany { options: UpdateOptions, } -#[async_trait] impl TestOperation for UpdateMany { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .update_many( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await?; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .update_many( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await?; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -484,41 +481,39 @@ pub(super) struct UpdateOne { session: Option, } -#[async_trait] impl TestOperation for UpdateOne { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - collection - .update_one_with_session( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - test_runner.get_mut_session(session_id), - ) - .await? - } - None => { - collection - .update_one( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + collection + .update_one_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + test_runner.get_mut_session(session_id), + ) + .await? + } + None => { + collection + .update_one( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -531,68 +526,66 @@ pub(super) struct Aggregate { options: AggregateOptions, } -#[async_trait] impl TestOperation for Aggregate { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let result = match &self.session { - Some(session_id) => { - let entity = test_runner.entities.get(id).unwrap().clone(); - let session = test_runner.get_mut_session(session_id); - let mut cursor = match entity { - Entity::Collection(collection) => { - collection - .aggregate_with_session( + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match &self.session { + Some(session_id) => { + let entity = test_runner.entities.get(id).unwrap().clone(); + let session = test_runner.get_mut_session(session_id); + let mut cursor = match entity { + Entity::Collection(collection) => { + collection + .aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) + .await? + } + Entity::Database(db) => { + db.aggregate_with_session( self.pipeline.clone(), self.options.clone(), session, ) .await? - } - Entity::Database(db) => { - db.aggregate_with_session( - self.pipeline.clone(), - self.options.clone(), - session, - ) + } + other => panic!("Cannot execute aggregate on {:?}", &other), + }; + cursor + .stream(session) + .try_collect::>() .await? - } - other => panic!("Cannot execute aggregate on {:?}", &other), - }; - cursor - .stream(session) - .try_collect::>() - .await? - } - None => { - let cursor = match test_runner.entities.get(id).unwrap() { - Entity::Collection(collection) => { - collection - .aggregate(self.pipeline.clone(), self.options.clone()) - .await? - } - Entity::Database(db) => { - db.aggregate(self.pipeline.clone(), self.options.clone()) - .await? - } - other => panic!("Cannot execute aggregate on {:?}", &other), - }; - cursor.try_collect::>().await? - } - }; - Ok(Some(Bson::from(result).into())) + } + None => { + let cursor = match test_runner.entities.get(id).unwrap() { + Entity::Collection(collection) => { + collection + .aggregate(self.pipeline.clone(), self.options.clone()) + .await? + } + Entity::Database(db) => { + db.aggregate(self.pipeline.clone(), self.options.clone()) + .await? + } + other => panic!("Cannot execute aggregate on {:?}", &other), + }; + cursor.try_collect::>().await? + } + }; + Ok(Some(Bson::from(result).into())) + } + .boxed() } fn returns_root_documents(&self) -> bool { true } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } } #[derive(Debug, Deserialize)] @@ -605,37 +598,35 @@ pub(super) struct Distinct { options: DistinctOptions, } -#[async_trait] impl TestOperation for Distinct { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - collection - .distinct_with_session( - &self.field_name, - self.filter.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .distinct(&self.field_name, self.filter.clone(), self.options.clone()) - .await? - } - }; - Ok(Some(Bson::Array(result).into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .distinct_with_session( + &self.field_name, + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .distinct(&self.field_name, self.filter.clone(), self.options.clone()) + .await? + } + }; + Ok(Some(Bson::Array(result).into())) + } + .boxed() } } @@ -648,36 +639,34 @@ pub(super) struct CountDocuments { options: CountOptions, } -#[async_trait] impl TestOperation for CountDocuments { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - collection - .count_documents_with_session( - self.filter.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .count_documents(self.filter.clone(), self.options.clone()) - .await? - } - }; - Ok(Some(Bson::Int64(result.try_into().unwrap()).into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .count_documents_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .count_documents(self.filter.clone(), self.options.clone()) + .await? + } + }; + Ok(Some(Bson::Int64(result.try_into().unwrap()).into())) + } + .boxed() } } @@ -688,22 +677,20 @@ pub(super) struct EstimatedDocumentCount { options: EstimatedDocumentCountOptions, } -#[async_trait] impl TestOperation for EstimatedDocumentCount { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .estimated_document_count(self.options.clone()) - .await?; - Ok(Some(Bson::Int64(result.try_into().unwrap()).into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .estimated_document_count(self.options.clone()) + .await?; + Ok(Some(Bson::Int64(result.try_into().unwrap()).into())) + } + .boxed() } } @@ -715,25 +702,23 @@ pub(super) struct FindOne { options: FindOneOptions, } -#[async_trait] impl TestOperation for FindOne { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .find_one(self.filter.clone(), self.options.clone()) - .await?; - match result { - Some(result) => Ok(Some(Bson::from(result).into())), - None => Ok(Some(Entity::None)), + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .find_one(self.filter.clone(), self.options.clone()) + .await?; + match result { + Some(result) => Ok(Some(Bson::from(result).into())), + None => Ok(Some(Entity::None)), + } } - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + .boxed() } } @@ -746,32 +731,34 @@ pub(super) struct ListDatabases { options: ListDatabasesOptions, } -#[async_trait] impl TestOperation for ListDatabases { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let client = test_runner.get_client(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - client - .list_databases_with_session(self.filter.clone(), self.options.clone(), session) - .await? - } - None => { - client - .list_databases(self.filter.clone(), self.options.clone()) - .await? - } - }; - Ok(Some(bson::to_bson(&result)?.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let client = test_runner.get_client(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + client + .list_databases_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + client + .list_databases(self.filter.clone(), self.options.clone()) + .await? + } + }; + Ok(Some(bson::to_bson(&result)?.into())) + } + .boxed() } } @@ -783,23 +770,21 @@ pub(super) struct ListDatabaseNames { options: ListDatabasesOptions, } -#[async_trait] impl TestOperation for ListDatabaseNames { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let client = test_runner.get_client(id); - let result = client - .list_database_names(self.filter.clone(), self.options.clone()) - .await?; - let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); - Ok(Some(Bson::Array(result).into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let client = test_runner.get_client(id); + let result = client + .list_database_names(self.filter.clone(), self.options.clone()) + .await?; + let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); + Ok(Some(Bson::Array(result).into())) + } + .boxed() } } @@ -812,43 +797,41 @@ pub(super) struct ListCollections { options: ListCollectionsOptions, } -#[async_trait] impl TestOperation for ListCollections { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let db = test_runner.get_database(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - let mut cursor = db - .list_collections_with_session( - self.filter.clone(), - self.options.clone(), - session, - ) - .await?; - cursor.stream(session).try_collect::>().await? - } - None => { - let cursor = db - .list_collections(self.filter.clone(), self.options.clone()) - .await?; - cursor.try_collect::>().await? - } - }; - Ok(Some(bson::to_bson(&result)?.into())) + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let db = test_runner.get_database(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + let mut cursor = db + .list_collections_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await?; + cursor.stream(session).try_collect::>().await? + } + None => { + let cursor = db + .list_collections(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; + Ok(Some(bson::to_bson(&result)?.into())) + } + .boxed() } fn returns_root_documents(&self) -> bool { true } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } } #[derive(Debug, Deserialize)] @@ -857,21 +840,19 @@ pub(super) struct ListCollectionNames { filter: Option, } -#[async_trait] impl TestOperation for ListCollectionNames { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let db = test_runner.get_database(id); - let result = db.list_collection_names(self.filter.clone()).await?; - let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); - Ok(Some(Bson::from(result).into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let db = test_runner.get_database(id); + let result = db.list_collection_names(self.filter.clone()).await?; + let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); + Ok(Some(Bson::from(result).into())) + } + .boxed() } } @@ -884,27 +865,25 @@ pub(super) struct ReplaceOne { options: ReplaceOptions, } -#[async_trait] impl TestOperation for ReplaceOne { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .replace_one( - self.filter.clone(), - self.replacement.clone(), - self.options.clone(), - ) - .await?; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .replace_one( + self.filter.clone(), + self.replacement.clone(), + self.options.clone(), + ) + .await?; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -918,42 +897,40 @@ pub(super) struct FindOneAndUpdate { options: FindOneAndUpdateOptions, } -#[async_trait] impl TestOperation for FindOneAndUpdate { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - collection - .find_one_and_update_with_session( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .find_one_and_update( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + collection + .find_one_and_update_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .find_one_and_update( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -966,27 +943,25 @@ pub(super) struct FindOneAndReplace { options: FindOneAndReplaceOptions, } -#[async_trait] impl TestOperation for FindOneAndReplace { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .find_one_and_replace( - self.filter.clone(), - self.replacement.clone(), - self.options.clone(), - ) - .await?; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .find_one_and_replace( + self.filter.clone(), + self.replacement.clone(), + self.options.clone(), + ) + .await?; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -998,23 +973,21 @@ pub(super) struct FindOneAndDelete { options: FindOneAndDeleteOptions, } -#[async_trait] impl TestOperation for FindOneAndDelete { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let collection = test_runner.get_collection(id); - let result = collection - .find_one_and_delete(self.filter.clone(), self.options.clone()) - .await?; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let collection = test_runner.get_collection(id); + let result = collection + .find_one_and_delete(self.filter.clone(), self.options.clone()) + .await?; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -1025,25 +998,22 @@ pub(super) struct FailPointCommand { client: String, } -#[async_trait] impl TestOperation for FailPointCommand { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let client = test_runner.get_client(&self.client); - let guard = self - .fail_point - .clone() - .enable(client, Some(ReadPreference::Primary.into())) - .await - .unwrap(); - test_runner.fail_point_guards.push(guard); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let client = test_runner.get_client(&self.client); + let guard = self + .fail_point + .clone() + .enable(client, Some(ReadPreference::Primary.into())) + .await + .unwrap(); + test_runner.fail_point_guards.push(guard); + } + .boxed() } } @@ -1054,20 +1024,17 @@ pub(super) struct AssertCollectionExists { database_name: String, } -#[async_trait] impl TestOperation for AssertCollectionExists { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let db = test_runner.internal_client.database(&self.database_name); - let names = db.list_collection_names(None).await.unwrap(); - assert!(names.contains(&self.collection_name)); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let db = test_runner.internal_client.database(&self.database_name); + let names = db.list_collection_names(None).await.unwrap(); + assert!(names.contains(&self.collection_name)); + } + .boxed() } } @@ -1078,20 +1045,17 @@ pub(super) struct AssertCollectionNotExists { database_name: String, } -#[async_trait] impl TestOperation for AssertCollectionNotExists { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let db = test_runner.internal_client.database(&self.database_name); - let names = db.list_collection_names(None).await.unwrap(); - assert!(!names.contains(&self.collection_name)); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let db = test_runner.internal_client.database(&self.database_name); + let names = db.list_collection_names(None).await.unwrap(); + assert!(!names.contains(&self.collection_name)); + } + .boxed() } } @@ -1104,33 +1068,31 @@ pub(super) struct CreateCollection { session: Option, } -#[async_trait] impl TestOperation for CreateCollection { - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } - - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let database = test_runner.get_database(id).clone(); - - if let Some(session_id) = &self.session { - database - .create_collection_with_session( - &self.collection, - self.options.clone(), - test_runner.get_mut_session(session_id), - ) - .await?; - } else { - database - .create_collection(&self.collection, self.options.clone()) - .await?; + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let database = test_runner.get_database(id).clone(); + + if let Some(session_id) = &self.session { + database + .create_collection_with_session( + &self.collection, + self.options.clone(), + test_runner.get_mut_session(session_id), + ) + .await?; + } else { + database + .create_collection(&self.collection, self.options.clone()) + .await?; + } + Ok(None) } - Ok(None) + .boxed() } } @@ -1143,31 +1105,29 @@ pub(super) struct DropCollection { session: Option, } -#[async_trait] impl TestOperation for DropCollection { - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } - - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let database = test_runner.entities.get(id).unwrap().as_database(); - let collection = database.collection::(&self.collection).clone(); - - if let Some(session_id) = &self.session { - collection - .drop_with_session( - self.options.clone(), - test_runner.get_mut_session(session_id), - ) - .await?; - } else { - collection.drop(self.options.clone()).await?; + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let database = test_runner.entities.get(id).unwrap().as_database(); + let collection = database.collection::(&self.collection).clone(); + + if let Some(session_id) = &self.session { + collection + .drop_with_session( + self.options.clone(), + test_runner.get_mut_session(session_id), + ) + .await?; + } else { + collection.drop(self.options.clone()).await?; + } + Ok(None) } - Ok(None) + .boxed() } } @@ -1182,39 +1142,37 @@ pub(super) struct RunCommand { write_concern: Option, } -#[async_trait] impl TestOperation for RunCommand { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let mut command = self.command.clone(); - if let Some(ref read_concern) = self.read_concern { - command.insert("readConcern", read_concern.clone()); - } - if let Some(ref write_concern) = self.write_concern { - command.insert("writeConcern", write_concern.clone()); - } - - let db = test_runner.get_database(id).clone(); - let result = match &self.session { - Some(session_id) => { - let session = test_runner.get_mut_session(session_id); - db.run_command_with_session(command, self.read_preference.clone(), session) - .await? + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let mut command = self.command.clone(); + if let Some(ref read_concern) = self.read_concern { + command.insert("readConcern", read_concern.clone()); } - None => { - db.run_command(command, self.read_preference.clone()) - .await? + if let Some(ref write_concern) = self.write_concern { + command.insert("writeConcern", write_concern.clone()); } - }; - let result = to_bson(&result)?; - Ok(Some(result.into())) - } - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + let db = test_runner.get_database(id).clone(); + let result = match &self.session { + Some(session_id) => { + let session = test_runner.get_mut_session(session_id); + db.run_command_with_session(command, self.read_preference.clone(), session) + .await? + } + None => { + db.run_command(command, self.read_preference.clone()) + .await? + } + }; + let result = to_bson(&result)?; + Ok(Some(result.into())) + } + .boxed() } } @@ -1222,21 +1180,19 @@ impl TestOperation for RunCommand { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct EndSession {} -#[async_trait] impl TestOperation for EndSession { - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } - - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let session = test_runner.get_mut_session(id).client_session.take(); - drop(session); - RUNTIME.delay_for(Duration::from_secs(1)).await; - Ok(None) + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let session = test_runner.get_mut_session(id).client_session.take(); + drop(session); + RUNTIME.delay_for(Duration::from_secs(1)).await; + Ok(None) + } + .boxed() } } @@ -1247,26 +1203,23 @@ pub(super) struct AssertSessionTransactionState { state: String, } -#[async_trait] impl TestOperation for AssertSessionTransactionState { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let session: &ClientSession = test_runner.get_session(&self.session); - let session_state = match &session.transaction.state { - TransactionState::None => "none", - TransactionState::Starting => "starting", - TransactionState::InProgress => "inprogress", - TransactionState::Committed { data_committed: _ } => "committed", - TransactionState::Aborted => "aborted", - }; - assert_eq!(session_state, self.state); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let session: &ClientSession = test_runner.get_session(&self.session); + let session_state = match &session.transaction.state { + TransactionState::None => "none", + TransactionState::Starting => "starting", + TransactionState::InProgress => "inprogress", + TransactionState::Committed { data_committed: _ } => "committed", + TransactionState::Aborted => "aborted", + }; + assert_eq!(session_state, self.state); + } + .boxed() } } @@ -1276,23 +1229,20 @@ pub(super) struct AssertDifferentLsidOnLastTwoCommands { client: String, } -#[async_trait] impl TestOperation for AssertDifferentLsidOnLastTwoCommands { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let client = test_runner.entities.get(&self.client).unwrap().as_client(); - let events = client.get_all_command_started_events(); - - let lsid1 = events[events.len() - 1].command.get("lsid").unwrap(); - let lsid2 = events[events.len() - 2].command.get("lsid").unwrap(); - assert_ne!(lsid1, lsid2); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let client = test_runner.entities.get(&self.client).unwrap().as_client(); + let events = client.get_all_command_started_events(); + + let lsid1 = events[events.len() - 1].command.get("lsid").unwrap(); + let lsid2 = events[events.len() - 2].command.get("lsid").unwrap(); + assert_ne!(lsid1, lsid2); + } + .boxed() } } @@ -1302,23 +1252,20 @@ pub(super) struct AssertSameLsidOnLastTwoCommands { client: String, } -#[async_trait] impl TestOperation for AssertSameLsidOnLastTwoCommands { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let client = test_runner.entities.get(&self.client).unwrap().as_client(); - let events = client.get_all_command_started_events(); - - let lsid1 = events[events.len() - 1].command.get("lsid").unwrap(); - let lsid2 = events[events.len() - 2].command.get("lsid").unwrap(); - assert_eq!(lsid1, lsid2); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let client = test_runner.entities.get(&self.client).unwrap().as_client(); + let events = client.get_all_command_started_events(); + + let lsid1 = events[events.len() - 1].command.get("lsid").unwrap(); + let lsid2 = events[events.len() - 2].command.get("lsid").unwrap(); + assert_eq!(lsid1, lsid2); + } + .boxed() } } @@ -1328,19 +1275,16 @@ pub(super) struct AssertSessionDirty { session: String, } -#[async_trait] impl TestOperation for AssertSessionDirty { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let session: &ClientSession = test_runner.get_session(&self.session); - assert!(session.is_dirty()); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let session: &ClientSession = test_runner.get_session(&self.session); + assert!(session.is_dirty()); + } + .boxed() } } @@ -1350,19 +1294,16 @@ pub(super) struct AssertSessionNotDirty { session: String, } -#[async_trait] impl TestOperation for AssertSessionNotDirty { - async fn execute_test_runner_operation(&self, test_runner: &mut TestRunner) { - let session: &ClientSession = test_runner.get_session(&self.session); - assert!(!session.is_dirty()); - } - - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() + fn execute_test_runner_operation<'a>( + &'a self, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, ()> { + async move { + let session: &ClientSession = test_runner.get_session(&self.session); + assert!(!session.is_dirty()); + } + .boxed() } } @@ -1370,20 +1311,18 @@ impl TestOperation for AssertSessionNotDirty { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct StartTransaction {} -#[async_trait] impl TestOperation for StartTransaction { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let session: &mut ClientSession = test_runner.get_mut_session(id); - session.start_transaction(None).await?; - Ok(None) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let session: &mut ClientSession = test_runner.get_mut_session(id); + session.start_transaction(None).await?; + Ok(None) + } + .boxed() } } @@ -1391,20 +1330,18 @@ impl TestOperation for StartTransaction { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct CommitTransaction {} -#[async_trait] impl TestOperation for CommitTransaction { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let session: &mut ClientSession = test_runner.get_mut_session(id); - session.commit_transaction().await?; - Ok(None) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let session: &mut ClientSession = test_runner.get_mut_session(id); + session.commit_transaction().await?; + Ok(None) + } + .boxed() } } @@ -1412,37 +1349,22 @@ impl TestOperation for CommitTransaction { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub(super) struct AbortTransaction {} -#[async_trait] impl TestOperation for AbortTransaction { - async fn execute_entity_operation( - &self, - id: &str, - test_runner: &mut TestRunner, - ) -> Result> { - let session: &mut ClientSession = test_runner.get_mut_session(id); - session.abort_transaction().await?; - Ok(None) - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() + fn execute_entity_operation<'a>( + &'a self, + id: &'a str, + test_runner: &'a mut TestRunner, + ) -> BoxFuture<'a, Result>> { + async move { + let session: &mut ClientSession = test_runner.get_mut_session(id); + session.abort_transaction().await?; + Ok(None) + } + .boxed() } } #[derive(Debug, Deserialize)] pub(super) struct UnimplementedOperation; -#[async_trait] -impl TestOperation for UnimplementedOperation { - async fn execute_entity_operation( - &self, - _id: &str, - _test_runner: &mut TestRunner, - ) -> Result> { - unimplemented!() - } - - async fn execute_test_runner_operation(&self, _test_runner: &mut TestRunner) { - unimplemented!() - } -} +impl TestOperation for UnimplementedOperation {} diff --git a/src/test/spec/v2_runner/operation.rs b/src/test/spec/v2_runner/operation.rs index d1728c361..a3628035a 100644 --- a/src/test/spec/v2_runner/operation.rs +++ b/src/test/spec/v2_runner/operation.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, convert::TryInto, fmt::Debug, ops::Deref}; -use async_trait::async_trait; -use futures::stream::TryStreamExt; +use futures::{future::BoxFuture, stream::TryStreamExt, FutureExt}; use serde::{de::Deserializer, Deserialize}; use crate::{ @@ -40,26 +39,36 @@ use crate::{ Database, }; -// The linked issue causes a warning that cannot be suppressed when providing a default -// implementation for the async functions contained in this trait. -// -#[async_trait] pub trait TestOperation: Debug { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result>; + fn execute_on_collection<'a>( + &'a self, + _collection: &'a Collection, + _session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + todo!() + } - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result>; + fn execute_on_database<'a>( + &'a self, + _database: &'a Database, + _session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + todo!() + } - async fn execute_on_client(&self, client: &TestClient) -> Result>; + fn execute_on_client<'a>( + &'a self, + _client: &'a TestClient, + ) -> BoxFuture<'a, Result>> { + todo!() + } - async fn execute_on_session(&self, session: &mut ClientSession) -> Result>; + fn execute_on_session<'a>( + &'a self, + _session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + todo!() + } } #[derive(Debug)] @@ -293,43 +302,33 @@ pub(super) struct DeleteMany { options: Option, } -#[async_trait] impl TestOperation for DeleteMany { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .delete_many_with_session(self.filter.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .delete_many(self.filter.clone(), self.options.clone()) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .delete_many_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .delete_many(self.filter.clone(), self.options.clone()) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -340,43 +339,29 @@ pub(super) struct DeleteOne { options: Option, } -#[async_trait] impl TestOperation for DeleteOne { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .delete_one_with_session(self.filter.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .delete_one(self.filter.clone(), self.options.clone()) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .delete_one_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => { + collection + .delete_one(self.filter.clone(), self.options.clone()) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -387,47 +372,33 @@ pub(super) struct Find { options: Option, } -#[async_trait] impl TestOperation for Find { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - let mut cursor = collection - .find_with_session(self.filter.clone(), self.options.clone(), session) - .await?; - cursor - .stream(session) - .try_collect::>() - .await? - } - None => { - let cursor = collection - .find(self.filter.clone(), self.options.clone()) - .await?; - cursor.try_collect::>().await? - } - }; - Ok(Some(Bson::from(result))) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + let mut cursor = collection + .find_with_session(self.filter.clone(), self.options.clone(), session) + .await?; + cursor + .stream(session) + .try_collect::>() + .await? + } + None => { + let cursor = collection + .find(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; + Ok(Some(Bson::from(result))) + } + .boxed() } } @@ -438,48 +409,33 @@ pub(super) struct InsertMany { options: Option, } -#[async_trait] impl TestOperation for InsertMany { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .insert_many_with_session(self.documents.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .insert_many(self.documents.clone(), self.options.clone()) - .await? - } - }; - let ids: HashMap = result - .inserted_ids - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(); - let ids = bson::to_bson(&ids)?; - Ok(Some(Bson::from(doc! { "insertedIds": ids }))) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + let documents = self.documents.clone(); + let options = self.options.clone(); + + async move { + let result = match session { + Some(session) => { + collection + .insert_many_with_session(documents, options, session) + .await? + } + None => collection.insert_many(documents, options).await?, + }; + let ids: HashMap = result + .inserted_ids + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + let ids = bson::to_bson(&ids)?; + Ok(Some(Bson::from(doc! { "insertedIds": ids }))) + } + .boxed() } } @@ -490,43 +446,27 @@ pub(super) struct InsertOne { options: Option, } -#[async_trait] impl TestOperation for InsertOne { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .insert_one_with_session(self.document.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .insert_one(self.document.clone(), self.options.clone()) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + let document = self.document.clone(); + let options = self.options.clone(); + async move { + let result = match session { + Some(session) => { + collection + .insert_one_with_session(document, options, session) + .await? + } + None => collection.insert_one(document, options).await?, + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -538,52 +478,38 @@ pub(super) struct UpdateMany { options: Option, } -#[async_trait] impl TestOperation for UpdateMany { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .update_many_with_session( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .update_many( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .update_many_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .update_many( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -595,52 +521,38 @@ pub(super) struct UpdateOne { options: Option, } -#[async_trait] impl TestOperation for UpdateOne { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .update_one_with_session( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .update_one( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .update_one_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .update_one( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -652,65 +564,70 @@ pub(super) struct Aggregate { options: Option, } -#[async_trait] impl TestOperation for Aggregate { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - let mut cursor = collection - .aggregate_with_session(self.pipeline.clone(), self.options.clone(), session) - .await?; - cursor - .stream(session) - .try_collect::>() - .await? - } - None => { - let cursor = collection - .aggregate(self.pipeline.clone(), self.options.clone()) - .await?; - cursor.try_collect::>().await? - } - }; - Ok(Some(Bson::from(result))) - } - - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - let mut cursor = database - .aggregate_with_session(self.pipeline.clone(), self.options.clone(), session) - .await?; - cursor - .stream(session) - .try_collect::>() - .await? - } - None => { - let cursor = database - .aggregate(self.pipeline.clone(), self.options.clone()) - .await?; - cursor.try_collect::>().await? - } - }; - - Ok(Some(Bson::from(result))) - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + let mut cursor = collection + .aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) + .await?; + cursor + .stream(session) + .try_collect::>() + .await? + } + None => { + let cursor = collection + .aggregate(self.pipeline.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; + Ok(Some(Bson::from(result))) + } + .boxed() + } + + fn execute_on_database<'a>( + &'a self, + database: &'a Database, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + let mut cursor = database + .aggregate_with_session( + self.pipeline.clone(), + self.options.clone(), + session, + ) + .await?; + cursor + .stream(session) + .try_collect::>() + .await? + } + None => { + let cursor = database + .aggregate(self.pipeline.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; + + Ok(Some(Bson::from(result))) + } + .boxed() } } @@ -723,47 +640,33 @@ pub(super) struct Distinct { options: Option, } -#[async_trait] impl TestOperation for Distinct { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .distinct_with_session( - &self.field_name, - self.filter.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .distinct(&self.field_name, self.filter.clone(), self.options.clone()) - .await? - } - }; - Ok(Some(Bson::Array(result))) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .distinct_with_session( + &self.field_name, + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .distinct(&self.field_name, self.filter.clone(), self.options.clone()) + .await? + } + }; + Ok(Some(Bson::Array(result))) + } + .boxed() } } @@ -774,46 +677,32 @@ pub(super) struct CountDocuments { options: Option, } -#[async_trait] impl TestOperation for CountDocuments { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .count_documents_with_session( - self.filter.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .count_documents(self.filter.clone(), self.options.clone()) - .await? - } - }; - Ok(Some(Bson::Int64(result.try_into().unwrap()))) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .count_documents_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .count_documents(self.filter.clone(), self.options.clone()) + .await? + } + }; + Ok(Some(Bson::Int64(result.try_into().unwrap()))) + } + .boxed() } } @@ -823,33 +712,19 @@ pub(super) struct EstimatedDocumentCount { options: Option, } -#[async_trait] impl TestOperation for EstimatedDocumentCount { - async fn execute_on_collection( - &self, - collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - let result = collection - .estimated_document_count(self.options.clone()) - .await?; - Ok(Some(Bson::Int64(result.try_into().unwrap()))) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + _session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = collection + .estimated_document_count(self.options.clone()) + .await?; + Ok(Some(Bson::Int64(result.try_into().unwrap()))) + } + .boxed() } } @@ -860,45 +735,31 @@ pub(super) struct FindOne { options: Option, } -#[async_trait] impl TestOperation for FindOne { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .find_one_with_session(self.filter.clone(), self.options.clone(), session) - .await? - } - None => { - collection - .find_one(self.filter.clone(), self.options.clone()) - .await? + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .find_one_with_session(self.filter.clone(), self.options.clone(), session) + .await? + } + None => { + collection + .find_one(self.filter.clone(), self.options.clone()) + .await? + } + }; + match result { + Some(result) => Ok(Some(Bson::from(result))), + None => Ok(None), } - }; - match result { - Some(result) => Ok(Some(Bson::from(result))), - None => Ok(None), } - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + .boxed() } } @@ -909,48 +770,34 @@ pub(super) struct ListCollections { options: Option, } -#[async_trait] impl TestOperation for ListCollections { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - let mut cursor = database - .list_collections_with_session( - self.filter.clone(), - self.options.clone(), - session, - ) - .await?; - cursor.stream(session).try_collect::>().await? - } - None => { - let cursor = database - .list_collections(self.filter.clone(), self.options.clone()) - .await?; - cursor.try_collect::>().await? - } - }; - Ok(Some(bson::to_bson(&result)?)) - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_database<'a>( + &'a self, + database: &'a Database, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + let mut cursor = database + .list_collections_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await?; + cursor.stream(session).try_collect::>().await? + } + None => { + let cursor = database + .list_collections(self.filter.clone(), self.options.clone()) + .await?; + cursor.try_collect::>().await? + } + }; + Ok(Some(bson::to_bson(&result)?)) + } + .boxed() } } @@ -959,39 +806,25 @@ pub(super) struct ListCollectionNames { filter: Option, } -#[async_trait] impl TestOperation for ListCollectionNames { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - database - .list_collection_names_with_session(self.filter.clone(), session) - .await? - } - None => database.list_collection_names(self.filter.clone()).await?, - }; - let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); - Ok(Some(Bson::from(result))) - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_database<'a>( + &'a self, + database: &'a Database, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + database + .list_collection_names_with_session(self.filter.clone(), session) + .await? + } + None => database.list_collection_names(self.filter.clone()).await?, + }; + let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); + Ok(Some(Bson::from(result))) + } + .boxed() } } @@ -1003,52 +836,38 @@ pub(super) struct ReplaceOne { options: Option, } -#[async_trait] impl TestOperation for ReplaceOne { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .replace_one_with_session( - self.filter.clone(), - self.replacement.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .replace_one( - self.filter.clone(), - self.replacement.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .replace_one_with_session( + self.filter.clone(), + self.replacement.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .replace_one( + self.filter.clone(), + self.replacement.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -1060,52 +879,38 @@ pub(super) struct FindOneAndUpdate { options: Option, } -#[async_trait] impl TestOperation for FindOneAndUpdate { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .find_one_and_update_with_session( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .find_one_and_update( - self.filter.clone(), - self.update.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .find_one_and_update_with_session( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .find_one_and_update( + self.filter.clone(), + self.update.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -1117,52 +922,38 @@ pub(super) struct FindOneAndReplace { options: Option, } -#[async_trait] impl TestOperation for FindOneAndReplace { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .find_one_and_replace_with_session( - self.filter.clone(), - self.replacement.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .find_one_and_replace( - self.filter.clone(), - self.replacement.clone(), - self.options.clone(), - ) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .find_one_and_replace_with_session( + self.filter.clone(), + self.replacement.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .find_one_and_replace( + self.filter.clone(), + self.replacement.clone(), + self.options.clone(), + ) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -1173,47 +964,33 @@ pub(super) struct FindOneAndDelete { options: Option, } -#[async_trait] impl TestOperation for FindOneAndDelete { - async fn execute_on_collection( - &self, - collection: &Collection, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - collection - .find_one_and_delete_with_session( - self.filter.clone(), - self.options.clone(), - session, - ) - .await? - } - None => { - collection - .find_one_and_delete(self.filter.clone(), self.options.clone()) - .await? - } - }; - let result = bson::to_bson(&result)?; - Ok(Some(result)) - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_collection<'a>( + &'a self, + collection: &'a Collection, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + collection + .find_one_and_delete_with_session( + self.filter.clone(), + self.options.clone(), + session, + ) + .await? + } + None => { + collection + .find_one_and_delete(self.filter.clone(), self.options.clone()) + .await? + } + }; + let result = bson::to_bson(&result)?; + Ok(Some(result)) + } + .boxed() } } @@ -1224,33 +1001,18 @@ pub(super) struct ListDatabases { options: Option, } -#[async_trait] impl TestOperation for ListDatabases { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, client: &TestClient) -> Result> { - let result = client - .list_databases(self.filter.clone(), self.options.clone()) - .await?; - Ok(Some(bson::to_bson(&result)?)) - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_client<'a>( + &'a self, + client: &'a TestClient, + ) -> BoxFuture<'a, Result>> { + async move { + let result = client + .list_databases(self.filter.clone(), self.options.clone()) + .await?; + Ok(Some(bson::to_bson(&result)?)) + } + .boxed() } } @@ -1261,34 +1023,19 @@ pub(super) struct ListDatabaseNames { options: Option, } -#[async_trait] impl TestOperation for ListDatabaseNames { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, client: &TestClient) -> Result> { - let result = client - .list_database_names(self.filter.clone(), self.options.clone()) - .await?; - let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); - Ok(Some(Bson::Array(result))) - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_client<'a>( + &'a self, + client: &'a TestClient, + ) -> BoxFuture<'a, Result>> { + async move { + let result = client + .list_database_names(self.filter.clone(), self.options.clone()) + .await?; + let result: Vec = result.iter().map(|s| Bson::String(s.to_string())).collect(); + Ok(Some(Bson::Array(result))) + } + .boxed() } } @@ -1297,50 +1044,35 @@ pub(super) struct AssertSessionTransactionState { state: String, } -#[async_trait] impl TestOperation for AssertSessionTransactionState { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, session: &mut ClientSession) -> Result> { - match self.state.as_str() { - "none" => assert!(matches!(session.transaction.state, TransactionState::None)), - "starting" => assert!(matches!( - session.transaction.state, - TransactionState::Starting - )), - "in_progress" => assert!(matches!( - session.transaction.state, - TransactionState::InProgress - )), - "committed" => assert!(matches!( - session.transaction.state, - TransactionState::Committed { .. } - )), - "aborted" => assert!(matches!( - session.transaction.state, - TransactionState::Aborted - )), - other => panic!("Unknown transaction state: {}", other), + fn execute_on_session<'a>( + &'a self, + session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + async move { + match self.state.as_str() { + "none" => assert!(matches!(session.transaction.state, TransactionState::None)), + "starting" => assert!(matches!( + session.transaction.state, + TransactionState::Starting + )), + "in_progress" => assert!(matches!( + session.transaction.state, + TransactionState::InProgress + )), + "committed" => assert!(matches!( + session.transaction.state, + TransactionState::Committed { .. } + )), + "aborted" => assert!(matches!( + session.transaction.state, + TransactionState::Aborted + )), + other => panic!("Unknown transaction state: {}", other), + } + Ok(None) } - Ok(None) + .boxed() } } @@ -1349,93 +1081,42 @@ pub(super) struct StartTransaction { options: Option, } -#[async_trait] impl TestOperation for StartTransaction { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, session: &mut ClientSession) -> Result> { - session - .start_transaction(self.options.clone()) - .await - .map(|_| None) + fn execute_on_session<'a>( + &'a self, + session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + async move { + session + .start_transaction(self.options.clone()) + .await + .map(|_| None) + } + .boxed() } } #[derive(Debug, Deserialize)] pub(super) struct CommitTransaction {} -#[async_trait] impl TestOperation for CommitTransaction { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, session: &mut ClientSession) -> Result> { - session.commit_transaction().await.map(|_| None) + fn execute_on_session<'a>( + &'a self, + session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + async move { session.commit_transaction().await.map(|_| None) }.boxed() } } #[derive(Debug, Deserialize)] pub(super) struct AbortTransaction {} -#[async_trait] impl TestOperation for AbortTransaction { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, session: &mut ClientSession) -> Result> { - session.abort_transaction().await.map(|_| None) + fn execute_on_session<'a>( + &'a self, + session: &'a mut ClientSession, + ) -> BoxFuture<'a, Result>> { + async move { session.abort_transaction().await.map(|_| None) }.boxed() } } @@ -1446,42 +1127,28 @@ pub(super) struct RunCommand { read_preference: Option, } -#[async_trait] impl TestOperation for RunCommand { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result> { - let selection_criteria = self - .read_preference - .as_ref() - .map(|read_preference| SelectionCriteria::ReadPreference(read_preference.clone())); - let result = match session { - Some(session) => { - database - .run_command_with_session(self.command.clone(), selection_criteria, session) - .await - } - None => database.run_command(self.command.clone(), None).await, - }; - result.map(|doc| Some(Bson::Document(doc))) - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_database<'a>( + &'a self, + database: &'a Database, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let selection_criteria = self + .read_preference + .as_ref() + .map(|read_preference| SelectionCriteria::ReadPreference(read_preference.clone())); + let result = match session { + Some(session) => { + database + .run_command_with_session(self.command.clone(), selection_criteria, session) + .await + } + None => database.run_command(self.command.clone(), None).await, + }; + result.map(|doc| Some(Bson::Document(doc))) + } + .boxed() } } @@ -1492,44 +1159,30 @@ pub(super) struct DropCollection { options: Option, } -#[async_trait] impl TestOperation for DropCollection { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - database - .collection::(&self.collection) - .drop_with_session(self.options.clone(), session) - .await - } - None => { - database - .collection::(&self.collection) - .drop(self.options.clone()) - .await - } - }; - result.map(|_| None) - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_database<'a>( + &'a self, + database: &'a Database, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + database + .collection::(&self.collection) + .drop_with_session(self.options.clone(), session) + .await + } + None => { + database + .collection::(&self.collection) + .drop(self.options.clone()) + .await + } + }; + result.map(|_| None) + } + .boxed() } } @@ -1540,42 +1193,32 @@ pub(super) struct CreateCollection { options: Option, } -#[async_trait] impl TestOperation for CreateCollection { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - database: &Database, - session: Option<&mut ClientSession>, - ) -> Result> { - let result = match session { - Some(session) => { - database - .create_collection_with_session(&self.collection, self.options.clone(), session) - .await - } - None => { - database - .create_collection(&self.collection, self.options.clone()) - .await - } - }; - result.map(|_| None) - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_database<'a>( + &'a self, + database: &'a Database, + session: Option<&'a mut ClientSession>, + ) -> BoxFuture<'a, Result>> { + async move { + let result = match session { + Some(session) => { + database + .create_collection_with_session( + &self.collection, + self.options.clone(), + session, + ) + .await + } + None => { + database + .create_collection(&self.collection, self.options.clone()) + .await + } + }; + result.map(|_| None) + } + .boxed() } } @@ -1585,36 +1228,21 @@ pub(super) struct AssertCollectionExists { collection: String, } -#[async_trait] impl TestOperation for AssertCollectionExists { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, client: &TestClient) -> Result> { - let collections = client - .database(&self.database) - .list_collection_names(None) - .await - .unwrap(); - assert!(collections.contains(&self.collection)); - Ok(None) - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_client<'a>( + &'a self, + client: &'a TestClient, + ) -> BoxFuture<'a, Result>> { + async move { + let collections = client + .database(&self.database) + .list_collection_names(None) + .await + .unwrap(); + assert!(collections.contains(&self.collection)); + Ok(None) + } + .boxed() } } @@ -1624,65 +1252,25 @@ pub(super) struct AssertCollectionNotExists { collection: String, } -#[async_trait] impl TestOperation for AssertCollectionNotExists { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, client: &TestClient) -> Result> { - let collections = client - .database(&self.database) - .list_collection_names(None) - .await - .unwrap(); - assert!(!collections.contains(&self.collection)); - Ok(None) - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() + fn execute_on_client<'a>( + &'a self, + client: &'a TestClient, + ) -> BoxFuture<'a, Result>> { + async move { + let collections = client + .database(&self.database) + .list_collection_names(None) + .await + .unwrap(); + assert!(!collections.contains(&self.collection)); + Ok(None) + } + .boxed() } } #[derive(Debug, Deserialize)] pub(super) struct UnimplementedOperation; -#[async_trait] -impl TestOperation for UnimplementedOperation { - async fn execute_on_collection( - &self, - _collection: &Collection, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_database( - &self, - _database: &Database, - _session: Option<&mut ClientSession>, - ) -> Result> { - unimplemented!() - } - - async fn execute_on_client(&self, _client: &TestClient) -> Result> { - unimplemented!() - } - - async fn execute_on_session(&self, _session: &mut ClientSession) -> Result> { - unimplemented!() - } -} +impl TestOperation for UnimplementedOperation {}