diff --git a/dockey/agg.go b/dockey/agg.go new file mode 100644 index 00000000..19400029 --- /dev/null +++ b/dockey/agg.go @@ -0,0 +1,89 @@ +// Package dockey contains logic related to document key determination. +// Its tests use a cluster and thus are stored in internal/verifier. + +package dockey + +import ( + "maps" + "slices" + "strconv" + + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" +) + +// ExtractTrueDocKeyAgg returns an aggregation expression that extracts the +// document key from the document to which the `docExpr` refers. +// +// NB: This avoids the problem documented in SERVER-109340; as a result, +// the returned key may not always match the change stream’s `documentKey` +// (because the server misreports its own sharding logic). +func ExtractTrueDocKeyAgg(fieldNames []string, docExpr string) bson.D { + assertFieldNameUniqueness(fieldNames) + + var docKeyNumKeys bson.D + numToKeyLookup := map[string]string{} + + for n, name := range fieldNames { + var valExpr = docExpr + "." + name + + // Aggregation forbids direct creation of an object with dotted keys. + // So here we create an object with numeric keys, then below we’ll + // map the numeric keys back to the real ones. + + nStr := strconv.Itoa(n) + docKeyNumKeys = append(docKeyNumKeys, bson.E{nStr, valExpr}) + numToKeyLookup[nStr] = name + } + + // Now convert the numeric keys back to the real ones. + return mapObjectKeysAgg(docKeyNumKeys, numToKeyLookup) +} + +// Potentially reusable: +func mapObjectKeysAgg(expr any, mapping map[string]string) bson.D { + // We would ideally pass mapping into the aggregation and $getField + // to get the mapped key, but pre-v8 server versions required $getField’s + // field parameter to be a constant. (And pre-v5 didn’t have $getField + // at all.) So we use a $switch instead. + mapAgg := bson.D{ + {"$switch", bson.D{ + {"branches", lo.Map( + slices.Collect(maps.Keys(mapping)), + func(key string, _ int) bson.D { + return bson.D{ + {"case", bson.D{ + {"$eq", bson.A{ + key, + "$$numericKey", + }}, + }}, + {"then", mapping[key]}, + } + }, + )}, + }}, + } + + return bson.D{ + {"$arrayToObject", bson.D{ + {"$map", bson.D{ + {"input", bson.D{ + {"$objectToArray", expr}, + }}, + {"in", bson.D{ + {"$let", bson.D{ + {"vars", bson.D{ + {"numericKey", "$$this.k"}, + {"value", "$$this.v"}, + }}, + {"in", bson.D{ + {"k", mapAgg}, + {"v", "$$value"}, + }}, + }}, + }}, + }}, + }}, + } +} diff --git a/dockey/agg_test.go b/dockey/agg_test.go new file mode 100644 index 00000000..5a4d3ec5 --- /dev/null +++ b/dockey/agg_test.go @@ -0,0 +1,20 @@ +package dockey + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAggPanic(t *testing.T) { + assert.Panics( + t, + func() { + ExtractTrueDocKeyAgg( + []string{"foo", "bar", "foo"}, + "$$ROOT", + ) + }, + "duplicate field name should cause panic", + ) +} diff --git a/dockey/raw.go b/dockey/raw.go new file mode 100644 index 00000000..0338100a --- /dev/null +++ b/dockey/raw.go @@ -0,0 +1,55 @@ +package dockey + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" +) + +// This extracts the document key from a document gets its field names. +// +// NB: This avoids the problem documented in SERVER-109340; as a result, +// the returned key may not always match the change stream’s `documentKey` +// (because the server misreports its own sharding logic). 
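+//
+// Dotted field names are resolved the same way sharding routes documents:
+// each name is split on "." and looked up as a path through embedded
+// documents. Fields the document lacks are omitted from the returned key.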
+func ExtractTrueDocKeyFromDoc( + fieldNames []string, + doc bson.Raw, +) (bson.Raw, error) { + assertFieldNameUniqueness(fieldNames) + + var dk bson.D + for _, field := range fieldNames { + + // This is how sharding routes documents: it always + // splits on the dot and looks deeply into the document. + parts := strings.Split(field, ".") + val, err := doc.LookupErr(parts...) + + if errors.Is(err, bsoncore.ErrElementNotFound) || errors.As(err, &bsoncore.InvalidDepthTraversalError{}) { + // If the document lacks a value for this field + // then don’t add it to the document key. + continue + } else if err == nil { + dk = append(dk, bson.E{field, val}) + } else { + return nil, errors.Wrapf(err, "extracting doc key field %#q from doc %+v", field, doc) + } + } + + docKey, err := bson.Marshal(dk) + if err != nil { + return nil, errors.Wrapf(err, "marshaling doc key %v from doc %v", dk, docKey) + } + + return docKey, nil +} + +func assertFieldNameUniqueness(fieldNames []string) { + if len(lo.Uniq(fieldNames)) != len(fieldNames) { + panic(fmt.Sprintf("Duplicate field names: %v", fieldNames)) + } +} diff --git a/dockey/raw_test.go b/dockey/raw_test.go new file mode 100644 index 00000000..3e5a9652 --- /dev/null +++ b/dockey/raw_test.go @@ -0,0 +1,60 @@ +package dockey + +import ( + "slices" + "testing" + + "github.com/10gen/migration-verifier/dockey/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" +) + +func TestExtractTrueDocKeyFromDoc(t *testing.T) { + for _, reverseYN := range []bool{false, true} { + fieldNames := slices.Clone(test.FieldNames) + + if reverseYN { + slices.Reverse(fieldNames) + } + + for _, curCase := range test.TestCases { + raw, err := bson.Marshal(curCase.Doc) + require.NoError(t, err) + + computedRaw, err := ExtractTrueDocKeyFromDoc( + fieldNames, + raw, + ) + require.NoError(t, err) + + var computedDocKey bson.D + require.NoError(t, bson.Unmarshal(computedRaw, &computedDocKey)) + + expectedDocKey := slices.Clone(curCase.DocKey) + if reverseYN { + slices.Reverse(expectedDocKey) + } + + assert.Equal( + t, + expectedDocKey, + computedDocKey, + "doc key for %v (fieldNames: %v)", + bson.Raw(raw), + fieldNames, + ) + } + } + + assert.Panics( + t, + func() { + _, _ = ExtractTrueDocKeyFromDoc( + []string{"foo", "bar", "foo"}, + bson.Raw{0}, + ) + }, + "duplicate field name should cause panic", + ) +} diff --git a/dockey/test/cases.go b/dockey/test/cases.go new file mode 100644 index 00000000..a59e2cbd --- /dev/null +++ b/dockey/test/cases.go @@ -0,0 +1,78 @@ +package test + +import ( + "github.com/10gen/migration-verifier/mslices" + "go.mongodb.org/mongo-driver/bson" +) + +type TestCase struct { + Doc bson.D + DocKey bson.D +} + +var FieldNames = mslices.Of("_id", "foo.bar.baz") + +var TestCases = []TestCase{ + { + Doc: bson.D{ + {"_id", "abc"}, + {"foo", bson.D{ + {"bar", bson.D{{"baz", 1}}}, + {"bar.baz", 2}, + }}, + {"foo.bar", bson.D{{"baz", 3}}}, + {"foo.bar.baz", 4}, + }, + DocKey: bson.D{ + {"_id", "abc"}, + {"foo.bar.baz", int32(1)}, + }, + }, + { + Doc: bson.D{ + {"_id", "bbb"}, + {"foo", bson.D{ + {"bar", bson.D{{"baz", 1}}}, + {"bar.baz", 2}, + }}, + {"foo.bar", bson.D{{"baz", 3}}}, + }, + DocKey: bson.D{ + {"_id", "bbb"}, + {"foo.bar.baz", int32(1)}, + }, + }, + { + Doc: bson.D{ + {"_id", "ccc"}, + {"foo", bson.D{ + {"bar.baz", 2}, + }}, + {"foo.bar", bson.D{{"baz", 3}}}, + }, + DocKey: bson.D{ + {"_id", "ccc"}, + }, + }, + { + Doc: bson.D{ + {"_id", "ddd"}, + {"foo", bson.D{ + {"bar", bson.D{{"baz", 
nil}}}, + }}, + }, + DocKey: bson.D{ + {"_id", "ddd"}, + {"foo.bar.baz", nil}, + }, + }, + { + Doc: bson.D{ + {"_id", "eee"}, + {"foo", "bar"}, + }, + DocKey: bson.D{ + {"_id", "eee"}, + }, + }, +} diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 18b32239..288d6c7e 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -8,13 +8,13 @@ import ( "github.com/10gen/migration-verifier/chanutil" "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/dockey" "github.com/10gen/migration-verifier/internal/reportutils" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" - "github.com/samber/lo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -126,16 +126,15 @@ func (verifier *Verifier) compareDocsFromChannels( // b) compares the new doc against its previously-received, cached // counterpart and records any mismatch. handleNewDoc := func(curDocWithTs docWithTs, isSrc bool) error { - docKeyValues := lo.Map( + docKeyValues, err := getDocKeyValues( + verifier.docCompareMethod, + curDocWithTs.doc, mapKeyFieldNames, - func(fieldName string, _ int) bson.RawValue { - return getDocKeyFieldFromComparison( - verifier.docCompareMethod, - curDocWithTs.doc, - fieldName, - ) - }, ) + if err != nil { + return errors.Wrapf(err, "extracting doc key (fields: %v) values from doc %+v", mapKeyFieldNames, curDocWithTs.doc) + } + mapKey := getMapKey(docKeyValues) var ourMap, theirMap map[string]docWithTs @@ -321,10 +320,9 @@ func (verifier *Verifier) compareDocsFromChannels( results = append( results, VerificationResult{ - ID: getDocKeyFieldFromComparison( + ID: getDocIdFromComparison( verifier.docCompareMethod, docWithTs.doc, - "_id", ), Details: Missing, Cluster: ClusterTarget, @@ -339,10 +337,9 @@ func (verifier *Verifier) compareDocsFromChannels( results = append( results, VerificationResult{ - ID: getDocKeyFieldFromComparison( + ID: getDocIdFromComparison( verifier.docCompareMethod, docWithTs.doc, - "_id", ), Details: Missing, Cluster: ClusterSource, @@ -356,21 +353,74 @@ func (verifier *Verifier) compareDocsFromChannels( return results, srcDocCount, srcByteCount, nil } -func getDocKeyFieldFromComparison( +func getDocIdFromComparison( docCompareMethod DocCompareMethod, doc bson.Raw, - fieldName string, ) bson.RawValue { switch docCompareMethod { case DocCompareBinary, DocCompareIgnoreOrder: - return doc.Lookup(fieldName) + return doc.Lookup("_id") case DocCompareToHashedIndexKey: - return doc.Lookup(docKeyInHashedCompare, fieldName) + return doc.Lookup(docKeyInHashedCompare, "_id") default: panic("bad doc compare method: " + docCompareMethod) } } +func getDocKeyValues( + docCompareMethod DocCompareMethod, + doc bson.Raw, + fieldNames []string, +) ([]bson.RawValue, error) { + var docKey bson.Raw + + switch docCompareMethod { + case DocCompareBinary, DocCompareIgnoreOrder: + // If we have the full document, create the document key manually: + var err error + docKey, err = dockey.ExtractTrueDocKeyFromDoc(fieldNames, doc) + if err != nil { + return nil, err + } + case DocCompareToHashedIndexKey: + // If we have a hash, then the aggregation should have extracted the + // document key for us. 
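+		// (transformPipelineForToHashedIndexKey stores that key under
+		// docKeyInHashedCompare via dockey.ExtractTrueDocKeyAgg.)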
+ docKeyVal, err := doc.LookupErr(docKeyInHashedCompare) + if err != nil { + return nil, errors.Wrapf(err, "fetching %#q from doc %v", docKeyInHashedCompare, doc) + } + + var isDoc bool + docKey, isDoc = docKeyVal.DocumentOK() + if !isDoc { + return nil, fmt.Errorf( + "%#q in doc %v is type %s but should be %s", + docKeyInHashedCompare, + doc, + docKeyVal.Type, + bson.TypeEmbeddedDocument, + ) + } + } + + var values []bson.RawValue + els, err := docKey.Elements() + if err != nil { + return nil, errors.Wrapf(err, "parsing doc key (%+v) of doc %+v", docKey, doc) + } + + for _, el := range els { + val, err := el.ValueErr() + if err != nil { + return nil, errors.Wrapf(err, "parsing doc key element (%+v) of doc %+v", el, doc) + } + + values = append(values, val) + } + + return values, nil +} + func simpleTimerReset(t *time.Timer, dur time.Duration) { if !t.Stop() { <-t.C @@ -607,15 +657,18 @@ func (verifier *Verifier) getDocumentsCursor(ctx mongo.SessionContext, collectio // Suppress this log for recheck tasks because the list of IDs can be // quite long. if !task.IsRecheck() { - extJSON, _ := bson.MarshalExtJSON(cmd, true, false) - - verifier.logger.Debug(). - Any("task", task.PrimaryKey). - Str("cmd", lo.Ternary( - extJSON == nil, - fmt.Sprintf("%s", cmd), - string(extJSON), - )). + + evt := verifier.logger.Debug(). + Any("task", task.PrimaryKey) + + extJSON, err := bson.MarshalExtJSON(cmd, true, false) + if err != nil { + evt = evt.Str("cmd", fmt.Sprintf("%s", cmd)) + } else { + evt = evt.RawJSON("cmd", extJSON) + } + + evt. Str("options", fmt.Sprintf("%v", *runCommandOptions)). Msg("getDocuments command.") } @@ -631,12 +684,10 @@ func transformPipelineForToHashedIndexKey( slices.Clone(in), bson.D{{"$replaceWith", bson.D{ // Single-letter field names minimize the document size. - {docKeyInHashedCompare, bson.D(lo.Map( + {docKeyInHashedCompare, dockey.ExtractTrueDocKeyAgg( task.QueryFilter.GetDocKeyFields(), - func(f string, _ int) bson.E { - return bson.E{f, "$$ROOT." + f} - }, - ))}, + "$$ROOT", + )}, {"h", bson.D{ {"$toHashedIndexKey", bson.D{ {"$_internalKeyStringValue", bson.D{ @@ -658,7 +709,7 @@ func (verifier *Verifier) compareOneDocument(srcClientDoc, dstClientDoc bson.Raw if verifier.docCompareMethod == DocCompareToHashedIndexKey { // With hash comparison, mismatches are opaque. 
return []VerificationResult{{ - ID: getDocKeyFieldFromComparison(verifier.docCompareMethod, srcClientDoc, "_id"), + ID: getDocIdFromComparison(verifier.docCompareMethod, srcClientDoc), Details: Mismatch, Cluster: ClusterTarget, NameSpace: namespace, diff --git a/internal/verifier/dockey_agg_test.go b/internal/verifier/dockey_agg_test.go new file mode 100644 index 00000000..8031f2f1 --- /dev/null +++ b/internal/verifier/dockey_agg_test.go @@ -0,0 +1,93 @@ +package verifier + +import ( + "github.com/10gen/migration-verifier/dockey" + "github.com/10gen/migration-verifier/dockey/test" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "golang.org/x/exp/slices" +) + +func (suite *IntegrationTestSuite) TestExtractTrueDocKeyAgg() { + suite.testExtractTrueDocKeyAgg(false) +} + +func (suite *IntegrationTestSuite) TestExtractTrueDocKeyAgg_Reverse() { + suite.testExtractTrueDocKeyAgg(true) +} + +func (suite *IntegrationTestSuite) testExtractTrueDocKeyAgg(reverseYN bool) { + t := suite.T() + + ctx := t.Context() + + require := require.New(t) + + client := suite.srcMongoClient + + db := client.Database(suite.DBNameForTest()) + defer func() { + err := db.Drop(ctx) + if err != nil { + t.Logf("WARNING: Failed to drop DB %#q: %v", db.Name(), err) + } else { + t.Logf("Dropped DB %#q", db.Name()) + } + }() + + // Insert into a real collection so that we can test on + // server versions that lack the $documents aggregation stage. + coll := db.Collection("stuff") + + _, err := coll.InsertMany( + ctx, + lo.Map( + test.TestCases, + func(tc test.TestCase, _ int) any { + return tc.Doc + }, + ), + ) + require.NoError(err, "should insert") + + fieldNames := slices.Clone(test.FieldNames) + if reverseYN { + slices.Reverse(fieldNames) + } + + computedDocKeyAgg := dockey.ExtractTrueDocKeyAgg( + fieldNames, + "$$ROOT", + ) + + cursor, err := coll.Aggregate( + ctx, + mongo.Pipeline{ + {{"$replaceWith", computedDocKeyAgg}}, + }, + ) + require.NoError(err, "should open cursor to agg") + defer cursor.Close(ctx) + + var computedDocKeys []bson.D + require.NoError(cursor.All(ctx, &computedDocKeys)) + require.Len(computedDocKeys, len(test.TestCases)) + + for c, curCase := range test.TestCases { + expectedDocKey := slices.Clone(curCase.DocKey) + if reverseYN { + slices.Reverse(expectedDocKey) + } + + assert.Equal( + suite.T(), + expectedDocKey, + computedDocKeys[c], + "doc key for %+v", + curCase.Doc, + ) + } +} diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 7eadb93e..05928d58 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -99,6 +99,136 @@ func (suite *IntegrationTestSuite) TestProcessVerifyTask_Failure() { assert.ErrorContains(t, err, expectedIDHex) } +func (suite *IntegrationTestSuite) TestVerifier_Dotted_Shard_Key() { + ctx := suite.T().Context() + require := require.New(suite.T()) + + for _, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { + if suite.GetTopology(client) != util.TopologySharded { + suite.T().Skip("sharded clusters required") + } + } + + dbName := suite.DBNameForTest() + collName := "coll" + + verifier := suite.BuildVerifier() + srcColl := verifier.srcClient.Database(dbName).Collection(collName) + dstColl := verifier.dstClient.Database(dbName).Collection(collName) + + docs := []bson.D{ + // NB: It’s not possible to test with 
/foo.bar.baz or /foo/bar.baz + // because the server (as of v8) will route those to the same shard. + // Only /foo/bar/baz actually gets routed as per the shard key, so + // only that value can create a duplicate ID. + {{"_id", 33}, {"foo", bson.D{{"bar", bson.D{{"baz", 100}}}}}}, + {{"_id", 33}, {"foo", bson.D{{"bar", bson.D{{"baz", 200}}}}}}, + } + + shardKey := bson.D{ + {"foo.bar.baz", 1}, + } + + for _, coll := range mslices.Of(srcColl, dstColl) { + db := coll.Database() + client := db.Client() + + // For sharded, pre-v8 clusters we need to create the collection first. + require.NoError(db.CreateCollection(ctx, coll.Name())) + require.NoError(client.Database("admin").RunCommand( + ctx, + bson.D{{"enableSharding", db.Name()}}, + ).Err()) + + require.NoError(client.Database("admin").RunCommand( + ctx, + bson.D{ + {"shardCollection", db.Name() + "." + coll.Name()}, + {"key", shardKey}, + }, + ).Err(), + ) + + shardIds := getShardIds(suite.T(), client) + + admin := client.Database("admin") + keyField := "foo.bar.baz" + splitKey := bson.D{{keyField, 150}} + + require.NoError( + admin.RunCommand(ctx, bson.D{ + {"split", db.Name() + "." + coll.Name()}, + {"middle", splitKey}, + }).Err(), + ) + + require.NoError( + admin.RunCommand(ctx, bson.D{ + {"moveChunk", db.Name() + "." + coll.Name()}, + {"to", shardIds[0]}, + {"find", bson.D{{keyField, 149}}}, + }).Err(), + ) + + require.NoError( + admin.RunCommand(ctx, bson.D{ + {"moveChunk", db.Name() + "." + coll.Name()}, + {"to", shardIds[1]}, + {"find", bson.D{{keyField, 151}}}, + }).Err(), + ) + + _, err := coll.InsertMany(ctx, lo.ToAnySlice(lo.Shuffle(docs))) + require.NoError(err, "should insert all docs") + } + + task := &VerificationTask{ + PrimaryKey: primitive.NewObjectID(), + QueryFilter: QueryFilter{ + Namespace: dbName + "." + collName, + To: dbName + "." + collName, + ShardKeys: lo.Map( + shardKey, + func(el bson.E, _ int) string { + return el.Key + }, + ), + Partition: &partitions.Partition{ + Key: partitions.PartitionKey{ + Lower: primitive.MinKey{}, + }, + Upper: primitive.MaxKey{}, + }, + }, + } + + results, docCount, _, err := verifier.FetchAndCompareDocuments(ctx, 0, task) + require.NoError(err, "should fetch & compare") + assert.EqualValues(suite.T(), len(docs), docCount, "expected # of docs") + assert.Empty(suite.T(), results, "should find no problem") +} + +func getShardIds(t *testing.T, client *mongo.Client) []string { + res, err := runListShards(t.Context(), logger.NewDefaultLogger(), client) + require.NoError(t, err) + + type shardData struct { + Id string `bson:"_id"` + } + var parsed struct { + Shards []shardData + } + + require.NoError(t, res.Decode(&parsed)) + + return lo.Map( + parsed.Shards, + func(sd shardData, _ int) string { + return sd.Id + }, + ) +} + func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() { verifier := suite.BuildVerifier() ctx := suite.Context()