Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add a new scheduler to balance the regions of the given key range #8988

Merged
merged 14 commits into from
Feb 10, 2025
10 changes: 10 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"bytes"
"encoding/json"

"github.com/tikv/pd/pkg/core/constant"
)
Expand Down Expand Up @@ -156,6 +157,15 @@ type KeyRange struct {
EndKey []byte `json:"end-key"`
}

// MarshalJSON implements json.Marshaler. Both keys are passed through
// HexRegionKeyStr and emitted as strings under "start-key" and "end-key"
// instead of using encoding/json's default []byte encoding.
func (kr KeyRange) MarshalJSON() ([]byte, error) {
	start, end := HexRegionKeyStr(kr.StartKey), HexRegionKeyStr(kr.EndKey)
	return json.Marshal(map[string]string{
		"start-key": start,
		"end-key":   end,
	})
}

// NewKeyRange creates a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange{
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,11 @@
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStrToType[scheduler.Type]
schedulerType, ok := types.ConvertOldStrToType[scheduler.Type]
if !ok {
log.Error("scheduler not found", zap.String("type", scheduler.Type))
continue

Check warning on line 334 in pkg/mcs/scheduling/server/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/cluster.go#L333-L334

Added lines #L333 - L334 were not covered by tests
}
s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
Expand Down
6 changes: 0 additions & 6 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ func RegisterScheduler(typ types.CheckerSchedulerType) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered reports whether typ has an entry in schedulerMap.
func IsSchedulerRegistered(typ types.CheckerSchedulerType) bool {
	if _, registered := schedulerMap.Load(typ); !registered {
		return false
	}
	return true
}

// SchedulerConfigProvider is the interface for scheduler configurations.
type SchedulerConfigProvider interface {
SharedConfigProvider
Expand Down
237 changes: 237 additions & 0 deletions pkg/schedule/schedulers/balance_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/unrolled/render"

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// balanceRangeSchedulerHandler backs the HTTP routes of the balance range
// scheduler (see newBalanceRangeHandler for the route table).
type balanceRangeSchedulerHandler struct {
// rd writes the JSON responses.
rd *render.Render
// config is the scheduler config whose jobs are listed by listConfig.
config *balanceRangeSchedulerConfig
}

// newBalanceRangeHandler wires the scheduler's HTTP routes around conf:
// POST /config goes to updateConfig and GET /list goes to listConfig.
// Responses are rendered as indented JSON.
func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler {
	h := &balanceRangeSchedulerHandler{
		rd:     render.New(render.Options{IndentJSON: true}),
		config: conf,
	}

	routes := mux.NewRouter()
	routes.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost)
	routes.HandleFunc("/list", h.listConfig).Methods(http.MethodGet)
	return routes
}

func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported")

Check warning on line 54 in pkg/schedule/schedulers/balance_range.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/balance_range.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.clone()
if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil {
log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
}

Check warning on line 61 in pkg/schedule/schedulers/balance_range.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/balance_range.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}

// balanceRangeSchedulerConfig is the configuration of the balance range
// scheduler: the list of jobs it has been asked to run.
type balanceRangeSchedulerConfig struct {
// RWMutex is taken by clone, EncodeConfig and ReloadConfig around their
// accesses to jobs.
syncutil.RWMutex
// schedulerConfig is the embedded base config; the load and init calls made
// on this config in the package resolve to it.
schedulerConfig
// jobs holds the configured jobs in the order they were appended.
jobs []*balanceRangeSchedulerJob
}

// balanceRangeSchedulerJob describes one balance request handled by the
// balance range scheduler. It is the unit that is serialized to JSON.
type balanceRangeSchedulerJob struct {
// JobID identifies the job within the config.
JobID uint64 `json:"job-id"`
// Role is the peer role the job targets; it is serialized as its name.
Role Role `json:"role"`
// Engine is a free-form engine name.
// NOTE(review): presumably the storage engine to balance on — confirm.
Engine string `json:"engine"`
// Timeout is the duration configured for the job. As a time.Duration with
// no custom marshaler, it is encoded as an integer nanosecond count.
Timeout time.Duration `json:"timeout"`
// Ranges are the key ranges the job covers.
Ranges []core.KeyRange `json:"ranges"`
// Alias is a caller-supplied name for the job.
Alias string `json:"alias"`
// Status is the job's lifecycle state; it is serialized as its name.
Status JobStatus `json:"status"`
}

