Commit

resolve race
Signed-off-by: husharp <[email protected]>
HuSharp committed Jul 8, 2024
1 parent aecb4ab commit ead70bf
Showing 9 changed files with 60 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -121,7 +121,7 @@ pd-analysis:
pd-heartbeat-bench:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench pd-heartbeat-bench/main.go
simulator:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_CGO_ENABLED) go build $(BUILD_FLAGS) -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
regions-dump:
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump regions-dump/main.go
stores-dump:
8 changes: 0 additions & 8 deletions pkg/core/region.go
@@ -2209,11 +2209,3 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi
}
return NewRegionInfo(metaRegion, leader, opts...)
}

// TraverseRegions executes a function on all regions.
// ONLY for simulator now and function need to be self-locked.
func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) {
for _, item := range r.regions {
lockedFunc(item.RegionInfo)
}
}
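
Note: the deleted TraverseRegions walked the internal regions map directly and left all locking to the callback, so a concurrent writer could mutate the map mid-iteration. The rest of the commit switches callers to GetRegions, which hands back a copied slice. Below is a minimal sketch of that snapshot-then-iterate pattern using hypothetical types, not PD's real core.RegionsInfo:

```go
package main

import (
	"fmt"
	"sync"
)

// regionStore is a hypothetical stand-in for core.RegionsInfo: a map of
// regions guarded by an RWMutex.
type regionStore struct {
	mu      sync.RWMutex
	regions map[uint64]string
}

// Snapshot copies the current regions under the read lock, so callers can
// iterate the returned slice without holding any lock.
func (s *regionStore) Snapshot() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]string, 0, len(s.regions))
	for _, r := range s.regions {
		out = append(out, r)
	}
	return out
}

// Put replaces a region under the write lock.
func (s *regionStore) Put(id uint64, r string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.regions[id] = r
}

func main() {
	s := &regionStore{regions: map[uint64]string{1: "r1", 2: "r2"}}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.Put(3, "r3") // a concurrent writer no longer races with iteration
	}()
	for _, r := range s.Snapshot() {
		fmt.Println(r)
	}
	wg.Wait()
}
```

Iterating the returned slice needs no lock, at the cost of observing a point-in-time view of the regions.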
2 changes: 1 addition & 1 deletion pkg/statistics/region_collection.go
@@ -125,7 +125,7 @@ func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core
if region == nil {
continue
}
res = append(res, region.Clone())
res = append(res, region)
}
return res
}
17 changes: 9 additions & 8 deletions tools/pd-simulator/simulator/client.go
@@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
@@ -50,7 +51,7 @@ type Client interface {
}

const (
pdTimeout = time.Second
pdTimeout = 3 * time.Second
maxInitClusterRetries = 100
// retry to get leader URL
leaderChangedWaitTime = 100 * time.Millisecond
@@ -62,13 +63,13 @@ var (
errFailInitClusterID = errors.New("[pd] failed to get cluster id")
PDHTTPClient pdHttp.Client
SD pd.ServiceDiscovery
ClusterID uint64
ClusterID atomic.Uint64
)

// requestHeader returns a header for fixed ClusterID.
func requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: ClusterID,
ClusterId: ClusterID.Load(),
}
}

@@ -382,8 +383,8 @@ func getLeaderURL(ctx context.Context, conn *grpc.ClientConn) (string, *grpc.Cli
if members.GetHeader().GetError() != nil {
return "", nil, errors.New(members.GetHeader().GetError().String())
}
ClusterID = members.GetHeader().GetClusterId()
if ClusterID == 0 {
ClusterID.Store(members.GetHeader().GetClusterId())
if ClusterID.Load() == 0 {
return "", nil, errors.New("cluster id is 0")
}
if members.GetLeader() == nil {
@@ -466,10 +467,10 @@ retry:
break retry
}
}
if ClusterID == 0 {
if ClusterID.Load() == 0 {
return "", nil, errors.WithStack(errFailInitClusterID)
}
simutil.Logger.Info("get cluster id successfully", zap.Uint64("cluster-id", ClusterID))
simutil.Logger.Info("get cluster id successfully", zap.Uint64("cluster-id", ClusterID.Load()))

// Check if the cluster is already bootstrapped.
ctx, cancel := context.WithTimeout(ctx, pdTimeout)
@@ -543,7 +544,7 @@ func PutPDConfig(config *sc.PDConfig) error {
}

func ChooseToHaltPDSchedule(halt bool) {
HaltSchedule = halt
HaltSchedule.Store(halt)
PDHTTPClient.SetConfig(context.Background(), map[string]any{
"schedule.halt-scheduling": strconv.FormatBool(halt),
})
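
Note: ClusterID was a plain package-level uint64 written while resolving the PD leader and read concurrently when building request headers; the commit wraps it in sync/atomic's atomic.Uint64 so every access goes through Load and Store. A small sketch of that pattern (names are illustrative, not the simulator's):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// clusterID mirrors the commit's package-level atomic: many goroutines may
// read it via Load while one goroutine stores the value fetched from PD.
var clusterID atomic.Uint64

func main() {
	var wg sync.WaitGroup

	// Writer: in the simulator this happens once the leader lookup succeeds.
	wg.Add(1)
	go func() {
		defer wg.Done()
		clusterID.Store(6827) // hypothetical cluster id
	}()

	// Readers: request headers are built concurrently from other goroutines.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = clusterID.Load() // may observe 0 or 6827, but never a torn read
		}()
	}

	wg.Wait()
	fmt.Println("cluster id:", clusterID.Load())
}
```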
23 changes: 11 additions & 12 deletions tools/pd-simulator/simulator/drive.go
@@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
@@ -122,7 +123,7 @@ func (d *Driver) allocID() error {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
rootPath := path.Join("/pd", strconv.FormatUint(ClusterID, 10))
rootPath := path.Join("/pd", strconv.FormatUint(ClusterID.Load(), 10))
allocIDPath := path.Join(rootPath, "alloc_id")
_, err = etcdClient.Put(ctx, allocIDPath, string(typeutil.Uint64ToBytes(maxID+1000)))
if err != nil {
@@ -176,24 +177,20 @@ func (d *Driver) Tick() {
d.wg.Wait()
}

var HaltSchedule = false
var HaltSchedule atomic.Bool

// Check checks if the simulation is completed.
func (d *Driver) Check() bool {
if !HaltSchedule {
if !HaltSchedule.Load() {
return false
}
length := uint64(len(d.conn.Nodes) + 1)
var stats []info.StoreStats
var stores []*metapb.Store
for index, s := range d.conn.Nodes {
if index >= length {
length = index + 1
}
for _, s := range d.conn.Nodes {
s.statsMutex.RLock()
stores = append(stores, s.Store)
}
stats := make([]info.StoreStats, length)
for index, node := range d.conn.Nodes {
stats[index] = *node.stats
stats = append(stats, *s.stats)
s.statsMutex.RUnlock()
}
return d.simCase.Checker(stores, d.raftEngine.regionsInfo, stats)
}
@@ -252,11 +249,13 @@ func (d *Driver) GetBootstrapInfo(r *RaftEngine) (*metapb.Store, *metapb.Region,

func (d *Driver) updateNodeAvailable() {
for storeID, n := range d.conn.Nodes {
n.statsMutex.Lock()
if n.hasExtraUsedSpace {
n.stats.StoreStats.Available = n.stats.StoreStats.Capacity - uint64(d.raftEngine.regionsInfo.GetStoreRegionSize(storeID)) - uint64(d.simConfig.RaftStore.ExtraUsedSpace)
} else {
n.stats.StoreStats.Available = n.stats.StoreStats.Capacity - uint64(d.raftEngine.regionsInfo.GetStoreRegionSize(storeID))
}
n.statsMutex.Unlock()
}
}

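
Note: two changes in drive.go follow the same theme. HaltSchedule becomes an atomic.Bool, and Check now copies each node's stats while holding that node's read lock, appending to the result instead of indexing a pre-sized slice. A rough sketch of both, under assumed stand-in types:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// halt plays the role of HaltSchedule: a flag flipped by one goroutine and
// polled by the driver loop.
var halt atomic.Bool

// node is a hypothetical stand-in for simulator.Node with stats guarded by
// an RWMutex, as in the commit.
type node struct {
	mu    sync.RWMutex
	stats int
}

// collect builds a copy of every node's stats, taking each node's read lock
// only long enough to copy the value (the shape of Driver.Check after the change).
func collect(nodes []*node) []int {
	out := make([]int, 0, len(nodes))
	for _, n := range nodes {
		n.mu.RLock()
		out = append(out, n.stats)
		n.mu.RUnlock()
	}
	return out
}

func main() {
	nodes := []*node{{stats: 1}, {stats: 2}}
	halt.Store(true)
	if halt.Load() {
		fmt.Println(collect(nodes))
	}
}
```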
5 changes: 3 additions & 2 deletions tools/pd-simulator/simulator/event.go
@@ -240,7 +240,8 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
}
node.Stop()

raft.TraverseRegions(func(region *core.RegionInfo) {
regions := raft.GetRegions()
for _, region := range regions {
storeIDs := region.GetStoreIDs()
if _, ok := storeIDs[node.Id]; ok {
downPeer := &pdpb.PeerStats{
@@ -250,6 +251,6 @@
region = region.Clone(core.WithDownPeers(append(region.GetDownPeers(), downPeer)))
raft.SetRegion(region)
}
})
}
return true
}
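
Note: DownNode.Run now iterates a snapshot from GetRegions and, where a region touches the downed store, publishes a clone carrying the extra down peer via SetRegion rather than mutating the shared RegionInfo in place. A sketch of that clone-then-swap pattern with hypothetical types:

```go
package main

import (
	"fmt"
	"sync"
)

// region is a hypothetical immutable-style record: updates produce a new
// value rather than mutating the shared one, mirroring RegionInfo.Clone.
type region struct {
	id        uint64
	downPeers []uint64
}

// withDownPeer returns a copy with one more down peer recorded.
func (r *region) withDownPeer(storeID uint64) *region {
	c := *r
	c.downPeers = append(append([]uint64(nil), r.downPeers...), storeID)
	return &c
}

// engine holds the shared map behind a mutex; setRegion swaps in the clone.
type engine struct {
	mu      sync.Mutex
	regions map[uint64]*region
}

func (e *engine) snapshot() []*region {
	e.mu.Lock()
	defer e.mu.Unlock()
	out := make([]*region, 0, len(e.regions))
	for _, r := range e.regions {
		out = append(out, r)
	}
	return out
}

func (e *engine) setRegion(r *region) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.regions[r.id] = r
}

func main() {
	e := &engine{regions: map[uint64]*region{1: {id: 1}}}
	for _, r := range e.snapshot() {
		e.setRegion(r.withDownPeer(42)) // clone, then publish under the lock
	}
	fmt.Println(e.regions[1].downPeers)
}
```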
24 changes: 14 additions & 10 deletions tools/pd-simulator/simulator/node.go
@@ -24,7 +24,6 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/tools/pd-simulator/simulator/cases"
@@ -51,7 +50,7 @@ type Node struct {
cancel context.CancelFunc
raftEngine *RaftEngine
limiter *ratelimit.RateLimiter
sizeMutex syncutil.Mutex
statsMutex syncutil.RWMutex
hasExtraUsedSpace bool
snapStats []*pdpb.SnapshotStat
// PD client
@@ -180,6 +179,7 @@ func (n *Node) storeHeartBeat() {
return
}
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
n.statsMutex.Lock()
stats := make([]*pdpb.SnapshotStat, len(n.snapStats))
copy(stats, n.snapStats)
n.snapStats = n.snapStats[:0]
@@ -190,12 +190,13 @@
zap.Uint64("node-id", n.GetId()),
zap.Error(err))
}
n.statsMutex.Unlock()
cancel()
}

func (n *Node) compaction() {
n.sizeMutex.Lock()
defer n.sizeMutex.Unlock()
n.statsMutex.Lock()
defer n.statsMutex.Unlock()
n.stats.Available += n.stats.ToCompactionSize
n.stats.UsedSize -= n.stats.ToCompactionSize
n.stats.ToCompactionSize = 0
@@ -205,7 +206,8 @@ func (n *Node) regionHeartBeat() {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
n.raftEngine.TraverseRegions(func(region *core.RegionInfo) {
regions := n.raftEngine.GetRegions()
for _, region := range regions {
if region.GetLeader() != nil && region.GetLeader().GetStoreId() == n.Id {
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
if region == nil {
Expand All @@ -220,7 +222,7 @@ func (n *Node) regionHeartBeat() {
}
cancel()
}
})
}
}

func (n *Node) reportRegionChange() {
@@ -267,19 +269,21 @@ func (n *Node) Stop() {
}

func (n *Node) incUsedSize(size uint64) {
n.sizeMutex.Lock()
defer n.sizeMutex.Unlock()
n.statsMutex.Lock()
defer n.statsMutex.Unlock()
n.stats.Available -= size
n.stats.UsedSize += size
}

func (n *Node) decUsedSize(size uint64) {
n.sizeMutex.Lock()
defer n.sizeMutex.Unlock()
n.statsMutex.Lock()
defer n.statsMutex.Unlock()
n.stats.ToCompactionSize += size
}

func (n *Node) registerSnapStats(generate, send, total uint64) {
n.statsMutex.Lock()
defer n.statsMutex.Unlock()
stat := pdpb.SnapshotStat{
GenerateDurationSec: generate,
SendDurationSec: send,
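
Note: node.go replaces the narrower sizeMutex with a single statsMutex (an RWMutex) that now covers the size counters, the store stats sent in heartbeats, and the pending snapshot stats. A reduced sketch of the idea, assuming simplified fields rather than the real StoreStats:

```go
package main

import (
	"fmt"
	"sync"
)

// nodeStats is a hypothetical reduction of Node: one RWMutex now guards both
// the size counters and the pending snapshot stats, instead of a dedicated
// sizeMutex covering only part of the state.
type nodeStats struct {
	mu        sync.RWMutex
	available uint64
	usedSize  uint64
	snapStats []uint64
}

// incUsedSize adjusts the counters under the write lock, like Node.incUsedSize.
func (n *nodeStats) incUsedSize(size uint64) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.available -= size
	n.usedSize += size
}

// drainSnapStats copies and resets the pending snapshot stats under the same
// lock, the way storeHeartBeat does after the change.
func (n *nodeStats) drainSnapStats() []uint64 {
	n.mu.Lock()
	defer n.mu.Unlock()
	out := make([]uint64, len(n.snapStats))
	copy(out, n.snapStats)
	n.snapStats = n.snapStats[:0]
	return out
}

func main() {
	n := &nodeStats{available: 100, snapStats: []uint64{3, 5}}
	n.incUsedSize(10)
	fmt.Println(n.available, n.usedSize, n.drainSnapStats())
}
```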
18 changes: 12 additions & 6 deletions tools/pd-simulator/simulator/raft.go
@@ -82,10 +82,11 @@ func NewRaftEngine(conf *cases.Case, conn *Connection, storeConfig *config.SimCo
}

func (r *RaftEngine) stepRegions() {
r.TraverseRegions(func(region *core.RegionInfo) {
regions := r.GetRegions()
for _, region := range regions {
r.stepLeader(region)
r.stepSplit(region)
})
}
}

func (r *RaftEngine) stepLeader(region *core.RegionInfo) {
@@ -228,7 +229,10 @@ func (r *RaftEngine) electNewLeader(region *core.RegionInfo) *metapb.Peer {
func (r *RaftEngine) GetRegion(regionID uint64) *core.RegionInfo {
r.RLock()
defer r.RUnlock()
return r.regionsInfo.GetRegion(regionID)
if region := r.regionsInfo.GetRegion(regionID); region != nil {
return region.Clone()
}
return nil
}

// GetRegionChange returns a list of RegionID for a given store.
@@ -251,9 +255,11 @@ func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) {
}
}

// TraverseRegions executes a function on all regions, and function need to be self-locked.
func (r *RaftEngine) TraverseRegions(lockedFunc func(*core.RegionInfo)) {
r.regionsInfo.TraverseRegions(lockedFunc)
// GetRegions gets all RegionInfo from regionMap
func (r *RaftEngine) GetRegions() []*core.RegionInfo {
r.RLock()
defer r.RUnlock()
return r.regionsInfo.GetRegions()
}

// SetRegion sets the RegionInfo with regionID
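
Note: GetRegion now returns a clone taken under the read lock, so a caller holding the result cannot race with a later SetRegion on the shared object. A minimal sketch of a defensive-copy getter over hypothetical types:

```go
package main

import (
	"fmt"
	"sync"
)

// record stands in for core.RegionInfo; the getter returns a copy so callers
// never hold a pointer into the locked map.
type record struct {
	id   uint64
	size int64
}

type store struct {
	mu   sync.RWMutex
	recs map[uint64]*record
}

// Get returns a defensive copy under the read lock, or nil if absent — the
// same shape as RaftEngine.GetRegion after the change.
func (s *store) Get(id uint64) *record {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if r, ok := s.recs[id]; ok {
		c := *r
		return &c
	}
	return nil
}

func main() {
	s := &store{recs: map[uint64]*record{7: {id: 7, size: 96}}}
	r := s.Get(7)
	r.size = 0 // mutating the copy leaves the shared record untouched
	fmt.Println(s.recs[7].size, s.Get(8) == nil)
}
```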
11 changes: 9 additions & 2 deletions tools/pd-simulator/simulator/task.go
@@ -517,20 +517,25 @@ func processSnapshot(n *Node, stat *snapshotStat, speed uint64) bool {
return true
}
if stat.status == pending {
if stat.action == generate && n.stats.SendingSnapCount > maxSnapGeneratorPoolSize {
n.statsMutex.RLock()
sendSnapshot, receiveSnapshot := n.stats.SendingSnapCount, n.stats.ReceivingSnapCount
n.statsMutex.RUnlock()
if stat.action == generate && sendSnapshot > maxSnapGeneratorPoolSize {
return false
}
if stat.action == receive && n.stats.ReceivingSnapCount > maxSnapReceivePoolSize {
if stat.action == receive && receiveSnapshot > maxSnapReceivePoolSize {
return false
}
stat.status = running
stat.generateStart = time.Now()
n.statsMutex.Lock()
// If the statement is true, it will start to send or Receive the snapshot.
if stat.action == generate {
n.stats.SendingSnapCount++
} else {
n.stats.ReceivingSnapCount++
}
n.statsMutex.Unlock()
}

// store should Generate/Receive snapshot by chunk size.
@@ -548,11 +553,13 @@ func processSnapshot(n *Node, stat *snapshotStat, speed uint64) bool {
totalSec := uint64(time.Since(stat.start).Seconds()) * speed
generateSec := uint64(time.Since(stat.generateStart).Seconds()) * speed
n.registerSnapStats(generateSec, 0, totalSec)
n.statsMutex.Lock()
if stat.action == generate {
n.stats.SendingSnapCount--
} else {
n.stats.ReceivingSnapCount--
}
n.statsMutex.Unlock()
}
return true
}
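
Note: processSnapshot now copies the snapshot counters into locals under the read lock, decides whether to proceed, and takes the write lock only around the increment or decrement itself. A compact sketch of that read-then-write split (the check and the update are separate critical sections, which the simulator tolerates), again with hypothetical types:

```go
package main

import (
	"fmt"
	"sync"
)

// counters is a hypothetical reduction of the node's snapshot counters.
type counters struct {
	mu      sync.RWMutex
	sending uint64
}

// tryStartSend copies the counter to a local under the read lock, decides,
// then takes the write lock only for the increment itself.
func (c *counters) tryStartSend(limit uint64) bool {
	c.mu.RLock()
	sending := c.sending
	c.mu.RUnlock()
	if sending > limit {
		return false
	}
	c.mu.Lock()
	c.sending++
	c.mu.Unlock()
	return true
}

func main() {
	c := &counters{}
	fmt.Println(c.tryStartSend(2))
}
```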
