Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add a new scheduler to balance the regions of the given key range #8988

Merged
merged 14 commits into from
Feb 10, 2025
10 changes: 10 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"bytes"
"encoding/json"

"github.com/tikv/pd/pkg/core/constant"
)
Expand Down Expand Up @@ -156,6 +157,15 @@ type KeyRange struct {
EndKey []byte `json:"end-key"`
}

// MarshalJSON marshals to json.
func (kr KeyRange) MarshalJSON() ([]byte, error) {
m := map[string]string{
"start-key": HexRegionKeyStr(kr.StartKey),
"end-key": HexRegionKeyStr(kr.EndKey),
}
return json.Marshal(m)
}

// NewKeyRange create a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
return KeyRange{
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,11 @@ func (c *Cluster) updateScheduler() {
)
// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStrToType[scheduler.Type]
schedulerType, ok := types.ConvertOldStrToType[scheduler.Type]
if !ok {
log.Error("scheduler not found", zap.String("type", scheduler.Type))
continue
}
s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
Expand Down
6 changes: 0 additions & 6 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ func RegisterScheduler(typ types.CheckerSchedulerType) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(typ types.CheckerSchedulerType) bool {
_, ok := schedulerMap.Load(typ)
return ok
}

// SchedulerConfigProvider is the interface for scheduler configurations.
type SchedulerConfigProvider interface {
SharedConfigProvider
Expand Down
237 changes: 237 additions & 0 deletions pkg/schedule/schedulers/balance_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/unrolled/render"

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/syncutil"
)

type balanceRangeSchedulerHandler struct {
rd *render.Render
config *balanceRangeSchedulerConfig
}

func newBalanceRangeHandler(conf *balanceRangeSchedulerConfig) http.Handler {
handler := &balanceRangeSchedulerHandler{
config: conf,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", handler.updateConfig).Methods(http.MethodPost)
router.HandleFunc("/list", handler.listConfig).Methods(http.MethodGet)
return router
}

func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter, _ *http.Request) {
handler.rd.JSON(w, http.StatusBadRequest, "update config is not supported")
}

func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.clone()
if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil {
log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
}
}

type balanceRangeSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
jobs []*balanceRangeSchedulerJob
}

type balanceRangeSchedulerJob struct {
JobID uint64 `json:"job-id"`
Role Role `json:"role"`
Engine string `json:"engine"`
Timeout time.Duration `json:"timeout"`
Ranges []core.KeyRange `json:"ranges"`
Alias string `json:"alias"`
Status JobStatus `json:"status"`
}

func (conf *balanceRangeSchedulerConfig) clone() []*balanceRangeSchedulerJob {
conf.RLock()
defer conf.RUnlock()
jobs := make([]*balanceRangeSchedulerJob, 0, len(conf.jobs))
for _, job := range conf.jobs {
ranges := make([]core.KeyRange, len(job.Ranges))
copy(ranges, job.Ranges)
jobs = append(jobs, &balanceRangeSchedulerJob{
Ranges: ranges,
Role: job.Role,
Engine: job.Engine,
Timeout: job.Timeout,
Alias: job.Alias,
JobID: job.JobID,
})
}

return jobs
}

// EncodeConfig serializes the config.
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
s.conf.RLock()
defer s.conf.RUnlock()
return EncodeConfig(s.conf.jobs)
}

// ReloadConfig reloads the config.
func (s *balanceRangeScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()

jobs := make([]*balanceRangeSchedulerJob, 0, len(s.conf.jobs))
if err := s.conf.load(jobs); err != nil {
return err
}
s.conf.jobs = jobs
return nil
}

type balanceRangeScheduler struct {
*BaseScheduler
conf *balanceRangeSchedulerConfig
handler http.Handler
filters []filter.Filter
filterCounter *filter.Counter
}

// ServeHTTP implements the http.Handler interface.
func (s *balanceRangeScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

// Schedule schedules the balance key range operator.
func (*balanceRangeScheduler) Schedule(_cluster sche.SchedulerCluster, _dryRun bool) ([]*operator.Operator, []plan.Plan) {
log.Debug("balance key range scheduler is scheduling, need to implement")
return nil, nil
}

// IsScheduleAllowed checks if the scheduler is allowed to schedule new operators.
func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRange) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
if !allowed {
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
}
return allowed
}

