Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Mar 4, 2025
1 parent ed5efbf commit 6b96cc7
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 19 deletions.
6 changes: 3 additions & 3 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type GCStateManager struct {
lock syncutil.RWMutex
gcMetaStorage endpoint.GCStateProvider
cfg config.PDServerConfig
keyspaceManager keyspace.Manager
keyspaceManager *keyspace.Manager
}

// NewGCStateManager creates a GCStateManager of GC and services.
func NewGCStateManager(store endpoint.GCStateProvider, cfg config.PDServerConfig) *GCStateManager {
return &GCStateManager{gcMetaStorage: store, cfg: cfg}
func NewGCStateManager(store endpoint.GCStateProvider, cfg config.PDServerConfig, keyspaceManager *keyspace.Manager) *GCStateManager {
return &GCStateManager{gcMetaStorage: store, cfg: cfg, keyspaceManager: keyspaceManager}
}

func (m *GCStateManager) redirectKeyspace(keyspaceID uint32, isUserAPI bool) (uint32, error) {
Expand Down
140 changes: 127 additions & 13 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,93 @@
package gc

import (
"context"
"math"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/server/config"
)

func newGCStateProvider(t *testing.T) (provider endpoint.GCStateProvider, clean func()) {
func newGCStateManager(t *testing.T) (provider endpoint.GCStateProvider, gccStateManager *GCStateManager, clean func()) {
cfg := config.NewConfig()
re := require.New(t)

_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
kvBase := kv.NewEtcdKVBase(client)

// Simulate a member which id.Allocator may need to check.
err := kvBase.Save(keypath.LeaderPath(nil), "member1")
re.NoError(err)

s := endpoint.NewStorageEndpoint(kvBase, nil)
allocator := id.NewAllocator(&id.AllocatorParams{
Client: client,
Label: id.KeyspaceLabel,
Member: "member1",
Step: keyspace.AllocStep,
})
kgm := keyspace.NewKeyspaceGroupManager(context.Background(), s, client)
keyspaceManager := keyspace.NewKeyspaceManager(context.Background(), s, mockcluster.NewCluster(context.Background(), config.NewPersistOptions(cfg)), allocator, &config.KeyspaceConfig{}, kgm)
gcStateManager := NewGCStateManager(s.GetGCStateProvider(), cfg.PDServerCfg, keyspaceManager)

err = kgm.Bootstrap(context.Background())
re.NoError(err)
err = keyspaceManager.Bootstrap()
re.NoError(err)

ks1, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "ks1",
Config: map[string]string{"gc_management_type": "global_gc"},
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
})
re.NoError(err)
re.Equal(uint32(1), ks1.Id)

ks2, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "ks2",
Config: map[string]string{"gc_management_type": "keyspace_level_gc"},
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
})
re.NoError(err)
re.Equal(uint32(2), ks2.Id)

ks3, err := keyspaceManager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{
Name: "ks3",
Config: map[string]string{},
CreateTime: time.Now().Unix(),
IsPreAlloc: false,
})
re.NoError(err)
re.Equal(uint32(3), ks3.Id)

return s.GetGCStateProvider(), gcStateManager, clean
}

func TestAdvanceGCSafePointBasic(t *testing.T) {

return s.GetGCStateProvider(), clean
}

func testGCSafePointUpdateSequentiallyImpl(t *testing.T,
loadFunc func(m *GCStateManager) (uint64, error),
advanceFunc func(m *GCStateManager, target uint64) (uint64, uint64, error)) {

storage, clean := newGCStateProvider(t)
_, gcStateManager, clean := newGCStateManager(t)
defer clean()
gcStateManager := NewGCStateManager(storage, config.PDServerConfig{})
re := require.New(t)
curGCSafePoint := uint64(0)
// Update GC safe point with asc value.
Expand Down Expand Up @@ -98,9 +155,8 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
}

func TestGCSafePointUpdateConcurrently(t *testing.T) {
storage, clean := newGCStateProvider(t)
_, manager, clean := newGCStateManager(t)
defer clean()
gcSafePointManager := NewGCStateManager(storage, config.PDServerConfig{})
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -113,25 +169,24 @@ func TestGCSafePointUpdateConcurrently(t *testing.T) {
// Mix using new and legacy API
var err error
if (gcSafePoint/step)%2 == 0 {
_, _, err = gcSafePointManager.AdvanceGCSafePoint(constant.NullKeyspaceID, gcSafePoint)
_, _, err = manager.AdvanceGCSafePoint(constant.NullKeyspaceID, gcSafePoint)
} else {
_, _, err = gcSafePointManager.CompatibleUpdateGCSafePoint(gcSafePoint)
_, _, err = manager.CompatibleUpdateGCSafePoint(gcSafePoint)
}
re.NoError(err)
}
wg.Done()
}(uint64(id + 1))
}
wg.Wait()
gcSafePoint, err := gcSafePointManager.CompatibleLoadGCSafePoint()
gcSafePoint, err := manager.CompatibleLoadGCSafePoint()
re.NoError(err)
re.Equal(maxSafePoint, gcSafePoint)
}

