Skip to content

Commit

Permalink
Fix and add mroe tests to CompatibleUpdateServiceGCSafePoint
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta committed Mar 3, 2025
1 parent 58d49a2 commit ed5efbf
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,6 @@ var (
ErrDecreasingGCSafePoint = errors.Normalize("trying to update GC safe point to a smaller value, current value: %v, given: %v", errors.RFCCodeText("PD:gc:ErrDecreasingGCSafePoint"))
ErrGCSafePointExceedsTxnSafePoint = errors.Normalize("trying to update GC safe point to a too large value that exceeds the txn safe point, current value: %v, given: %v, current txn safe point: %v", errors.RFCCodeText("PD:gc:ErrGCSafePointExceedsTxnSafePoint"))
ErrDecreasingTxnSafePoint = errors.Normalize("trying to update txn safe point to a smaller value, current value: %v, given: %v", errors.RFCCodeText("PD:gc:ErrDecreasingTxnSafePoint"))
ErrInvalidGCBarrier = errors.Normalize("trying to set a GC barrier on ts %d which is already behind the txn safe point %d", errors.RFCCodeText("PD:gc:ErrInvalidGCBarrier"))
ErrGCBarrierTSBehindTxnSafePoint = errors.Normalize("trying to set a GC barrier on ts %d which is already behind the txn safe point %d", errors.RFCCodeText("PD:gc:ErrGCBarrierTSBehindTxnSafePoint"))
ErrReservedGCBarrierID = errors.Normalize("trying to set a GC barrier with a barrier ID that is reserved: %v", errors.RFCCodeText("PD:gc:ErrReservedGCBarrierID"))
)
50 changes: 43 additions & 7 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package gc

