diff --git a/pkg/gc/safepoint.go b/pkg/gc/safepoint.go index e7de32abb17..dee7ebede84 100644 --- a/pkg/gc/safepoint.go +++ b/pkg/gc/safepoint.go @@ -151,7 +151,7 @@ func (m *GCStateManager) advanceGCSafePointImpl(keyspaceID uint32, target uint64 return } -func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) (AdvanceTxnSafePointResult, error) { +func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64, now time.Time) (AdvanceTxnSafePointResult, error) { keyspaceID, err := m.redirectKeyspace(keyspaceID, false) if err != nil { return AdvanceTxnSafePointResult{}, err @@ -159,12 +159,12 @@ func (m *GCStateManager) AdvanceTxnSafePoint(keyspaceID uint32, target uint64) ( m.lock.Lock() m.lock.Unlock() - return m.advanceTxnSafePointImpl(keyspaceID, target) + return m.advanceTxnSafePointImpl(keyspaceID, target, now) } // advanceTxnSafePointImpl is the internal implementation of AdvanceTxnSafePoint, assuming keyspaceID has been checked // and the mutex has been acquired. -func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint64) (AdvanceTxnSafePointResult, error) { +func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint64, now time.Time) (AdvanceTxnSafePointResult, error) { isCompatibleMode := false // A helper function for handling the compatibility of the service safe point of "gc_worker", which is needed @@ -204,7 +204,6 @@ func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint6 return err1 } - now := time.Now() for _, barrier := range barriers { if barrier.BarrierID == keypath.GCWorkerServiceSafePointID { err1 = keepGCWorkerServiceSafePointCompatible(wb, barrier) @@ -503,7 +502,7 @@ func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, ne return nil, false, errors.New("TTL of gc_worker's service safe point must be infinity") } - res, err := m.advanceTxnSafePointImpl(keyspaceID, newServiceSafePoint) + res, err := m.advanceTxnSafePointImpl(keyspaceID, newServiceSafePoint, now) if err != nil { return nil, false, err } diff --git a/pkg/gc/safepoint_test.go b/pkg/gc/safepoint_test.go index 1946b44af14..609d6eab8cf 100644 --- a/pkg/gc/safepoint_test.go +++ b/pkg/gc/safepoint_test.go @@ -145,6 +145,7 @@ func (s *gcManagerTestSuite) TearDownTest() { func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() { re := s.Require() + now := time.Now() checkTxnSafePoint := func(keyspaceID uint32, expectedTxnSafePoint uint64) { state, err := s.manager.GetGCState(keyspaceID) @@ -157,7 +158,7 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() { } for _, keyspaceID := range s.keyspacePresets.manageable { - res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10) + res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10, now) re.NoError(err) re.Equal(uint64(0), res.OldTxnSafePoint) re.Equal(uint64(10), res.NewTxnSafePoint) @@ -167,7 +168,7 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() { checkTxnSafePoint(keyspaceID, 10) // Allows updating with the same value (no effect). - res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 10) + res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 10, now) re.NoError(err) re.Equal(uint64(10), res.OldTxnSafePoint) re.Equal(uint64(10), res.NewTxnSafePoint) @@ -175,7 +176,7 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() { re.Empty(res.BlockerDescription) // Does not allow decreasing. - _, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 9) + _, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 9, now) re.Error(err) re.ErrorIs(err, errs.ErrDecreasingTxnSafePoint) @@ -183,7 +184,7 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() { } for _, keyspaceID := range s.keyspacePresets.unmanageable { - _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 20) + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 20, now) re.Error(err) re.ErrorIs(err, errs.ErrGCOnInvalidKeyspace) // Updated in previous loop when updating NullKeyspaceID. @@ -191,14 +192,14 @@ func (s *gcManagerTestSuite) TestAdvanceTxnSafePointBasic() { } for _, keyspaceID := range s.keyspacePresets.notExisting { - _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 30) + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 30, now) re.Error(err) re.ErrorIs(err, errs.ErrKeyspaceNotFound) } for i, keyspaceID := range s.keyspacePresets.nullSynonyms { // Previously updated to 10. Update to 10+i+1 in i-th loop. - res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, uint64(10+i+1)) + res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, uint64(10+i+1), now) re.NoError(err) re.Equal(uint64(10+i), res.OldTxnSafePoint) re.Equal(uint64(10+i+1), res.NewTxnSafePoint) @@ -249,7 +250,7 @@ func (s *gcManagerTestSuite) TestAdvanceGCSafePointBasic() { } for _, keyspaceID := range s.keyspacePresets.manageable { - _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10) + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 10, time.Now()) re.NoError(err) oldValue, newValue, err := s.manager.AdvanceGCSafePoint(keyspaceID, 5) @@ -280,7 +281,7 @@ func (s *gcManagerTestSuite) TestAdvanceGCSafePointBasic() { re.ErrorIs(err, errs.ErrDecreasingGCSafePoint) } - _, err := s.manager.AdvanceTxnSafePoint(constant.NullKeyspaceID, 30) + _, err := s.manager.AdvanceTxnSafePoint(constant.NullKeyspaceID, 30, time.Now()) re.NoError(err) for i, keyspaceID := range s.keyspacePresets.nullSynonyms { // The GC safe point in Already updated to 10 in previous check. So in i-th loop here, we update from 10+i to @@ -501,35 +502,110 @@ func TestLegacyServiceGCSafePointRoundingTTL(t *testing.T) { } func (s *gcManagerTestSuite) TestGCBarriers() { - //re := s.Require() - // - //getGCBarrier := func(keyspaceID uint32, barrierID string) *endpoint.GCBarrier { - // state, err := s.manager.GetGCState(keyspaceID) - // re.NoError(err) - // idx := slices.IndexFunc(state.GCBarriers, func(b *endpoint.GCBarrier) bool { - // return b.BarrierID == barrierID - // }) - // if idx == -1 { - // return nil - // } - // return state.GCBarriers[idx] - //} - // - //getAllGCBarriers := func(keyspaceID uint32) []*endpoint.GCBarrier { - // state, err := s.manager.GetGCState(keyspaceID) - // re.NoError(err) - // return state.GCBarriers - //} - // - //for _, keyspaceID := range s.keyspacePresets.all { - // re.Len(getAllGCBarriers(keyspaceID), 0) - //} - // - //// Test basic functionality within a single keyspace. - //for _, keyspaceID := range s.keyspacePresets.manageable { - // b, err := s.manager.SetGCBarrier(keyspaceID, "b1", 10, time.Hour, time.Now()) - // re.NoError(err) - //} + re := s.Require() + + getGCBarrier := func(keyspaceID uint32, barrierID string) *endpoint.GCBarrier { + state, err := s.manager.GetGCState(keyspaceID) + re.NoError(err) + idx := slices.IndexFunc(state.GCBarriers, func(b *endpoint.GCBarrier) bool { + return b.BarrierID == barrierID + }) + if idx == -1 { + return nil + } + return state.GCBarriers[idx] + } + + getAllGCBarriers := func(keyspaceID uint32) []*endpoint.GCBarrier { + state, err := s.manager.GetGCState(keyspaceID) + re.NoError(err) + return state.GCBarriers + } + + checkTxnSafePoint := func(keyspaceID uint32, expectedTxnSafePoint uint64) { + state, err := s.manager.GetGCState(keyspaceID) + re.NoError(err) + re.Equal(expectedTxnSafePoint, state.TxnSafePoint) + } + + // Helper for getting pointer of time. + ptime := func(t time.Time) *time.Time { return &t } + + now := time.Date(2025, 03, 06, 11, 50, 30, 0, time.Local) + + for _, keyspaceID := range s.keyspacePresets.all { + re.Empty(getAllGCBarriers(keyspaceID)) + } + + // Test basic functionality within a single keyspace. + for _, keyspaceID := range s.keyspacePresets.manageable { + b, err := s.manager.SetGCBarrier(keyspaceID, "b1", 10, time.Hour, now) + re.NoError(err) + expected := endpoint.NewGCBarrier("b1", 10, ptime(now.Add(time.Hour))) + re.Equal(expected, b) + re.Len(getAllGCBarriers(keyspaceID), 1) + re.Equal(expected, getGCBarrier(keyspaceID, "b1")) + + // Updating the value of the existing GC barrier + b, err = s.manager.SetGCBarrier(keyspaceID, "b1", 15, time.Hour, now) + re.NoError(err) + expected = endpoint.NewGCBarrier("b1", 15, ptime(now.Add(time.Hour))) + re.Equal(expected, b) + re.Len(getAllGCBarriers(keyspaceID), 1) + re.Equal(expected, getGCBarrier(keyspaceID, "b1")) + + b, err = s.manager.SetGCBarrier(keyspaceID, "b1", 15, time.Hour*2, now) + re.NoError(err) + expected = endpoint.NewGCBarrier("b1", 15, ptime(now.Add(time.Hour*2))) + re.Equal(expected, b) + re.Len(getAllGCBarriers(keyspaceID), 1) + re.Equal(expected, getGCBarrier(keyspaceID, "b1")) + + // Allows shrinking the barrier ts. + b, err = s.manager.SetGCBarrier(keyspaceID, "b1", 10, time.Hour, now) + re.NoError(err) + expected = endpoint.NewGCBarrier("b1", 10, ptime(now.Add(time.Hour))) + re.Equal(expected, b) + re.Len(getAllGCBarriers(keyspaceID), 1) + re.Equal(expected, getGCBarrier(keyspaceID, "b1")) + + // GC barriers blocks the txn safe point. + res, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 5, now) + re.NoError(err) + re.Equal(uint64(0), res.OldTxnSafePoint) + re.Equal(uint64(5), res.NewTxnSafePoint) + re.Empty(res.BlockerDescription) + res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 10, now) + re.NoError(err) + re.Equal(uint64(5), res.OldTxnSafePoint) + re.Equal(uint64(10), res.NewTxnSafePoint) + re.Empty(res.BlockerDescription) + res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 15, now) + re.NoError(err) + re.Equal(uint64(10), res.OldTxnSafePoint) + re.Equal(uint64(10), res.NewTxnSafePoint) + re.Equal(uint64(15), res.Target) + re.Contains("BarrierID: \"b1\"", res.BlockerDescription) + checkTxnSafePoint(keyspaceID, 10) + + _, err = s.manager.SetGCBarrier(keyspaceID, "b1", 15, time.Hour, now) + re.NoError(err) + // AdvanceTxnSafePoint advances the txn safe point as much as possible. + res, err = s.manager.AdvanceTxnSafePoint(keyspaceID, 20, now) + re.NoError(err) + re.Equal(uint64(10), res.OldTxnSafePoint) + re.Equal(uint64(15), res.NewTxnSafePoint) + re.Equal(uint64(20), res.Target) + re.Contains("BarrierID: \"b1\"", res.BlockerDescription) + + // Multiple GC barriers + _, err = s.manager.SetGCBarrier(keyspaceID, "b1", 20, time.Hour, now) + re.NoError(err) + _, err = s.manager.SetGCBarrier(keyspaceID, "b2", 20, time.Hour, now) + re.NoError(err) + re.Len(getAllGCBarriers(keyspaceID), 2) + //re + } } func TestGCStateConstraints(t *testing.T) { @@ -588,7 +664,7 @@ func (s *gcManagerTestSuite) TestRedirectKeyspace() { return errors.AddStack(err1) }, func(keyspaceID uint32) error { - _, err1 := s.manager.AdvanceTxnSafePoint(keyspaceID, 10) + _, err1 := s.manager.AdvanceTxnSafePoint(keyspaceID, 10, time.Now()) return errors.AddStack(err1) }, func(keyspaceID uint32) error { diff --git a/server/gc_service.go b/server/gc_service.go index 45191a4c817..5e0758c7eee 100644 --- a/server/gc_service.go +++ b/server/gc_service.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "math" + "time" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -286,7 +287,7 @@ func (s *GrpcServer) AdvanceTxnSafePoint(ctx context.Context, request *pdpb.Adva target := request.GetTarget() keyspaceID := getKeyspaceIDFromReq(request) - res, err := s.gcStateManager.AdvanceTxnSafePoint(keyspaceID, target) + res, err := s.gcStateManager.AdvanceTxnSafePoint(keyspaceID, target, time.Now()) if err != nil { return nil, err }