GODRIVER-3638 Prohibit using failpoints on sharded topologies. #2168

Draft · wants to merge 1 commit into master

9 changes: 7 additions & 2 deletions internal/integration/crud_prose_test.go
@@ -499,7 +499,10 @@ func TestClientBulkWriteProse(t *testing.T) {
assert.Equal(mt, 1, opsCnt[1], "expected %d secondEvent.command.ops, got: %d", 1, opsCnt[1])
})

mt.Run("5. MongoClient.bulkWrite collects WriteConcernErrors across batches", func(mt *mtest.T) {
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
noShardedOpts := mtest.NewOptions().Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
mt.RunOpts("5. MongoClient.bulkWrite collects WriteConcernErrors across batches", noShardedOpts, func(mt *mtest.T) {
var eventCnt int
monitor := &event.CommandMonitor{
Started: func(_ context.Context, e *event.CommandStartedEvent) {
@@ -715,7 +718,9 @@ func TestClientBulkWriteProse(t *testing.T) {
assert.Equal(mt, 1, getMoreCalled, "expected %d getMore call, got: %d", 1, getMoreCalled)
})

mt.Run("9. MongoClient.bulkWrite handles a getMore error", func(mt *mtest.T) {
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
mt.RunOpts("9. MongoClient.bulkWrite handles a getMore error", noShardedOpts, func(mt *mtest.T) {
var getMoreCalled int
var killCursorsCalled int
monitor := &event.CommandMonitor{
16 changes: 13 additions & 3 deletions internal/integration/csot_prose_test.go
@@ -238,7 +238,10 @@ func TestCSOTProse_GridFS(t *testing.T) {
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))

mt.RunOpts("6. gridfs - upload", mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
mt.Run("uploads via openUploadStream can be timed out", func(mt *mtest.T) {
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
noShardedOpts := mtest.NewOptions().Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
mt.RunOpts("uploads via openUploadStream can be timed out", noShardedOpts, func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")
@@ -298,7 +301,9 @@ func TestCSOTProse_GridFS(t *testing.T) {
assert.Error(t, err, context.DeadlineExceeded)
})

mt.Run("Aborting an upload stream can be timed out", func(mt *mtest.T) {
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
mt.RunOpts("Aborting an upload stream can be timed out", noShardedOpts, func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")
@@ -414,7 +419,12 @@ func TestCSOTProse_GridFS(t *testing.T) {
})

const test62 = "6.2 gridfs - upload with operation-level timeout"
mt.RunOpts(test62, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
mtOpts := mtest.NewOptions().
MinServerVersion("4.4").
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
mt.RunOpts(test62, mtOpts, func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")
52 changes: 34 additions & 18 deletions internal/integration/mtest/mongotest.go
@@ -60,24 +60,25 @@ type T struct {
*testing.T

// members for only this T instance
createClient *bool
createCollection *bool
runOn []RunOnBlock
mockDeployment *drivertest.MockDeployment // nil if the test is not being run against a mock
mockResponses []bson.D
createdColls []*Collection // collections created in this test
proxyDialer *proxyDialer
dbName, collName string
failPointNames []string
minServerVersion string
maxServerVersion string
validTopologies []TopologyKind
auth *bool
enterprise *bool
dataLake *bool
ssl *bool
collCreateOpts *options.CreateCollectionOptionsBuilder
requireAPIVersion *bool
createClient *bool
createCollection *bool
runOn []RunOnBlock
mockDeployment *drivertest.MockDeployment // nil if the test is not being run against a mock
mockResponses []bson.D
createdColls []*Collection // collections created in this test
proxyDialer *proxyDialer
dbName, collName string
failPointNames []string
minServerVersion string
maxServerVersion string
validTopologies []TopologyKind
auth *bool
enterprise *bool
dataLake *bool
ssl *bool
collCreateOpts *options.CreateCollectionOptionsBuilder
requireAPIVersion *bool
allowFailPointsOnSharded bool

// options copied to sub-tests
clientType ClientType
@@ -501,6 +502,21 @@ func (t *T) ClearCollections() {
// SetFailPoint sets a fail point for the client associated with T. Commands to create the failpoint will appear
// in command monitoring channels. The fail point will automatically be disabled after this test has run.
func (t *T) SetFailPoint(fp failpoint.FailPoint) {
// Do not allow failpoints to be used on sharded topologies unless
// specifically configured to allow it.
//
// On sharded topologies, failpoints are applied to only a single mongoS. If
// the driver is connected to multiple mongoS instances, there's a
// possibility a different mongoS will be selected for a subsequent command.
// In that case, the failpoint is effectively ignored, leading to a test
// failure that is extremely difficult to diagnose.
//
// TODO(GODRIVER-3328): Remove this once we set failpoints on every mongoS
// in sharded topologies.
if testContext.topoKind == Sharded && !t.allowFailPointsOnSharded {
t.Fatalf("cannot use failpoints with sharded topologies unless AllowFailPointsOnSharded is set")
}

// ensure mode fields are int32
if modeMap, ok := fp.Mode.(map[string]any); ok {
var key string
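
For context on the TODO above: GODRIVER-3328 tracks applying failpoints to every mongoS rather than to just one. A minimal sketch of what such a helper could look like follows. It is illustrative only and not part of this PR; the helper name, the host-list argument, and the use of per-host direct connections are assumptions, and the snippet presumes it lives inside the driver repo alongside the integration tests.

package integration // hypothetical location inside the driver repo

import (
	"context"
	"testing"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

// setFailPointOnEveryMongos sketches one way GODRIVER-3328 could be resolved:
// send the configureFailPoint command to each mongoS over a direct connection
// so that no router silently ignores it. A real implementation would also
// disable the failpoint (mode: "off") on every host during test cleanup.
func setFailPointOnEveryMongos(t *testing.T, hosts []string, fpCmd bson.D) {
	t.Helper()

	for _, host := range hosts {
		// directConnection=true pins this client to a single mongoS.
		opts := options.Client().ApplyURI("mongodb://" + host + "/?directConnection=true")

		client, err := mongo.Connect(opts)
		if err != nil {
			t.Fatalf("failed to connect to %q: %v", host, err)
		}

		// configureFailPoint must be run against the "admin" database.
		if err := client.Database("admin").RunCommand(context.Background(), fpCmd).Err(); err != nil {
			t.Fatalf("failed to set failpoint on %q: %v", host, err)
		}

		if err := client.Disconnect(context.Background()); err != nil {
			t.Fatalf("failed to disconnect from %q: %v", host, err)
		}
	}
}
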
15 changes: 15 additions & 0 deletions internal/integration/mtest/options.go
@@ -281,3 +281,18 @@ func (op *Options) RequireAPIVersion(rav bool) *Options {
})
return op
}

// AllowFailPointsOnSharded bypasses the check for failpoints used on sharded
// topologies.
//
// Failpoints are generally unreliable on sharded topologies, but can be used if
// the failpoint is explicitly applied to every mongoS node in the cluster.
//
// TODO(GODRIVER-3328): Remove this option once we set failpoints on every
// mongoS in sharded topologies.
func (op *Options) AllowFailPointsOnSharded() *Options {
op.optFuncs = append(op.optFuncs, func(t *T) {
t.allowFailPointsOnSharded = true
})
return op
}
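
For reference, here is a condensed sketch of how a sharded-only test opts in, mirroring the updated retryable reads/writes and server selection tests below. The test name and failpoint contents are placeholders; the failpoint fields follow the pattern of the existing prose tests rather than anything added in this PR.

mtOpts := mtest.NewOptions().
	Topologies(mtest.Sharded).
	MinServerVersion("4.2").
	AllowFailPointsOnSharded() // without this, SetFailPoint fails the test on sharded topologies
mt.RunOpts("placeholder sharded failpoint test", mtOpts, func(mt *mtest.T) {
	// Placeholder failpoint: fail the next "find" command with a retryable
	// error code (91, ShutdownInProgress).
	mt.SetFailPoint(failpoint.FailPoint{
		ConfigureFailPoint: "failCommand",
		Mode:               failpoint.Mode{Times: 1},
		Data: failpoint.Data{
			FailCommands: []string{"find"},
			ErrorCode:    91,
		},
	})
	// ... exercise the behavior under test ...
})
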
21 changes: 12 additions & 9 deletions internal/integration/retryable_reads_prose_test.go
@@ -34,14 +34,17 @@ func TestRetryableReadsProse(t *testing.T) {
SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
SetHosts(hosts[:1])

mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.3")
mt := mtest.New(t, mtOpts)

mt.Run("PoolClearedError retryability", func(mt *mtest.T) {
if mtest.ClusterTopologyKind() == mtest.LoadBalanced {
mt.Skip("skipping as load balanced topology has different pool clearing behavior")
}

mt := mtest.New(t, mtest.NewOptions().ClientOptions(clientOpts))

mtOpts := mtest.NewOptions().
MinServerVersion("4.3").
// Load-balanced topologies have a different behavior for clearing the
// pool, so don't run the test on load-balanced topologies.
//
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
Topologies(mtest.Single, mtest.ReplicaSet)
mt.RunOpts("PoolClearedError retryability", mtOpts, func(mt *mtest.T) {
// Insert a document to test collection.
_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
assert.Nil(mt, err, "InsertOne error: %v", err)
@@ -106,7 +109,7 @@ func TestRetryableReadsProse(t *testing.T) {
}
})

mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2").AllowFailPointsOnSharded()
mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
tests := []struct {
name string
10 changes: 7 additions & 3 deletions internal/integration/retryable_writes_prose_test.go
@@ -155,8 +155,12 @@ func TestRetryableWritesProse(t *testing.T) {
SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
SetHosts(hosts[:1])

mtPceOpts := mtest.NewOptions().ClientOptions(pceOpts).MinServerVersion("4.3").
Topologies(mtest.ReplicaSet, mtest.Sharded)
mtPceOpts := mtest.NewOptions().
ClientOptions(pceOpts).
MinServerVersion("4.3").
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// topologies. Allow running on sharded topologies once that is fixed.
Topologies(mtest.ReplicaSet)
mt.RunOpts("PoolClearedError retryability", mtPceOpts, func(mt *mtest.T) {
// Force Find to block for 1 second once.
mt.SetFailPoint(failpoint.FailPoint{
@@ -287,7 +291,7 @@ func TestRetryableWritesProse(t *testing.T) {
require.True(mt, err.(mongo.WriteException).HasErrorCode(int(shutdownInProgressErrorCode)))
})

mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2").AllowFailPointsOnSharded()
mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
tests := []struct {
name string
5 changes: 4 additions & 1 deletion internal/integration/sdam_prose_test.go
@@ -98,7 +98,10 @@ func TestSDAMProse(t *testing.T) {
SetAppName("streamingRttTest")
mtOpts := mtest.NewOptions().
MinServerVersion("4.4").
ClientOptions(clientOpts)
ClientOptions(clientOpts).
// TODO(GODRIVER-3328): FailPoints are not currently reliable on sharded
// clusters. Remove this exclusion once we fix that.
Topologies(mtest.Single, mtest.ReplicaSet, mtest.LoadBalanced)
mt.RunOpts("rtt is continuously updated", mtOpts, func(mt *mtest.T) {
// Test that the RTT monitor updates the RTT for server descriptions.

2 changes: 1 addition & 1 deletion internal/integration/server_selection_prose_test.go
@@ -112,7 +112,7 @@ func TestServerSelectionProse(t *testing.T) {

mt := mtest.New(t, mtest.NewOptions().CreateClient(false))

mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.9")
mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.9").AllowFailPointsOnSharded()
mt.RunOpts("operationCount-based selection within latency window, with failpoint", mtOpts, func(mt *mtest.T) {
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
require.NoError(mt, err, "InsertOne() error")