Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Mar 6, 2025
1 parent 09f4388 commit 53a3936
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 44 deletions.
9 changes: 4 additions & 5 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,20 @@ func (m *GCStateManager) advanceGCSafePointImpl(keyspaceID uint32, target uint64
return
}

func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (AdvanceTxnSafePointResult, error) {
func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64, now time.Time) (AdvanceTxnSafePointResult, error) {
keyspaceID, err := m.redirectKeyspace(keyspaceID, false)
if err != nil {
return AdvanceTxnSafePointResult{}, err
}
m.lock.Lock()
m.lock.Unlock()

return m.advanceTxnSafePointImpl(keyspaceID, target)
return m.advanceTxnSafePointImpl(keyspaceID, target, now)
}

// advanceTxnSafePointImpl is the internal implementation of AdvanceTxnSafePoint, assuming keyspaceID has been checked
// and the mutex has been acquired.
func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint64) (AdvanceTxnSafePointResult, error) {
func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint64, now time.Time) (AdvanceTxnSafePointResult, error) {
isCompatibleMode := false

// A helper function for handling the compatibility of the service safe point of "gc_worker", which is needed
Expand Down Expand Up @@ -204,7 +204,6 @@ func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint6
return err1
}

now := time.Now()
for _, barrier := range barriers {
if barrier.BarrierID == keypath.GCWorkerServiceSafePointID {
err1 = keepGCWorkerServiceSafePointCompatible(wb, barrier)
Expand Down Expand Up @@ -503,7 +502,7 @@ func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, ne
return nil, false, errors.New("TTL of gc_worker's service safe point must be infinity")
}

res, err := m.advanceTxnSafePointImpl(keyspaceID, newServiceSafePoint)
res, err := m.advanceTxnSafePointImpl(keyspaceID, newServiceSafePoint, now)
if err != nil {
return nil, false, err
}
Expand Down
152 changes: 114 additions & 38 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (s *gcManagerTestSuite) TearDownTest() {

func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() {
re := s.Require()
now := time.Now()

checkTxnSafePoint := func(keyspaceID uint32, expectedTxnSafePoint uint64) {
state, err := s.manager.GetGCState(keyspaceID)
Expand All @@ -157,7 +158,7 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() {
}

for _, keyspaceID := range s.keyspacePresets.manageable {
res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10)
res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10, now)
re.NoError(err)
re.Equal(uint64(0), res.OldTxnSafePoint)
re.Equal(uint64(10), res.NewTxnSafePoint)
Expand All @@ -167,38 +168,38 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() {
checkTxnSafePoint(keyspaceID, 10)

// Allows updating with the same value (no effect).
res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 10)
res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 10, now)
re.NoError(err)
re.Equal(uint64(10), res.OldTxnSafePoint)
re.Equal(uint64(10), res.NewTxnSafePoint)
re.Equal(uint64(10), res.Target)
re.Empty(res.BlockerDescription)

// Does not allow decreasing.
_, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 9)
_, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 9, now)
re.Error(err)
re.ErrorIs(err, errs.ErrDecreasingTxnSafePoint)

// Does not test blocking by GC barriers here. It will be separated in another test case.
}

for _, keyspaceID := range s.keyspacePresets.unmanageable {
_, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 20)
_, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 20, now)
re.Error(err)
re.ErrorIs(err, errs.ErrGCOnInvalidKeyspace)
// Updated in previous loop when updating NullKeyspaceID.
checkTxnSafePoint(keyspaceID, 10)
}

for _, keyspaceID := range s.keyspacePresets.notExisting {
_, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 30)
_, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 30, now)
re.Error(err)
re.ErrorIs(err, errs.ErrKeyspaceNotFound)
}

for i, keyspaceID := range s.keyspacePresets.nullSynonyms {
// Previously updated to 10. Update to 10+i+1 in i-th loop.
res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, uint64(10+i+1))
res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, uint64(10+i+1), now)
re.NoError(err)
re.Equal(uint64(10+i), res.OldTxnSafePoint)
re.Equal(uint64(10+i+1), res.NewTxnSafePoint)
Expand Down Expand Up @@ -249,7 +250,7 @@ func (s *gcManagerTestSuite) TestAdvanceGCSafePointBasic() {
}