// clone returns a snapshot of the configured jobs taken under the read lock.
// Each job struct and its Ranges slice are copied, so the caller may keep or
// modify the result without affecting the config. The key bytes inside each
// KeyRange are still shared with the original.
func (conf *balanceRangeSchedulerConfig) clone() []*balanceRangeSchedulerJob {
	conf.RLock()
	defer conf.RUnlock()

	jobs := make([]*balanceRangeSchedulerJob, 0, len(conf.jobs))
	for _, job := range conf.jobs {
		ranges := make([]core.KeyRange, len(job.Ranges))
		copy(ranges, job.Ranges)
		jobs = append(jobs, &balanceRangeSchedulerJob{
			JobID:   job.JobID,
			Role:    job.Role,
			Engine:  job.Engine,
			Timeout: job.Timeout,
			Ranges:  ranges,
			Alias:   job.Alias,
			// Status must be carried over too; leaving it out made every
			// cloned job report the zero status regardless of its real state.
			Status: job.Status,
		})
	}

	return jobs
}

// EncodeConfig serializes the scheduler's job list with the package-level
// EncodeConfig helper while holding the config read lock.
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
	conf := s.conf
	conf.RLock()
	defer conf.RUnlock()

	return EncodeConfig(conf.jobs)
}

// ReloadConfig reloads the config.
func (s *balanceRangeScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()

jobs := make([]*balanceRangeSchedulerJob, 0, len(s.conf.jobs))
if err := s.conf.load(jobs); err != nil {
return err
}
s.conf.jobs = jobs
return nil

Check warning on line 117 in pkg/schedule/schedulers/balance_range.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/balance_range.go#L108-L117

Added lines #L108 - L117 were not covered by tests
}

// balanceRangeScheduler is the scheduler registered under
// types.BalanceRangeScheduler. Its Schedule method is still a stub.
type balanceRangeScheduler struct {
// BaseScheduler is the embedded base scheduler (operator controller, name
// and type accessors).
*BaseScheduler
// conf holds the configured jobs.
conf *balanceRangeSchedulerConfig
// handler serves the scheduler's HTTP routes; ServeHTTP delegates to it.
handler http.Handler
// filters are the store filters installed by newBalanceRangeScheduler.
filters []filter.Filter
// filterCounter is the filter counter created for this scheduler's name.
filterCounter *filter.Counter
}

// ServeHTTP implements the http.Handler interface by forwarding the request
// unchanged to the scheduler's handler field.
func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

// Schedule schedules the balance key range operator.
// It is currently a stub: both arguments are ignored, a debug message is
// logged, and no operators and no plans are returned.
func (*balanceRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
log.Debug("balance key range scheduler is scheduling, need to implement")
return nil, nil
}

// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators.
func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
if !allowed {
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
}

Check warning on line 144 in pkg/schedule/schedulers/balance_range.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/balance_range.go#L143-L144

Added lines #L143 - L144 were not covered by tests
return allowed
}

// BalanceRangeCreateOption is used to create a scheduler with an option.
// Each option receives the scheduler under construction and may mutate it;
// newBalanceRangeScheduler applies the options in the order given.
type BalanceRangeCreateOption func(s *balanceRangeScheduler)

// newBalanceRangeScheduler creates a scheduler that tends to keep given peer Role on
// special store balanced.
func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler {
s := &balanceRangeScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
conf: conf,
handler: newBalanceRangeHandler(conf),
}
for _, option := range options {
option(s)
}

Check warning on line 161 in pkg/schedule/schedulers/balance_range.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/balance_range.go#L160-L161

Added lines #L160 - L161 were not covered by tests
s.filters = []filter.Filter{
&filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium},
filter.NewSpecialUseFilter(s.GetName()),
}
s.filterCounter = filter.NewCounter(s.GetName())
return s
}

// JobStatus is the status of the job.
type JobStatus int

const (
	pending JobStatus = iota
	running
	finished
)

// String returns the lower-case name of the status, or "unknown" for a value
// outside the declared constants.
func (s JobStatus) String() string {
	switch s {
	case pending:
		return "pending"
	case running:
		return "running"
	case finished:
		return "finished"
	}
	return "unknown"
}

// MarshalJSON marshals to json. The status is written as its quoted name.
func (s JobStatus) MarshalJSON() ([]byte, error) {
	return []byte(`"` + s.String() + `"`), nil
}

// unknownJobStatusError is returned by JobStatus.UnmarshalJSON for input that
// is not one of the names produced by MarshalJSON. Its value is the raw input.
type unknownJobStatusError string

// Error implements the error interface.
func (e unknownJobStatusError) Error() string {
	return "unknown job status: " + string(e)
}

// UnmarshalJSON is the inverse of MarshalJSON. Without it, the quoted name
// written by MarshalJSON could not be decoded back into the integer-based
// type. A JSON null leaves the status unchanged, following the encoding/json
// convention. Any other input, including the "unknown" placeholder, is
// rejected and leaves the status unchanged.
func (s *JobStatus) UnmarshalJSON(data []byte) error {
	switch string(data) {
	case "null":
		return nil
	case `"pending"`:
		*s = pending
	case `"running"`:
		*s = running
	case `"finished"`:
		*s = finished
	default:
		return unknownJobStatusError(data)
	}
	return nil
}