import (
"errors"
"fmt"
"math"
"math/bits"
Expand Down Expand Up @@ -259,7 +260,7 @@ func (m *GCStateManager) advanceTxnSafePointImpl(keyspaceID uint32, target uint6
if blockingBarrier != nil {
blockerDesc = blockingBarrier.String()
} else if blockingMinStartTSOwner != nil {
blockerDesc = fmt.Sprintf("TiDBMinStartTS { Key: %s, MinStartTS: %d }", *blockingMinStartTSOwner, newTxnSafePoint)
blockerDesc = fmt.Sprintf("TiDBMinStartTS { Key: %+q, MinStartTS: %d }", *blockingMinStartTSOwner, newTxnSafePoint)
}

if newTxnSafePoint != target {
Expand Down Expand Up @@ -319,7 +320,7 @@ func (m *GCStateManager) setGCBarrierImpl(keyspaceID uint32, barrierID string, b
return err1
}
if barrierTS < txnSafePoint {
return errs.ErrInvalidGCBarrier.GenWithStackByArgs(barrierTS, txnSafePoint)
return errs.ErrGCBarrierTSBehindTxnSafePoint.GenWithStackByArgs(barrierTS, txnSafePoint)
}
err1 = wb.SetGCBarrier(keyspaceID, newBarrier)
return err1
Expand Down Expand Up @@ -462,13 +463,46 @@ func saturatingDuration(ratio int64, base time.Duration) time.Duration {
return time.Duration(l)
}

// CompatibleUpdateServiceGCSafePoint updates the service safe point of the given serviceID. Service safe points are
// being deprecated, and this method provides compatibility for components that are still using service safe point API.
// This method simulates the behavior of the service safe points in old versions, by internally using txn safe points
// and GC barriers. The behaviors are mapped as follows:
//
// - The service safe point with service ID "gc_worker" is mapped to the txn safe point.
// - The service safe point with other service IDs are mapped to GC barriers with barrier IDs equal to the given
// service IDs.
//
// Note that the behavior of the service safe point of "gc_worker" is not perfectly the same as before: it can no longer
// be advanced over other service safe points, but will be blocked by the minimal one; and if the cluster was running
// with TiDB node that haven't migrated to the new GC APIs, it can also be blocked by the *TiDB min start ts* written by
// those TiDB nodes.
//
// Therefore, the method's behavior is as follows:
// - If the given serviceID is "gc_worker", it internally calls AdvanceTxnSafePoint.
// - If the advancing result is the same as newServiceSafePoint, it's the case that the updated service safe
// point of "gc_worker" is exactly the minimal one. Returns a simulated service safe point with the service ID
// equals to "gc_worker".
// - Otherwise, it's the case that the service safe point of "gc_worker" is successfully updated, but it's not
// the minimal service safe point. Returns a simulated service safe point whose serviceID starts with
// "__pseudo_service:" to simulate the minimal service safe point. It may actually be either a GC barrier or
// a *TiDB min start ts*.
// - If the given serviceID is anything else, it internally calls SetGCBarrier or DeleteGCBarrier, depending on
// whether the `ttl` is positive or not. As the txn safe point is always less or equal to any GC barriers, we
// simulate the case that the service safe point of "gc_worker" is the minimal one, and return a service safe point
// with the service ID equals to "gc_worker".
//
// This function only works on the NullKeyspace.
func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, newServiceSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
keyspaceID := constant.NullKeyspaceID
m.lock.Lock()
defer m.lock.Unlock()

// TODO: After implementing the global GC barrier, redirect the invocation on "native_br" to `SetGlobalGCBarrier`.
if serviceID == keypath.GCWorkerServiceSafePointID {
if ttl != math.MaxInt64 {
return nil, false, errors.New("TTL of gc_worker's service safe point must be infinity")
}

res, err := m.advanceTxnSafePointImpl(keyspaceID, newServiceSafePoint)
if err != nil {
return nil, false, err
Expand All @@ -486,30 +520,32 @@ func (m *GCStateManager) CompatibleUpdateServiceGCSafePoint(serviceID string, ne
SafePoint: newServiceSafePoint,
}
}
updated = res.OldTxnSafePoint != res.NewTxnSafePoint
} else {
if ttl > 0 {
_, err = m.setGCBarrierImpl(keyspaceID, serviceID, newServiceSafePoint, saturatingDuration(ttl, time.Second), now)
} else {
_, err = m.deleteGCBarrierImpl(keyspaceID, serviceID)
}

if err != nil {
if err != nil && !errors.Is(err, errs.ErrGCBarrierTSBehindTxnSafePoint) {
return nil, false, err
}
// The atomicity between setting/deleting GC barrier and loading the txn safe point is not guaranteed here.
// It doesn't matter much whether it's atomic, but it's important to ensure LoadTxnSafePoint happens *AFTER*
// setting/deleting GC barrier.
minTxnSafePoint, err := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
txnSafePoint, err := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID)
if err != nil {
return nil, false, err
}
minServiceSafePoint = &endpoint.ServiceSafePoint{
ServiceID: "<unknown>",
ServiceID: keypath.GCWorkerServiceSafePointID,
ExpiredAt: math.MaxInt64,
SafePoint: minTxnSafePoint,
SafePoint: txnSafePoint,
}
updated = ttl > 0 && txnSafePoint <= newServiceSafePoint
}
return minServiceSafePoint, true, nil
return minServiceSafePoint, updated, nil
}

type AdvanceTxnSafePointResult struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestLegacyServiceGCSafePointUpdate(t *testing.T) {
min, updated, err := manager.CompatibleUpdateServiceGCSafePoint(gcWorkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
re.NoError(err)
re.True(updated)
re.Equal(cdcServiceID, min.ServiceID)
re.Contains(min.ServiceID, "BarrierID: \""+cdcServiceID+"\"")
re.Equal(cdcServiceSafePoint, min.SafePoint)

// The value shouldn't be updated with current service safe point smaller than the min safe point.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (b *GCBarrier) String() string {
if b.ExpirationTime != nil {
expirationTime = b.ExpirationTime.String()
}
return fmt.Sprintf("GCBarrier { BarrierID: %s, BarrierTS: %d, ExpirationTime: %v }",
return fmt.Sprintf("GCBarrier { BarrierID: %+q, BarrierTS: %d, ExpirationTime: %+q }",
b.BarrierID, b.BarrierTS, expirationTime)
}

Expand Down

0 comments on commit ed5efbf

Please sign in to comment.