for _, keyspaceID := range s.keyspacePresets.manageable {
_, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10)
_, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10, time.Now())
re.NoError(err)

oldValue, newValue, err := s.manager.AdvanceGCSafePoint(keyspaceID, 5)
Expand Down Expand Up @@ -280,7 +281,7 @@ func (s *gcManagerTestSuite) TestAdvanceGCSafePointBasic() {
re.ErrorIs(err, errs.ErrDecreasingGCSafePoint)
}

_, err := s.manager.AdvanceTxnSafePoint(constant.NullKeyspaceID, 30)
_, err := s.manager.AdvanceTxnSafePoint(constant.NullKeyspaceID, 30, time.Now())
re.NoError(err)
for i, keyspaceID := range s.keyspacePresets.nullSynonyms {
// The GC safe point in Already updated to 10 in previous check. So in i-th loop here, we update from 10+i to
Expand Down Expand Up @@ -501,35 +502,110 @@ func TestLegacyServiceGCSafePointRoundingTTL(t *testing.T) {
}

func (s *gcManagerTestSuite) TestGCBarriers() {
//re := s.Require()
//
//getGCBarrier := func(keyspaceID uint32, barrierID string) *endpoint.GCBarrier {
// state, err := s.manager.GetGCState(keyspaceID)
// re.NoError(err)
// idx := slices.IndexFunc(state.GCBarriers, func(b *endpoint.GCBarrier) bool {
// return b.BarrierID == barrierID
// })
// if idx == -1 {
// return nil
// }
// return state.GCBarriers[idx]
//}
//
//getAllGCBarriers := func(keyspaceID uint32) []*endpoint.GCBarrier {
// state, err := s.manager.GetGCState(keyspaceID)
// re.NoError(err)
// return state.GCBarriers
//}
//
//for _, keyspaceID := range s.keyspacePresets.all {
// re.Len(getAllGCBarriers(keyspaceID), 0)
//}
//
//// Test basic functionality within a single keyspace.
//for _, keyspaceID := range s.keyspacePresets.manageable {
// b, err := s.manager.SetGCBarrier(keyspaceID, "b1", 10, time.Hour, time.Now())
// re.NoError(err)
//}
re := s.Require()

getGCBarrier := func(keyspaceID uint32, barrierID string) *endpoint.GCBarrier {
state, err := s.manager.GetGCState(keyspaceID)
re.NoError(err)
idx := slices.IndexFunc(state.GCBarriers, func(b *endpoint.GCBarrier) bool {
return b.BarrierID == barrierID
})
if idx == -1 {
return nil
}
return state.GCBarriers[idx]
}

getAllGCBarriers := func(keyspaceID uint32) []*endpoint.GCBarrier {
state, err := s.manager.GetGCState(keyspaceID)
re.NoError(err)
return state.GCBarriers
}

checkTxnSafePoint := func(keyspaceID uint32, expectedTxnSafePoint uint64) {
state, err := s.manager.GetGCState(keyspaceID)
re.NoError(err)
re.Equal(expectedTxnSafePoint, state.TxnSafePoint)
}

// Helper for getting pointer of time.
ptime := func(t time.Time) *time.Time { return &t }

now := time.Date(2025, 03, 06, 11, 50, 30, 0, time.Local)

for _, keyspaceID := range s.keyspacePresets.all {
re.Empty(getAllGCBarriers(keyspaceID))
}

// Test basic functionality within a single keyspace.
for _, keyspaceID := range s.keyspacePresets.manageable {
b, err := s.manager.SetGCBarrier(keyspaceID, "b1", 10, time.Hour, now)
re.NoError(err)
expected := endpoint.NewGCBarrier("b1", 10, ptime(now.Add(time.Hour)))
re.Equal(expected, b)
re.Len(getAllGCBarriers(keyspaceID), 1)
re.Equal(expected, getGCBarrier(keyspaceID, "b1"))

// Updating the value of the existing GC barrier
b, err = s.manager.SetGCBarrier(keyspaceID, "b1", 15, time.Hour, now)
re.NoError(err)
expected = endpoint.NewGCBarrier("b1", 15, ptime(now.Add(time.Hour)))
re.Equal(expected, b)
re.Len(getAllGCBarriers(keyspaceID), 1)
re.Equal(expected, getGCBarrier(keyspaceID, "b1"))

b, err = s.manager.SetGCBarrier(keyspaceID, "b1", 15, time.Hour*2, now)
re.NoError(err)
expected = endpoint.NewGCBarrier("b1", 15, ptime(now.Add(time.Hour*2)))
re.Equal(expected, b)
re.Len(getAllGCBarriers(keyspaceID), 1)
re.Equal(expected, getGCBarrier(keyspaceID, "b1"))

// Allows shrinking the barrier ts.
b, err = s.manager.SetGCBarrier(keyspaceID, "b1", 10, time.Hour, now)
re.NoError(err)
expected = endpoint.NewGCBarrier("b1", 10, ptime(now.Add(time.Hour)))
re.Equal(expected, b)
re.Len(getAllGCBarriers(keyspaceID), 1)
re.Equal(expected, getGCBarrier(keyspaceID, "b1"))

// GC barriers blocks the txn safe point.
res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 5, now)
re.NoError(err)
re.Equal(uint64(0), res.OldTxnSafePoint)
re.Equal(uint64(5), res.NewTxnSafePoint)
re.Empty(res.BlockerDescription)
res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 10, now)
re.NoError(err)
re.Equal(uint64(5), res.OldTxnSafePoint)
re.Equal(uint64(10), res.NewTxnSafePoint)
re.Empty(res.BlockerDescription)
res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 15, now)
re.NoError(err)
re.Equal(uint64(10), res.OldTxnSafePoint)
re.Equal(uint64(10), res.NewTxnSafePoint)
re.Equal(uint64(15), res.Target)
re.Contains("BarrierID: \"b1\"", res.BlockerDescription)
checkTxnSafePoint(keyspaceID, 10)

