Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Import data not considering the initiated cutover during resumption #2369

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions yb-voyager/cmd/cutover.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,4 @@ func ExitIfAlreadyCutover(importerOrExporterRole string) {
panic(fmt.Sprintf("invalid role %s", importerOrExporterRole))
}
}

2 changes: 1 addition & 1 deletion yb-voyager/cmd/eventQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"time"

"github.com/goccy/go-json"

log "github.com/sirupsen/logrus"

"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
Expand Down
1 change: 1 addition & 0 deletions yb-voyager/cmd/exportDataDebezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func isOracleJDBCWalletLocationSet(s srcdb.Source) (bool, error) {
// ---------------------------------------------- Export Data ---------------------------------------//

func debeziumExportData(ctx context.Context, config *dbzm.Config, tableNameToApproxRowCountMap map[string]int64) error {

if config.SnapshotMode != "never" {
err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
record.SnapshotMechanism = "debezium"
Expand Down
40 changes: 39 additions & 1 deletion yb-voyager/cmd/live_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,36 @@ func init() {
MAX_INTERVAL_BETWEEN_BATCHES = utils.GetEnvAsInt("MAX_INTERVAL_BETWEEN_BATCHES", 2000)
}

func cutoverInitiatedAndCutoverEventProcessed() (bool, error) {
msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
return false, fmt.Errorf("getting migration status record: %v", err)
}
switch importerRole {
case TARGET_DB_IMPORTER_ROLE:
return msr.CutoverToTargetRequested && msr.CutoverDetectedByTargetImporter, nil
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
return msr.CutoverToSourceReplicaRequested && msr.CutoverDetectedBySourceReplicaImporter, nil
case SOURCE_DB_IMPORTER_ROLE:
return msr.CutoverToSourceRequested && msr.CutoverDetectedBySourceImporter, nil
}

return false, nil
}

func streamChanges(state *ImportDataState, tableNames []sqlname.NameTuple) error {
ok, err := cutoverInitiatedAndCutoverEventProcessed()
if err != nil {
return err
}
if ok {
log.Info("cutover is initiated and the event is detected..")
return nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a log line before return

}
log.Infof("NUM_EVENT_CHANNELS: %d, EVENT_CHANNEL_SIZE: %d, MAX_EVENTS_PER_BATCH: %d, MAX_INTERVAL_BETWEEN_BATCHES: %d",
NUM_EVENT_CHANNELS, EVENT_CHANNEL_SIZE, MAX_EVENTS_PER_BATCH, MAX_INTERVAL_BETWEEN_BATCHES)
// re-initilizing name registry in case it hadn't picked up the names registered on source/target/source-replica
err := namereg.NameReg.Init()
err = namereg.NameReg.Init()
if err != nil {
return fmt.Errorf("init name registry again: %v", err)
}
Expand Down Expand Up @@ -178,6 +203,19 @@ func streamChangesFromSegment(
event.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE ||
event.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command

err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
switch importerRole {
case TARGET_DB_IMPORTER_ROLE:
record.CutoverDetectedByTargetImporter = true
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
record.CutoverDetectedBySourceReplicaImporter = true
case SOURCE_DB_IMPORTER_ROLE:
record.CutoverDetectedBySourceImporter = true
}
})
if err != nil {
return fmt.Errorf("error updating the migration status record for cutover detected case: %v", err)
}
updateCallhomeImportPhase(event)

eventQueue.EndOfQueue = true
Expand Down
23 changes: 16 additions & 7 deletions yb-voyager/src/metadb/migrationStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,27 @@ type MigrationStatusRecord struct {
TableListExportedFromSource []string `json:"TableListExportedFromSource"`
SourceDBConf *srcdb.Source `json:"SourceDBConf"`

CutoverToTargetRequested bool `json:"CutoverToTargetRequested"`
//All the cutover requested flags by initiate cutover command
CutoverToTargetRequested bool `json:"CutoverToTargetRequested"`
CutoverToSourceRequested bool `json:"CutoverToSourceRequested"`
CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"`

//All the cutover detected by importer flags (marked when the cutover event is recieved by the importer)
CutoverDetectedByTargetImporter bool `json:"CutoverDetectedByTargetImporter"`
CutoverDetectedBySourceImporter bool `json:"CutoverDetectedBySourceImporter"`
CutoverDetectedBySourceReplicaImporter bool `json:"CutoverDetectedBySourceReplicaImporter"`

//All the cutover processed by importer/exporter flags - indicating that the cutover is completed by that command.
CutoverProcessedBySourceExporter bool `json:"CutoverProcessedBySourceExporter"`
CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"`
ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"`
CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"`
CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"`
CutoverToSourceReplicaProcessedByTargetExporter bool `json:"CutoverToSourceReplicaProcessedByTargetExporter"`
CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"`
CutoverToSourceReplicaProcessedBySRImporter bool `json:"CutoverToSourceReplicaProcessedBySRImporter"`
ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"`
CutoverToSourceRequested bool `json:"CutoverToSourceRequested"`
CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"`
CutoverToSourceProcessedBySourceImporter bool `json:"CutoverToSourceProcessedBySourceImporter"`

ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"`
ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"`

ExportSchemaDone bool `json:"ExportSchemaDone"`
ExportDataDone bool `json:"ExportDataDone"` // to be interpreted as export of snapshot data from source is complete
ExportDataSourceDebeziumStarted bool `json:"ExportDataSourceDebeziumStarted"`
Expand Down