// Role is the role of the region.
type Role int

const (
	leader Role = iota
	// include leader + voter
	follower
	learner
	unknown
)

// NewRole creates a new role from its name. Names other than "leader",
// "follower" and "learner" map to unknown.
func NewRole(role string) Role {
	switch role {
	case "leader":
		return leader
	case "follower":
		return follower
	case "learner":
		return learner
	default:
		return unknown
	}
}

// String returns the name of the role; every value outside the three named
// roles is printed as "unknown".
func (r Role) String() string {
	switch r {
	case leader:
		return "leader"
	case follower:
		return "follower"
	case learner:
		return "learner"
	default:
		return "unknown"
	}
}

// MarshalJSON marshals to json. The role is written as its quoted name.
func (r Role) MarshalJSON() ([]byte, error) {
	return []byte(`"` + r.String() + `"`), nil
}

// unknownRoleError is returned by Role.UnmarshalJSON for input that is not
// one of the names produced by MarshalJSON. Its value is the raw input.
type unknownRoleError string

// Error implements the error interface.
func (e unknownRoleError) Error() string {
	return "unknown role: " + string(e)
}

// UnmarshalJSON is the inverse of MarshalJSON. Without it, the quoted name
// written by MarshalJSON could not be decoded back into the integer-based
// type. A JSON null leaves the role unchanged, following the encoding/json
// convention. "unknown" is accepted so that every marshaled value round-trips.
// Any other input is rejected and leaves the role unchanged.
func (r *Role) UnmarshalJSON(data []byte) error {
	switch string(data) {
	case "null":
		return nil
	case `"leader"`:
		*r = leader
	case `"follower"`:
		*r = follower
	case `"learner"`:
		*r = learner
	case `"unknown"`:
		*r = unknown
	default:
		return unknownRoleError(data)
	}
	return nil
}
74 changes: 74 additions & 0 deletions pkg/schedule/schedulers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package schedulers

import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -545,4 +547,76 @@
conf.init(sche.GetName(), storage, conf)
return sche, nil
})

// balance key range scheduler
// args: [role, engine, timeout, alias, range1, range2, ...]
RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder {
return func(v any) error {
conf, ok := v.(*balanceRangeSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

Check warning on line 558 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L557-L558

Added lines #L557 - L558 were not covered by tests
if len(args) < 5 {
return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 4")
}
role, err := url.QueryUnescape(args[0])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}

Check warning on line 565 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L564-L565

Added lines #L564 - L565 were not covered by tests
jobRole := NewRole(role)
if jobRole == unknown {
return errs.ErrQueryUnescape.FastGenByArgs("role")
}

Check warning on line 569 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L568-L569

Added lines #L568 - L569 were not covered by tests
engine, err := url.QueryUnescape(args[1])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}

Check warning on line 573 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L572-L573

Added lines #L572 - L573 were not covered by tests
timeout, err := url.QueryUnescape(args[2])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}

Check warning on line 577 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L576-L577

Added lines #L576 - L577 were not covered by tests
duration, err := time.ParseDuration(timeout)
if err != nil {
return errs.ErrURLParse.Wrap(err)
}

Check warning on line 581 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L580-L581

Added lines #L580 - L581 were not covered by tests
alias, err := url.QueryUnescape(args[3])
if err != nil {
return errs.ErrURLParse.Wrap(err)
}

Check warning on line 585 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L584-L585

Added lines #L584 - L585 were not covered by tests
ranges, err := getKeyRanges(args[4:])
if err != nil {
return err
}

Check warning on line 589 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L588-L589

Added lines #L588 - L589 were not covered by tests

id := uint64(0)
if len(conf.jobs) > 0 {
id = conf.jobs[len(conf.jobs)-1].JobID + 1
}

Check warning on line 594 in pkg/schedule/schedulers/init.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/init.go#L593-L594

Added lines #L593 - L594 were not covered by tests

job := &balanceRangeSchedulerJob{
Role: jobRole,
Engine: engine,
Timeout: duration,
Alias: alias,
Ranges: ranges,
Status: pending,
JobID: id,
}
conf.jobs = append(conf.jobs, job)
return nil
}
})

// Constructor for the balance range scheduler: start from the default base
// config, let the decoder fill it in, build the scheduler, then initialize
// the config with the scheduler's name and the storage.
RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller,
	storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) {
	rangeConf := &balanceRangeSchedulerConfig{schedulerConfig: newBaseDefaultSchedulerConfig()}
	if decodeErr := decoder(rangeConf); decodeErr != nil {
		return nil, decodeErr
	}
	scheduler := newBalanceRangeScheduler(opController, rangeConf)
	rangeConf.init(scheduler.GetName(), storage, rangeConf)
	return scheduler, nil
})
}
Loading