func TestLegacyServiceGCSafePointUpdate(t *testing.T) {
provider, clean := newGCStateProvider(t)
_, manager, clean := newGCStateManager(t)
defer clean()
manager := NewGCStateManager(provider, config.PDServerConfig{})

re := require.New(t)
gcWorkerServiceID := "gc_worker"
Expand Down Expand Up @@ -214,9 +269,8 @@ func TestLegacyServiceGCSafePointUpdate(t *testing.T) {
}

func TestLegacyServiceGCSafePointRoundingTTL(t *testing.T) {
provider, clean := newGCStateProvider(t)
_, manager, clean := newGCStateManager(t)
defer clean()
manager := NewGCStateManager(provider, config.PDServerConfig{})

re := require.New(t)

Expand Down Expand Up @@ -246,3 +300,63 @@ func TestLegacyServiceGCSafePointRoundingTTL(t *testing.T) {
// Nil in GCBarrier.ExpirationTime represents never expires.
re.False(state.GCBarriers[0].IsExpired(time.Now().Add(time.Hour*24*365*10)), state.GCBarriers[0])
}

func TestAdvanceTxnSafePoint(t *testing.T) {
//_, manager, clean := newGCStateManager(t)
//defer clean()
//
//re := require.New(t)
//
//checkTxnSafePoint := func(keyspaceID uint32, expectedTxnSafePoint uint64) {
// state, err := manager.GetGCState(keyspaceID)
// re.NoError(err)
// re.Equal(expectedTxnSafePoint, state.TxnSafePoint)
//}

}

func TestGCBarriers(t *testing.T) {

}

func TestGCStateConstraints(t *testing.T) {

}

func TestServiceGCSafePointCompatibility(t *testing.T) {
//_, manager, clean := newGCStateManager(t)
//defer clean()
//
//re := require.New(t)

}

func TestRedirectKeyspace(t *testing.T) {
_, manager, clean := newGCStateManager(t)
defer clean()

re := require.New(t)

keyspaces := []uint32{constant.NullKeyspaceID, 1, 2, 3, 0x1000000}
redirectTarget := []uint32{constant.NullKeyspaceID, constant.NullKeyspaceID, 2, constant.NullKeyspaceID, constant.NullKeyspaceID}
systemAPIAllowed := []bool{true, false, true, false, true}
for i, keyspaceID := range keyspaces {
target, err := manager.redirectKeyspace(keyspaceID, true)
re.NoError(err, "index: %d", i)
re.Equal(redirectTarget[i], target, "index: %d", i)

target, err = manager.redirectKeyspace(keyspaceID, false)
if systemAPIAllowed[i] {
re.NoError(err, "index: %d", i)
re.Equal(redirectTarget[i], target, "index: %d", i)
} else {
re.Error(err, "index: %d", i)
re.ErrorIs(err, errs.ErrGCOnInvalidKeyspace, "index: %d", i)
}
}

// Non-null but not existing keyspace id.
_, err := manager.redirectKeyspace(0xffffff, true)
re.Error(err)
re.ErrorIs(err, errs.ErrKeyspaceNotFound)
}
4 changes: 2 additions & 2 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
// Allocate new keyspaceID.
newID, err := manager.allocID()
if err != nil {
return nil, err
return nil, errors.AddStack(err)
}
userKind := endpoint.StringUserKind(request.Config[UserKindKey])
config, err := manager.kgm.GetKeyspaceConfigByKind(userKind)
if err != nil {
return nil, err
return nil, errors.AddStack(err)
}
if len(config) != 0 {
if request.Config == nil {
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.storage, s)
s.gcStateManager = gc.NewGCStateManager(s.storage.GetGCStateProvider(), s.cfg.PDServerCfg)
s.gcStateManager = gc.NewGCStateManager(s.storage.GetGCStateProvider(), s.cfg.PDServerCfg, s.keyspaceManager)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Expand Down

0 comments on commit 6b96cc7

Please sign in to comment.