diff --git a/internal/integration/crud_prose_test.go b/internal/integration/crud_prose_test.go
index a814cf26dc..fedf8f2b74 100644
--- a/internal/integration/crud_prose_test.go
+++ b/internal/integration/crud_prose_test.go
@@ -499,7 +499,10 @@ func TestClientBulkWriteProse(t *testing.T) {
 		assert.Equal(mt, 1, opsCnt[1], "expected %d secondEvent.command.ops, got: %d", 1, opsCnt[1])
 	})
 
-	mt.Run("5. MongoClient.bulkWrite collects WriteConcernErrors across batches", func(mt *mtest.T) {
+	// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+	// topologies. Allow running on sharded topologies once that is fixed.
+	noShardedOpts := mtest.NewOptions().Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
+	mt.RunOpts("5. MongoClient.bulkWrite collects WriteConcernErrors across batches", noShardedOpts, func(mt *mtest.T) {
 		var eventCnt int
 		monitor := &event.CommandMonitor{
 			Started: func(_ context.Context, e *event.CommandStartedEvent) {
@@ -715,7 +718,9 @@ func TestClientBulkWriteProse(t *testing.T) {
 		assert.Equal(mt, 1, getMoreCalled, "expected %d getMore call, got: %d", 1, getMoreCalled)
 	})
 
-	mt.Run("9. MongoClient.bulkWrite handles a getMore error", func(mt *mtest.T) {
+	// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+	// topologies. Allow running on sharded topologies once that is fixed.
+	mt.RunOpts("9. MongoClient.bulkWrite handles a getMore error", noShardedOpts, func(mt *mtest.T) {
 		var getMoreCalled int
 		var killCursorsCalled int
 		monitor := &event.CommandMonitor{
diff --git a/internal/integration/csot_prose_test.go b/internal/integration/csot_prose_test.go
index 28d28d60ed..52ea1d453c 100644
--- a/internal/integration/csot_prose_test.go
+++ b/internal/integration/csot_prose_test.go
@@ -238,7 +238,10 @@ func TestCSOTProse_GridFS(t *testing.T) {
 	mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
 
 	mt.RunOpts("6. gridfs - upload", mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
-		mt.Run("uploads via openUploadStream can be timed out", func(mt *mtest.T) {
+		// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+		// topologies. Allow running on sharded topologies once that is fixed.
+		noShardedOpts := mtest.NewOptions().Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
+		mt.RunOpts("uploads via openUploadStream can be timed out", noShardedOpts, func(mt *mtest.T) {
 			// Drop and re-create the db.fs.files and db.fs.chunks collections.
 			err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
 			assert.NoError(mt, err, "failed to drop files")
@@ -298,7 +301,9 @@ func TestCSOTProse_GridFS(t *testing.T) {
 			assert.Error(t, err, context.DeadlineExceeded)
 		})
 
-		mt.Run("Aborting an upload stream can be timed out", func(mt *mtest.T) {
+		// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+		// topologies. Allow running on sharded topologies once that is fixed.
+		mt.RunOpts("Aborting an upload stream can be timed out", noShardedOpts, func(mt *mtest.T) {
 			// Drop and re-create the db.fs.files and db.fs.chunks collections.
 			err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
 			assert.NoError(mt, err, "failed to drop files")
@@ -414,7 +419,12 @@ func TestCSOTProse_GridFS(t *testing.T) {
 	})
 
 	const test62 = "6.2 gridfs - upload with operation-level timeout"
-	mt.RunOpts(test62, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
+	mtOpts := mtest.NewOptions().
+		MinServerVersion("4.4").
+		// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+		// topologies. Allow running on sharded topologies once that is fixed.
+		Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
+	mt.RunOpts(test62, mtOpts, func(mt *mtest.T) {
 		// Drop and re-create the db.fs.files and db.fs.chunks collections.
 		err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
 		assert.NoError(mt, err, "failed to drop files")
diff --git a/internal/integration/mtest/mongotest.go b/internal/integration/mtest/mongotest.go
index cf89da8180..d739bba895 100644
--- a/internal/integration/mtest/mongotest.go
+++ b/internal/integration/mtest/mongotest.go
@@ -80,10 +80,11 @@ type T struct {
 	requireAPIVersion *bool
 
 	// options copied to sub-tests
-	clientType  ClientType
-	clientOpts  *options.ClientOptions
-	collOpts    *options.CollectionOptionsBuilder
-	shareClient *bool
+	clientType               ClientType
+	clientOpts               *options.ClientOptions
+	collOpts                 *options.CollectionOptionsBuilder
+	shareClient              *bool
+	allowFailPointsOnSharded bool
 
 	baseOpts *Options // used to create subtests
 
@@ -125,6 +126,9 @@ func newT(wrapped *testing.T, opts ...*Options) *T {
 	if t.shareClient != nil {
 		t.baseOpts.ShareClient(*t.shareClient)
 	}
+	if t.allowFailPointsOnSharded {
+		t.baseOpts.AllowFailPointsOnSharded()
+	}
 
 	return t
 }
@@ -501,6 +505,21 @@ func (t *T) ClearCollections() {
 // SetFailPoint sets a fail point for the client associated with T. Commands to create the failpoint will appear
 // in command monitoring channels. The fail point will automatically be disabled after this test has run.
 func (t *T) SetFailPoint(fp failpoint.FailPoint) {
+	// Do not allow failpoints to be used on sharded topologies unless
+	// specifically configured to allow it.
+	//
+	// On sharded topologies, failpoints are applied to only a single mongoS. If
+	// the driver is connected to multiple mongoS instances, there's a
+	// possibility a different mongoS will be selected for a subsequent command.
+	// In that case, the failpoint is effectively ignored, leading to a test
+	// failure that is extremely difficult to diagnose.
+	//
+	// TODO(GODRIVER-3328): Remove this once we set failpoints on every mongoS
+	// in sharded topologies.
+	if testContext.topoKind == Sharded && !t.allowFailPointsOnSharded {
+		t.Fatalf("cannot use failpoints with sharded topologies unless AllowFailPointsOnSharded is set")
+	}
+
 	// ensure mode fields are int32
 	if modeMap, ok := fp.Mode.(map[string]any); ok {
 		var key string
diff --git a/internal/integration/mtest/options.go b/internal/integration/mtest/options.go
index 2f41db2bc1..b2a6cb6d2d 100644
--- a/internal/integration/mtest/options.go
+++ b/internal/integration/mtest/options.go
@@ -281,3 +281,18 @@ func (op *Options) RequireAPIVersion(rav bool) *Options {
 	})
 	return op
 }
+
+// AllowFailPointsOnSharded bypasses the check for failpoints used on sharded
+// topologies.
+//
+// Failpoints are generally unreliable on sharded topologies, but can be used if
+// the failpoint is explicitly applied to every mongoS node in the cluster.
+//
+// TODO(GODRIVER-3328): Remove this option once we set failpoints on every
+// mongoS in sharded topologies.
+func (op *Options) AllowFailPointsOnSharded() *Options {
+	op.optFuncs = append(op.optFuncs, func(t *T) {
+		t.allowFailPointsOnSharded = true
+	})
+	return op
+}
diff --git a/internal/integration/retryable_reads_prose_test.go b/internal/integration/retryable_reads_prose_test.go
index 3b2b50f2c8..a8071695bb 100644
--- a/internal/integration/retryable_reads_prose_test.go
+++ b/internal/integration/retryable_reads_prose_test.go
@@ -34,14 +34,17 @@ func TestRetryableReadsProse(t *testing.T) {
 		SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
 		SetHosts(hosts[:1])
 
-	mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.3")
-	mt := mtest.New(t, mtOpts)
-
-	mt.Run("PoolClearedError retryability", func(mt *mtest.T) {
-		if mtest.ClusterTopologyKind() == mtest.LoadBalanced {
-			mt.Skip("skipping as load balanced topology has different pool clearing behavior")
-		}
-
+	mt := mtest.New(t, mtest.NewOptions().ClientOptions(clientOpts))
+
+	mtOpts := mtest.NewOptions().
+		MinServerVersion("4.3").
+		// Load-balanced topologies have a different behavior for clearing the
+		// pool, so don't run the test on load-balanced topologies
+		//
+		// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+		// topologies. Allow running on sharded topologies once that is fixed.
+		Topologies(mtest.Single, mtest.ReplicaSet)
+	mt.RunOpts("PoolClearedError retryability", mtOpts, func(mt *mtest.T) {
 		// Insert a document to test collection.
 		_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
 		assert.Nil(mt, err, "InsertOne error: %v", err)
@@ -106,7 +109,7 @@ func TestRetryableReadsProse(t *testing.T) {
 		}
 	})
 
-	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
+	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2").AllowFailPointsOnSharded()
 	mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
 		tests := []struct {
 			name string
diff --git a/internal/integration/retryable_writes_prose_test.go b/internal/integration/retryable_writes_prose_test.go
index 4fa3fab309..d38569e35a 100644
--- a/internal/integration/retryable_writes_prose_test.go
+++ b/internal/integration/retryable_writes_prose_test.go
@@ -155,8 +155,12 @@ func TestRetryableWritesProse(t *testing.T) {
 		SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
 		SetHosts(hosts[:1])
 
-	mtPceOpts := mtest.NewOptions().ClientOptions(pceOpts).MinServerVersion("4.3").
-		Topologies(mtest.ReplicaSet, mtest.Sharded)
+	mtPceOpts := mtest.NewOptions().
+		ClientOptions(pceOpts).
+		MinServerVersion("4.3").
+		// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+		// topologies. Allow running on sharded topologies once that is fixed.
+		Topologies(mtest.ReplicaSet)
 	mt.RunOpts("PoolClearedError retryability", mtPceOpts, func(mt *mtest.T) {
 		// Force Find to block for 1 second once.
 		mt.SetFailPoint(failpoint.FailPoint{
@@ -287,7 +291,7 @@ func TestRetryableWritesProse(t *testing.T) {
 		require.True(mt, err.(mongo.WriteException).HasErrorCode(int(shutdownInProgressErrorCode)))
 	})
 
-	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
+	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2").AllowFailPointsOnSharded()
 	mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
 		tests := []struct {
 			name string
diff --git a/internal/integration/sdam_prose_test.go b/internal/integration/sdam_prose_test.go
index 4995b7e90b..274d6c0abb 100644
--- a/internal/integration/sdam_prose_test.go
+++ b/internal/integration/sdam_prose_test.go
@@ -98,7 +98,10 @@ func TestSDAMProse(t *testing.T) {
 		SetAppName("streamingRttTest")
 	mtOpts := mtest.NewOptions().
 		MinServerVersion("4.4").
-		ClientOptions(clientOpts)
+		ClientOptions(clientOpts).
+		// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
+		// clusters. Remove this exclusion once we fix that.
+		Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
 	mt.RunOpts("rtt is continuously updated", mtOpts, func(mt *mtest.T) {
 		// Test that the RTT monitor updates the RTT for server descriptions.
 
diff --git a/internal/integration/server_selection_prose_test.go b/internal/integration/server_selection_prose_test.go
index 8bed374785..07dc4254e0 100644
--- a/internal/integration/server_selection_prose_test.go
+++ b/internal/integration/server_selection_prose_test.go
@@ -112,7 +112,7 @@ func TestServerSelectionProse(t *testing.T) {
 
 	mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
 
-	mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.9")
+	mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.9").AllowFailPointsOnSharded()
 	mt.RunOpts("operationCount-based selection within latency window, with failpoint", mtOpts, func(mt *mtest.T) {
 		_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
 		require.NoError(mt, err, "InsertOne() error")
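For context, the sketch below (not part of this patch) shows the two patterns test authors have after this change: exclude sharded topologies so SetFailPoint never reaches the new guard, or call AllowFailPointsOnSharded() when the test deliberately targets a single mongoS. The mtest calls are the ones shown in the diff; the test name is hypothetical, the import paths assume the v2 module layout, and the failpoint.FailPoint fields (ConfigureFailPoint, Mode, Data, FailCommands, ErrorCode) are assumed to mirror the server's failCommand options.

package integration

import (
	"context"
	"testing"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/internal/failpoint"
	"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
)

func TestFailPointOptInSketch(t *testing.T) {
	mt := mtest.New(t, mtest.NewOptions().CreateClient(false))

	// Pattern 1: exclude sharded topologies so the SetFailPoint guard is
	// never triggered. failCommand requires server 4.0+.
	noShardedOpts := mtest.NewOptions().
		MinServerVersion("4.0").
		Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
	mt.RunOpts("failpoint without sharded topologies", noShardedOpts, func(mt *mtest.T) {
		// Field names below are assumed to mirror the server's failCommand
		// document (configureFailPoint/mode/data).
		mt.SetFailPoint(failpoint.FailPoint{
			ConfigureFailPoint: "failCommand",
			Mode:               failpoint.Mode{Times: 1},
			Data: failpoint.Data{
				FailCommands: []string{"find"},
				ErrorCode:    91, // ShutdownInProgress
			},
		})

		// The first find should observe the failpoint-injected error.
		if _, err := mt.Coll.Find(context.Background(), bson.D{}); err == nil {
			mt.Fatalf("expected find to fail due to the failCommand failpoint")
		}
	})

	// Pattern 2: the test is specifically about sharded clusters and accepts
	// that the failpoint is set on only one mongoS, so it opts in explicitly.
	shardedOpts := mtest.NewOptions().
		MinServerVersion("4.2").
		Topologies(mtest.Sharded).
		AllowFailPointsOnSharded()
	mt.RunOpts("failpoint explicitly allowed on sharded", shardedOpts, func(mt *mtest.T) {
		// Without AllowFailPointsOnSharded, this call now fails the test on
		// sharded topologies.
		mt.SetFailPoint(failpoint.FailPoint{
			ConfigureFailPoint: "failCommand",
			Mode:               failpoint.Mode{Times: 1},
			Data:               failpoint.Data{FailCommands: []string{"insert"}, ErrorCode: 91},
		})
	})
}

The guard is deliberately fail-fast rather than skip-silent: an unreliable failpoint surfaces at the SetFailPoint call site, and only the tests that genuinely exercise mongoS behavior (the "retrying in sharded cluster" and server-selection prose tests above) opt in, while everything else excludes sharded topologies.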