diff --git a/pkg/gc/safepoint.go b/pkg/gc/safepoint.go index 720c18a90ea..43915bdd01a 100644 --- a/pkg/gc/safepoint.go +++ b/pkg/gc/safepoint.go @@ -17,9 +17,9 @@ package gc import ( "fmt" "math" + "math/bits" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/server/config" ) @@ -152,13 +153,31 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) ( return AdvanceTxnSafePointResult{}, err } + isCompatibleMode := false + + // A helper function for handling the compatibility of the service safe point of "gc_worker", which is needed + // for making it able to downgrade to previous versions where service safe points are still in use. + keepGCWorkerServiceSafePointCompatible := func(wb *endpoint.GCStateWriteBatch, sspAsGCBarrier *endpoint.GCBarrier) error { + // In old versions, every time TiDB performs GC, it updates the service safe point of "gc_worker" to the GC + // safe point it attempts to advance to, and is allowed to be greater than the minimum service safe point (in + // which case the minimum one will be the actual GC safe point to use). + // Note that in old versions, there wasn't the concept of txn safe point. The step to update the service safe + // point of "gc_worker" is somewhat just like the current procedure of advancing the txn safe point, the most + // important purpose of which is to find the actual GC safe point that's safe to use. + isCompatibleMode = true + sspAsGCBarrier.BarrierTS = target + // Ensure service safe point of "gc_worker" should never expire. + sspAsGCBarrier.ExpirationTime = nil + return wb.SetGCBarrier(keyspaceID, *sspAsGCBarrier) + } + m.lock.Lock() m.lock.Unlock() var oldTxnSafePoint uint64 newTxnSafePoint := target var blockingBarrier *endpoint.GCBarrier - var blockingMinStartTSOwner string + var blockingMinStartTSOwner *string err = m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error { var err1 error @@ -178,6 +197,14 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) ( now := time.Now() for _, barrier := range barriers { + if barrier.BarrierID == keypath.GCWorkerServiceSafePointID { + err1 = keepGCWorkerServiceSafePointCompatible(wb, barrier) + if err1 != nil { + return err1 + } + continue + } + if barrier.IsExpired(now) { // Perform lazy delete to the expired GC barriers. // WARNING: It might look like a reasonable optimization idea to perform the lazy-deletion in a lower @@ -211,11 +238,9 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) ( // considered valid. newTxnSafePoint = minStartTS blockingBarrier = nil - blockingMinStartTSOwner = ownerKey + blockingMinStartTSOwner = &ownerKey } - // TODO: Consider compatibility of the special service safe point "gc_worker". - return wb.SetTxnSafePoint(keyspaceID, newTxnSafePoint) }) if err != nil { @@ -225,14 +250,22 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) ( blockerDesc := "" if blockingBarrier != nil { blockerDesc = blockingBarrier.String() - } else if len(blockingMinStartTSOwner) > 0 { - blockerDesc = fmt.Sprintf("TiDBMinStartTS { Key: %s, MinStartTS: %d }", blockingMinStartTSOwner, newTxnSafePoint) + } else if blockingMinStartTSOwner != nil { + blockerDesc = fmt.Sprintf("TiDBMinStartTS { Key: %s, MinStartTS: %d }", *blockingMinStartTSOwner, newTxnSafePoint) } - if len(blockerDesc) > 0 { - log.Info("GC advancing txn safe point is blocked", + if newTxnSafePoint != target { + if blockingBarrier == nil && blockingMinStartTSOwner == nil { + panic("unreachable") + } + log.Info("txn safe point advancement is being blocked", zap.Uint64("oldTxnSafePoint", oldTxnSafePoint), zap.Uint64("target", target), - zap.Uint64("newTxnSafePoint", newTxnSafePoint), zap.String("blocker", blockerDesc)) + zap.Uint64("newTxnSafePoint", newTxnSafePoint), zap.String("blocker", blockerDesc), + zap.Bool("isCompatibleMode", isCompatibleMode)) + } else { + log.Info("txn safe point advanced", + zap.Uint64("oldTxnSafePoint", oldTxnSafePoint), zap.Uint64("newTxnSafePoint", newTxnSafePoint), + zap.Bool("isCompatibleMode", isCompatibleMode)) } return AdvanceTxnSafePointResult{ @@ -398,49 +431,61 @@ func (m *GCStateManager) GetGlobalGCState() (map[uint32]GCState, error) { return results, nil } +func saturatingDuration(ratio int64, base time.Duration) time.Duration { + if ratio < 0 && base < 0 { + ratio, base = -ratio, -base + } + if ratio < 0 || base < 0 { + return 0 + } + h, l := bits.Mul64(uint64(ratio), uint64(base)) + if h != 0 || l > uint64(math.MaxInt64) { + return time.Duration(math.MaxInt64) + } + return time.Duration(l) +} + func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, newServiceSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) { keyspaceID := constant.NullKeyspaceID m.lock.Lock() defer m.lock.Unlock() - err := m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error { - - }) - if err != nil { - return nil, false, err - } -} - -// _UpdateServiceGCSafePoint update the safepoint for a specific service. -func (m *GCStateManager) _UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) { - if m.cfg.BlockSafePointV1 { - return nil, false, errors.New(blockServiceSafepointErrmsg) - } - // This function won't support keyspace as it's being deprecated. - m.lock.Lock(constant.NullKeyspaceID) - defer m.lock.Unlock(constant.NullKeyspaceID) - minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now) - if err != nil || ttl <= 0 || newSafePoint < minServiceSafePoint.SafePoint { - return minServiceSafePoint, false, err - } - - ssp := &endpoint.ServiceSafePoint{ - ServiceID: serviceID, - ExpiredAt: now.Unix() + ttl, - SafePoint: newSafePoint, - } - if math.MaxInt64-now.Unix() <= ttl { - ssp.ExpiredAt = math.MaxInt64 - } - if err := m.gcMetaStorage.SaveServiceGCSafePoint(ssp); err != nil { - return nil, false, err + // TODO: After implementing the global GC barrier, redirect the invocation on "native_br" to `SetGlobalGCBarrier`. + if serviceID == keypath.GCWorkerServiceSafePointID { + res, err := m.AdvanceTxnSafePoint(keyspaceID, newServiceSafePoint) + if err != nil { + return nil, false, err + } + if res.NewTxnSafePoint != newServiceSafePoint { + minServiceSafePoint = &endpoint.ServiceSafePoint{ + ServiceID: "__pseudo_service:" + res.BlockerDescription, + ExpiredAt: math.MaxInt64, + SafePoint: res.NewTxnSafePoint, + } + } else { + minServiceSafePoint = &endpoint.ServiceSafePoint{ + ServiceID: keypath.GCWorkerServiceSafePointID, + ExpiredAt: math.MaxInt64, + SafePoint: newServiceSafePoint, + } + } + } else { + _, err := m.SetGCBarrier(keyspaceID, serviceID, newServiceSafePoint, saturatingDuration(ttl, time.Second), now) + if err != nil { + return nil, false, err + } + minTxnSafePoint, err := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID) + if err != nil { + return nil, false, err + } + minServiceSafePoint = &endpoint.ServiceSafePoint{ + ServiceID: "", + ExpiredAt: math.MaxInt64, + SafePoint: minTxnSafePoint, + } } - // If the min safePoint is updated, load the next one. - if serviceID == minServiceSafePoint.ServiceID { - minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now) - } - return minServiceSafePoint, true, err + return } type AdvanceTxnSafePointResult struct {