Skip to content

Commit

Permalink
add compatibility interface to GCStateManager
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Feb 13, 2025
1 parent a76ce69 commit 3cf8b04
Showing 1 changed file with 91 additions and 46 deletions.
137 changes: 91 additions & 46 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ package gc
import (
"fmt"
"math"
"math/bits"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
)
Expand Down Expand Up @@ -152,13 +153,31 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (
return AdvanceTxnSafePointResult{}, err
}

isCompatibleMode := false

// A helper function for handling the compatibility of the service safe point of "gc_worker", which is needed
// for making it able to downgrade to previous versions where service safe points are still in use.
keepGCWorkerServiceSafePointCompatible := func(wb *endpoint.GCStateWriteBatch, sspAsGCBarrier *endpoint.GCBarrier) error {
// In old versions, every time TiDB performs GC, it updates the service safe point of "gc_worker" to the GC
// safe point it attempts to advance to, and is allowed to be greater than the minimum service safe point (in
// which case the minimum one will be the actual GC safe point to use).
// Note that in old versions, there wasn't the concept of txn safe point. The step to update the service safe
// point of "gc_worker" is somewhat just like the current procedure of advancing the txn safe point, the most
// important purpose of which is to find the actual GC safe point that's safe to use.
isCompatibleMode = true
sspAsGCBarrier.BarrierTS = target
// Ensure service safe point of "gc_worker" should never expire.
sspAsGCBarrier.ExpirationTime = nil
return wb.SetGCBarrier(keyspaceID, *sspAsGCBarrier)
}

m.lock.Lock()
m.lock.Unlock()

var oldTxnSafePoint uint64
newTxnSafePoint := target
var blockingBarrier *endpoint.GCBarrier
var blockingMinStartTSOwner string
var blockingMinStartTSOwner *string

err = m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
Expand All @@ -178,6 +197,14 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (

now := time.Now()
for _, barrier := range barriers {
if barrier.BarrierID == keypath.GCWorkerServiceSafePointID {
err1 = keepGCWorkerServiceSafePointCompatible(wb, barrier)
if err1 != nil {
return err1
}
continue
}

if barrier.IsExpired(now) {
// Perform lazy delete to the expired GC barriers.
// WARNING: It might look like a reasonable optimization idea to perform the lazy-deletion in a lower
Expand Down Expand Up @@ -211,11 +238,9 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (
// considered valid.
newTxnSafePoint = minStartTS
blockingBarrier = nil
blockingMinStartTSOwner = ownerKey
blockingMinStartTSOwner = &ownerKey
}

// TODO: Consider compatibility of the special service safe point "gc_worker".

return wb.SetTxnSafePoint(keyspaceID, newTxnSafePoint)
})
if err != nil {
Expand All @@ -225,14 +250,22 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (
blockerDesc := ""
if blockingBarrier != nil {
blockerDesc = blockingBarrier.String()
} else if len(blockingMinStartTSOwner) > 0 {
blockerDesc = fmt.Sprintf("TiDBMinStartTS { Key: %s, MinStartTS: %d }", blockingMinStartTSOwner, newTxnSafePoint)
} else if blockingMinStartTSOwner != nil {
blockerDesc = fmt.Sprintf("TiDBMinStartTS { Key: %s, MinStartTS: %d }", *blockingMinStartTSOwner, newTxnSafePoint)
}

if len(blockerDesc) > 0 {
log.Info("GC advancing txn safe point is blocked",
if newTxnSafePoint != target {
if blockingBarrier == nil && blockingMinStartTSOwner == nil {
panic("unreachable")
}
log.Info("txn safe point advancement is being blocked",
zap.Uint64("oldTxnSafePoint", oldTxnSafePoint), zap.Uint64("target", target),
zap.Uint64("newTxnSafePoint", newTxnSafePoint), zap.String("blocker", blockerDesc))
zap.Uint64("newTxnSafePoint", newTxnSafePoint), zap.String("blocker", blockerDesc),
zap.Bool("isCompatibleMode", isCompatibleMode))
} else {
log.Info("txn safe point advanced",
zap.Uint64("oldTxnSafePoint", oldTxnSafePoint), zap.Uint64("newTxnSafePoint", newTxnSafePoint),
zap.Bool("isCompatibleMode", isCompatibleMode))
}

return AdvanceTxnSafePointResult{
Expand Down Expand Up @@ -398,49 +431,61 @@ func (m *GCStateManager) GetGlobalGCState() (map[uint32]GCState, error) {
return results, nil
}

func saturatingDuration(ratio int64, base time.Duration) time.Duration {
if ratio < 0 && base < 0 {
ratio, base = -ratio, -base
}
if ratio < 0 || base < 0 {
return 0
}
h, l := bits.Mul64(uint64(ratio), uint64(base))
if h != 0 || l > uint64(math.MaxInt64) {
return time.Duration(math.MaxInt64)
}
return time.Duration(l)
}

func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, newServiceSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
keyspaceID := constant.NullKeyspaceID
m.lock.Lock()
defer m.lock.Unlock()

err := m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error {

})
if err != nil {
return nil, false, err
}
}

// _UpdateServiceGCSafePoint update the safepoint for a specific service.
func (m *GCStateManager) _UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if m.cfg.BlockSafePointV1 {
return nil, false, errors.New(blockServiceSafepointErrmsg)
}
// This function won't support keyspace as it's being deprecated.
m.lock.Lock(constant.NullKeyspaceID)
defer m.lock.Unlock(constant.NullKeyspaceID)
minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now)
if err != nil || ttl <= 0 || newSafePoint < minServiceSafePoint.SafePoint {
return minServiceSafePoint, false, err
}

ssp := &endpoint.ServiceSafePoint{
ServiceID: serviceID,
ExpiredAt: now.Unix() + ttl,
SafePoint: newSafePoint,
}
if math.MaxInt64-now.Unix() <= ttl {
ssp.ExpiredAt = math.MaxInt64
}
if err := m.gcMetaStorage.SaveServiceGCSafePoint(ssp); err != nil {
return nil, false, err
// TODO: After implementing the global GC barrier, redirect the invocation on "native_br" to `SetGlobalGCBarrier`.
if serviceID == keypath.GCWorkerServiceSafePointID {
res, err := m.AdvanceTxnSafePoint(keyspaceID, newServiceSafePoint)
if err != nil {
return nil, false, err
}
if res.NewTxnSafePoint != newServiceSafePoint {
minServiceSafePoint = &endpoint.ServiceSafePoint{
ServiceID: "__pseudo_service:" + res.BlockerDescription,
ExpiredAt: math.MaxInt64,
SafePoint: res.NewTxnSafePoint,
}
} else {
minServiceSafePoint = &endpoint.ServiceSafePoint{
ServiceID: keypath.GCWorkerServiceSafePointID,
ExpiredAt: math.MaxInt64,
SafePoint: newServiceSafePoint,
}
}
} else {
_, err := m.SetGCBarrier(keyspaceID, serviceID, newServiceSafePoint, saturatingDuration(ttl, time.Second), now)
if err != nil {
return nil, false, err
}
minTxnSafePoint, err := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
if err != nil {
return nil, false, err
}
minServiceSafePoint = &endpoint.ServiceSafePoint{
ServiceID: "<unknown>",
ExpiredAt: math.MaxInt64,
SafePoint: minTxnSafePoint,
}
}

// If the min safePoint is updated, load the next one.
if serviceID == minServiceSafePoint.ServiceID {
minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now)
}
return minServiceSafePoint, true, err
return
}

type AdvanceTxnSafePointResult struct {
Expand Down

0 comments on commit 3cf8b04

Please sign in to comment.