Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Feb 7, 2025
1 parent f143c33 commit b0ffd26
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 53 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ error = '''
failed to convert a path to absolute path
'''

["PD:gc:ErrGCOnInvalidKeyspace"]
error = '''
trying to manage GC in keyspace %v where keyspace level GC is not enabled
'''

["PD:gin:ErrBindJSON"]
error = '''
bind JSON error
Expand Down
7 changes: 7 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,10 @@ var (
ErrNotFoundSchedulingPrimary = errors.Normalize("cannot find scheduling primary", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingPrimary"))
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)

// GC errors
var (
ErrGCOnInvalidKeyspace = errors.Normalize("trying to manage GC in keyspace %v where keyspace level GC is not enabled", errors.RFCCodeText("PD:gc:ErrGCOnInvalidKeyspace"))
ErrDecreasingGCSafePoint = errors.Normalize("trying to update GC safe point to a smaller value, current value: %v, given: %v", errors.RFCCodeText("PD:gc:ErrDecreasingGCSafePoint"))
ErrGCSafePointExceedsTxnSafePoint = errors.Normalize("trying to update GC safe point to a too large value that exceeds the txn safe point, current value: %v, given: %v, current txn safe point: %v", errors.RFCCodeText("PD:gc:ErrGCSafePointExceedsTxnSafePoint"))
)
176 changes: 140 additions & 36 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
Expand All @@ -31,56 +35,156 @@ var blockServiceSafepointErrmsg = "don't allow update service safe point v1."

// GCStateManager is the manager for safePoint of GC and services.
type GCStateManager struct {
lock *syncutil.RWLockGroup
store endpoint.GCStateStorage
cfg config.PDServerConfig
lock syncutil.RWMutex
gcMetaStorage endpoint.GCStateStorage
cfg config.PDServerConfig
keyspaceManager keyspace.Manager
}

// NewGCStateManager creates a GCStateManager of GC and services.
func NewGCStateManager(store endpoint.GCStateStorage, cfg config.PDServerConfig) *GCStateManager {
return &GCStateManager{store: store, cfg: cfg}
return &GCStateManager{gcMetaStorage: store, cfg: cfg}
}

// LoadGCSafePoint loads current GC safe point from storage.
func (manager *GCStateManager) LoadGCSafePoint(keyspaceID uint32) (uint64, error) {
// No need to acquire the lock as a single read operation is inherently atomic.
return manager.store.LoadGCSafePoint(keyspaceID)
func (m *GCStateManager) redirectKeyspace(keyspaceID uint32, isUserAPI bool) (uint32, error) {
// Regard it as NullKeyspaceID if the given one is invalid (exceeds the valid range of keyspace id), no matter
// whether it exactly matches the NullKeyspaceID.
if keyspaceID & ^constant.ValidKeyspaceIDMask != 0 {
return constant.NullKeyspaceID, nil
}

keyspaceMeta, err := m.keyspaceManager.LoadKeyspaceByID(keyspaceID)
if err != nil {
return 0, err
}
if keyspaceMeta.Config[keyspace.GCManagementType] != keyspace.KeyspaceLevelGC {
if isUserAPI {
// The user API is expected to always work. Operate on the state of global GC instead.
return constant.NullKeyspaceID, nil
}
// Internal API should never be called on keyspaces without keyspace level GC. They won't perform any active
// GC operation and will be managed by the global GC.
return 0, errs.ErrGCOnInvalidKeyspace.GenWithStackByArgs(keyspaceID)
}

return keyspaceID, nil
}

// UpdateGCSafePoint updates the safepoint if it is greater than the previous one
// it returns the old safepoint in the storage.
func (manager *GCStateManager) UpdateGCSafePoint(keyspaceID uint32, newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.lock.Lock(keyspaceID)
defer manager.lock.Unlock(keyspaceID)
// TODO: cache the safepoint in the storage.
oldSafePoint, err = manager.store.LoadGCSafePoint(keyspaceID)
// CompatibleLoadGCSafePoint loads current GC safe point from storage.
func (m *GCStateManager) CompatibleLoadGCSafePoint(keyspaceID uint32) (uint64, error) {
keyspaceID, err := m.redirectKeyspace(keyspaceID, false)
if err != nil {
return
return 0, err
}
if manager.cfg.BlockSafePointV1 {
err = errors.New(blockGCSafePointErrmsg)

// No need to acquire the lock as a single-key read operation is atomic.
return m.gcMetaStorage.LoadGCSafePoint(keyspaceID)
}

// AdvanceGCSafePoint tries to advance the GC safe point to the given target. If the target is less than the current
// value or greater than the txn safe point, it returns an error.
func (m *GCStateManager) AdvanceGCSafePoint(keyspaceID uint32, target uint64) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
return m.advanceGCSafePointImpl(keyspaceID, target, false)
}

// CompatibleUpdateGCSafePoint tries to advance the GC safe point to the given target. If the target is less than the
// current value, it returns the current value without updating it.
// This is provided for compatibility purpose, making the existing uses of the deprecated API `UpdateGCSafePoint`
// still work.
func (m *GCStateManager) CompatibleUpdateGCSafePoint(target uint64) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
return m.advanceGCSafePointImpl(constant.NullKeyspaceID, target, true)
}

func (m *GCStateManager) advanceGCSafePointImpl(keyspaceID uint32, target uint64, compatible bool) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) {
keyspaceID, err = m.redirectKeyspace(keyspaceID, false)
if err != nil {
return
}

if oldSafePoint >= newSafePoint {
return
m.lock.Lock()
defer m.lock.Unlock()

newGCSafePoint = target

err = m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
oldGCSafePoint, err1 = m.gcMetaStorage.LoadGCSafePoint(keyspaceID)
if err1 != nil {
return err1
}
if target < oldGCSafePoint {
if compatible {
// When in compatible mode, trying to update the safe point to a smaller value fails silently, returning
// the actual value. There exist some use cases that fetches the current value by passing zero.
log.Warn("deprecated API `UpdateGCSafePoint` is called with invalid argument",
zap.Uint64("currentGCSafePoint", oldGCSafePoint), zap.Uint64("attemptedGCSafePoint", target))
newGCSafePoint = oldGCSafePoint
return nil
}
// Otherwise, return error to reject the operation explicitly.
return errs.ErrDecreasingGCSafePoint.GenWithStackByArgs(oldGCSafePoint, target)
}
txnSafePoint, err1 := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
if err1 != nil {
return err1
}
if target > txnSafePoint {
return errs.ErrGCSafePointExceedsTxnSafePoint.GenWithStackByArgs(oldGCSafePoint, target, txnSafePoint)
}

err1 = wb.SetGCSafePoint(keyspaceID, target)
if err1 != nil {
return err1
}

return nil
})
if err != nil {
return 0, 0, err
}

if keyspaceID == constant.NullKeyspaceID {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(target))
}

return
}

func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (AdvanceTxnSafePointResult, error) {
keyspaceID, err := m.redirectKeyspace(keyspaceID, false)
if err != nil {
return AdvanceTxnSafePointResult{}, err
}
err = manager.store.SaveGCSafePoint(keyspaceID, newSafePoint)
if err == nil && keyspaceID == constant.NullKeyspaceID {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))

m.lock.Lock()
m.lock.Unlock()

newTxnSafePoint = target
err = m.gcMetaStorage.RunInGCMetaTransaction(func(wb *endpoint.GCStateWriteBatch) error {
var err1 error
oldTxnSafePoint, err1 = m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
if err1 != nil {
return err1
}

return nil
})
if err != nil {
return AdvanceTxnSafePointResult{}, err
}

return
}

// UpdateServiceGCSafePoint update the safepoint for a specific service.
func (manager *GCStateManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if manager.cfg.BlockSafePointV1 {
func (m *GCStateManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if m.cfg.BlockSafePointV1 {
return nil, false, errors.New(blockServiceSafepointErrmsg)
}
// This function won't support keyspace as it's being deprecated.
manager.lock.Lock(constant.NullKeyspaceID)
defer manager.lock.Unlock(constant.NullKeyspaceID)
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
m.lock.Lock(constant.NullKeyspaceID)
defer m.lock.Unlock(constant.NullKeyspaceID)
minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now)
if err != nil || ttl <= 0 || newSafePoint < minServiceSafePoint.SafePoint {
return minServiceSafePoint, false, err
}
Expand All @@ -93,20 +197,20 @@ func (manager *GCStateManager) UpdateServiceGCSafePoint(serviceID string, newSaf
if math.MaxInt64-now.Unix() <= ttl {
ssp.ExpiredAt = math.MaxInt64
}
if err := manager.store.SaveServiceGCSafePoint(ssp); err != nil {
if err := m.gcMetaStorage.SaveServiceGCSafePoint(ssp); err != nil {
return nil, false, err
}

// If the min safePoint is updated, load the next one.
if serviceID == minServiceSafePoint.ServiceID {
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
minServiceSafePoint, err = m.gcMetaStorage.LoadMinServiceGCSafePoint(now)
}
return minServiceSafePoint, true, err
}

// UpdateTxnSafePoint updates the txn safe point.
func (manager *GCStateManager) UpdateTxnSafePoint(keyspaceID uint32, target uint64) (uint64, error) {
manager.lock.Lock(keyspaceID)
defer manager.lock.Unlock(keyspaceID)

type AdvanceTxnSafePointResult struct {
OldTxnSafePoint uint64
Target uint64
NewTxnSafePoint uint64
BlockerMessage bool
}
16 changes: 8 additions & 8 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
curSafePoint := uint64(0)
// update gc safePoint with asc value.
for id := 10; id < 20; id++ {
safePoint, err := gcSafePointManager.LoadGCSafePoint()
safePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(curSafePoint, safePoint)
previousSafePoint := curSafePoint
curSafePoint = uint64(id)
oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(curSafePoint)
oldSafePoint, err := gcSafePointManager.AdvanceGCSafePoint(curSafePoint)
re.NoError(err)
re.Equal(previousSafePoint, oldSafePoint)
}

safePoint, err := gcSafePointManager.LoadGCSafePoint()
safePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(curSafePoint, safePoint)
// update with smaller value should be failed.
oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(safePoint - 5)
oldSafePoint, err := gcSafePointManager.AdvanceGCSafePoint(safePoint - 5)
re.NoError(err)
re.Equal(safePoint, oldSafePoint)
curSafePoint, err = gcSafePointManager.LoadGCSafePoint()
curSafePoint, err = gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
// current safePoint should not change since the update value was smaller
re.Equal(safePoint, curSafePoint)
Expand All @@ -71,14 +71,14 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {
wg.Add(1)
go func(step uint64) {
for safePoint := step; safePoint <= maxSafePoint; safePoint += step {
_, err := gcSafePointManager.UpdateGCSafePoint(safePoint)
_, err := gcSafePointManager.AdvanceGCSafePoint(safePoint)
re.NoError(err)
}
wg.Done()
}(uint64(id + 1))
}
wg.Wait()
safePoint, err := gcSafePointManager.LoadGCSafePoint()
safePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(maxSafePoint, safePoint)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestBlockUpdateSafePointV1(t *testing.T) {
re.False(updated)
re.Nil(min)

oldSafePoint, err := manager.UpdateGCSafePoint(gcWorkerSafePoint)
oldSafePoint, err := manager.AdvanceGCSafePoint(gcWorkerSafePoint)
re.Error(err)
re.Equal(err.Error(), blockGCSafePointErrmsg)

Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/utils/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ const (

// DefaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// 0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// 0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)
DefaultKeyspaceID = uint32(0)
ValidKeyspaceIDMask = uint32(0xFFFFFF)
// NullKeyspaceID is used for api v1 or legacy path where is keyspace agnostic.
NullKeyspaceID = uint32(0xFFFFFFFF)
// DefaultKeyspaceGroupID is the default key space group id.
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type Storage interface {
endpoint.MetaStorage
endpoint.RuleStorage
endpoint.ReplicationStatusStorage
endpoint.GCStateStorage
endpoint.MinResolvedTSStorage
endpoint.ExternalTSStorage
endpoint.SafePointV2Storage
Expand Down
6 changes: 4 additions & 2 deletions pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const (
keyspaceGcBarrierPrefix = "keyspaces/service_safe_point/%s"
// Compatible with `minstartts` written to etcd directly to etcd by TiDB.
tidbMinStartTSAbsolutePrefix = "/tidb/server/minstartts"
keyspaceTiDBMinStartTSAbsolutePrefix = "/keyspaces/tidb/%s/tidb/server/minstartts"
keyspaceTiDBMinStartTSAbsolutePrefix = "/keyspaces/tidb/%d/tidb/server/minstartts"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down Expand Up @@ -260,7 +260,9 @@ func CompatibleTiDBMinStartTSAbsolutePath() string {
}

func CompatibleKeyspaceTiDBMinStartTSAbsolutePath(keyspaceID uint32) string {
return fmt.Sprintf(keyspaceTiDBMinStartTSAbsolutePrefix, EncodeKeyspaceID(keyspaceID))
// Note that when TiDB writes min start ts, it doesn't add leading zeroes to the keyspace ID.
// So use %d to format it, instead of EncodeKeyspaceID.
return fmt.Sprintf(keyspaceTiDBMinStartTSAbsolutePrefix, keyspaceID)
}

// MinResolvedTSPath returns the min resolved ts path.
Expand Down
5 changes: 3 additions & 2 deletions server/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"path"
"strings"

"github.com/tikv/pd/pkg/mcs/utils/constant"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/tikv/pd/pkg/mcs/utils/constant"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"

Expand Down Expand Up @@ -289,7 +290,7 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update

newSafePoint := request.GetTarget()
keyspaceID := getKeyspaceIDFromReq(request)
oldSafePoint, err := s.gcStateManager.UpdateGCSafePoint(keyspaceID, newSafePoint)
oldSafePoint, err := s.gcStateManager.AdvanceGCSafePoint(keyspaceID, newSafePoint)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe
return &pdpb.GetGCSafePointResponse{Header: notBootstrappedHeader()}, nil
}

safePoint, err := s.gcStateManager.LoadGCSafePoint(constant.NullKeyspaceID)
safePoint, err := s.gcStateManager.CompatibleLoadGCSafePoint(constant.NullKeyspaceID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/realcluster/etcd_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
"",
"/pd//alloc_id",
"/pd//config",
// If not call `UpdateGCSafePoint`, this key will not exist.
// If not call `AdvanceGCSafePoint`, this key will not exist.
// "/pd//gc/safe_point",
"/pd//gc/safe_point/service/gc_worker",
"/pd//keyspaces/id/DEFAULT",
Expand Down

0 comments on commit b0ffd26

Please sign in to comment.