_, err = s.manager.SetGCBarrier(keyspaceID, "b1", 15, time.Hour, now)
re.NoError(err)
// AdvanceTxnSafePoint advances the txn safe point as much as possible.
res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 20, now)
re.NoError(err)
re.Equal(uint64(10), res.OldTxnSafePoint)
re.Equal(uint64(15), res.NewTxnSafePoint)
re.Equal(uint64(20), res.Target)
re.Contains("BarrierID: \"b1\"", res.BlockerDescription)

// Multiple GC barriers
_, err = s.manager.SetGCBarrier(keyspaceID, "b1", 20, time.Hour, now)
re.NoError(err)
_, err = s.manager.SetGCBarrier(keyspaceID, "b2", 20, time.Hour, now)
re.NoError(err)
re.Len(getAllGCBarriers(keyspaceID), 2)
//re
}
}

func TestGCStateConstraints(t *testing.T) {
Expand Down Expand Up @@ -588,7 +664,7 @@ func (s *gcManagerTestSuite) TestRedirectKeyspace() {
return errors.AddStack(err1)
},
func(keyspaceID uint32) error {
_, err1 := s.manager.AdvanceTxnSafePoint(keyspaceID, 10)
_, err1 := s.manager.AdvanceTxnSafePoint(keyspaceID, 10, time.Now())
return errors.AddStack(err1)
},
func(keyspaceID uint32) error {
Expand Down
3 changes: 2 additions & 1 deletion server/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -286,7 +287,7 @@ func (s *GrpcServer) AdvanceTxnSafePoint(ctx context.Context, request *pdpb.Adva

target := request.GetTarget()
keyspaceID := getKeyspaceIDFromReq(request)
res, err := s.gcStateManager.AdvanceTxnSafePoint(keyspaceID, target)
res, err := s.gcStateManager.AdvanceTxnSafePoint(keyspaceID, target, time.Now())
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 53a3936

Please sign in to comment.