// BalanceRangeCreateOption is used to create a scheduler with an option.
type BalanceRangeCreateOption func(s *balanceRangeScheduler)

// newBalanceRangeScheduler creates a scheduler that tends to keep given peer Role on
// special store balanced.
func newBalanceRangeScheduler(opController *operator.Controller, conf *balanceRangeSchedulerConfig, options ...BalanceRangeCreateOption) Scheduler {
s := &balanceRangeScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRangeScheduler, conf),
conf: conf,
handler: newBalanceRangeHandler(conf),
}
for _, option := range options {
option(s)
}
s.filters = []filter.Filter{
&filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium},
filter.NewSpecialUseFilter(s.GetName()),
}
s.filterCounter = filter.NewCounter(s.GetName())
return s
}

// JobStatus is the status of the job.
type JobStatus int

const (
pending JobStatus = iota
running
finished
)

func (s JobStatus) String() string {
switch s {
case pending:
return "pending"
case running:
return "running"
case finished:
return "finished"
}
return "unknown"
}

// MarshalJSON marshals to json.
func (s JobStatus) MarshalJSON() ([]byte, error) {
return []byte(`"` + s.String() + `"`), nil
}

// Role is the role of the region.
type Role int

const (
leader Role = iota
// include leader + voter
follower
learner
unknown
)

// NewRole creates a new role.
func NewRole(role string) Role {
switch role {
case "leader":
return leader
case "follower":
return follower
case "learner":
return learner
default:
return unknown
}
}

func (r Role) String() string {
switch r {
case leader:
return "leader"
case follower:
return "follower"
case learner:
return "learner"
default:
return "unknown"
}
}

// MarshalJSON marshals to json.
func (r Role) MarshalJSON() ([]byte, error) {
return []byte(`"` + r.String() + `"`), nil
}
74 changes: 74 additions & 0 deletions pkg/schedule/schedulers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package schedulers

import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -545,4 +547,76 @@ func schedulersRegister() {
conf.init(sche.GetName(), storage, conf)
return sche, nil
})

// balance key range scheduler
// args: [role, engine, timeout, table-name, range1, range2, ...]
RegisterSliceDecoderBuilder(types.BalanceRangeScheduler, func(args []string) ConfigDecoder {
return func(v any) error {
conf, ok := v.(*balanceRangeSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
if len(args) < 5 {
return errs.ErrSchedulerConfig.FastGenByArgs("args length must be greater than 4")
}
role, err := url.QueryUnescape(args[0])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
jobRole := NewRole(role)
if jobRole == unknown {
return errs.ErrQueryUnescape.FastGenByArgs("role")
}
engine, err := url.QueryUnescape(args[1])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
timeout, err := url.QueryUnescape(args[2])
if err != nil {
return errs.ErrQueryUnescape.Wrap(err)
}
duration, err := time.ParseDuration(timeout)
if err != nil {
return errs.ErrURLParse.Wrap(err)
}
alias, err := url.QueryUnescape(args[3])
if err != nil {
return errs.ErrURLParse.Wrap(err)
}
ranges, err := getKeyRanges(args[4:])
if err != nil {
return err
}

id := uint64(0)
if len(conf.jobs) > 0 {
id = conf.jobs[len(conf.jobs)-1].JobID + 1
}

job := &balanceRangeSchedulerJob{
Role: jobRole,
Engine: engine,
Timeout: duration,
Alias: alias,
Ranges: ranges,
Status: pending,
JobID: id,
}
conf.jobs = append(conf.jobs, job)
return nil
}
})

RegisterScheduler(types.BalanceRangeScheduler, func(opController *operator.Controller,
storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) {
conf := &balanceRangeSchedulerConfig{
schedulerConfig: newBaseDefaultSchedulerConfig(),
}
if err := decoder(conf); err != nil {
return nil, err
}
sche := newBalanceRangeScheduler(opController, conf)
conf.init(sche.GetName(), storage, conf)
return sche, nil
})
}
Loading
Loading