Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Nov 16, 2023
1 parent 7dbe607 commit 041c6ba
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 10 deletions.
132 changes: 132 additions & 0 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
package discovery

import (
"go.uber.org/zap"
"path"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
)
Expand All @@ -35,3 +47,123 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}
return values, nil
}

// getMCSPrimaryPath builds the etcd key under which the primary of the given
// microservice is stored.
//
// name must be utils.TSOServiceName, utils.SchedulingServiceName or
// utils.ResourceManagerServiceName; any other value yields an "unknown service
// name" error without querying etcd. keyspaceGroupID is only consulted for the
// TSO service. The cluster ID is read from etcd through client, and an error
// from that read is returned unchanged.
func getMCSPrimaryPath(name string, keyspaceGroupID uint32, client *clientv3.Client) (string, error) {
	// Reject unsupported services up front so no etcd round trip is made for them.
	switch name {
	case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
	default:
		return "", errors.Errorf("unknown service name %s", name)
	}

	id, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
	if err != nil {
		return "", err
	}

	switch name {
	case utils.TSOServiceName:
		// The TSO primary is stored per keyspace group below the TSO root.
		tsoRoot := endpoint.TSOSvcRootPath(id)
		return endpoint.KeyspaceGroupPrimaryPath(tsoRoot, keyspaceGroupID), nil
	case utils.SchedulingServiceName:
		return path.Join(endpoint.SchedulingSvcRootPath(id), utils.PrimaryKey), nil
	default:
		// Only the resource manager service is left after the guard above.
		return path.Join(endpoint.ResourceManagerSvcRootPath(id), utils.PrimaryKey), nil
	}
}

// GetMCSPrimary looks up the primary of the given microservice in etcd.
//
// The primary key is resolved with getMCSPrimaryPath and handed to
// election.GetLeader, whose results are returned unchanged. If the key cannot
// be resolved, (nil, 0, err) is returned.
// NOTE(review): the int64 result is presumably the etcd revision of the
// primary key — confirm against election.GetLeader.
func GetMCSPrimary(name string, keyspaceGroupID uint32, client *clientv3.Client) (*pdpb.Member, int64, error) {
	key, pathErr := getMCSPrimaryPath(name, keyspaceGroupID, client)
	if pathErr != nil {
		return nil, 0, pathErr
	}
	return election.GetLeader(client, key)
}

// GetMembers reads every key/value pair stored under the registry prefix of
// the given service and returns the raw transaction response.
//
// name must be utils.TSOServiceName, utils.SchedulingServiceName or
// utils.ResourceManagerServiceName; any other value yields an "unknown service
// name" error without querying etcd. A failed cluster-ID lookup is returned
// unchanged, and a failed read is wrapped in errs.ErrEtcdKVGet.
func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) {
	switch name {
	case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
	default:
		return nil, errors.Errorf("unknown service name %s", name)
	}

	id, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
	if err != nil {
		return nil, err
	}

	// All members of a service are registered below a common prefix.
	prefix := ServicePath(strconv.FormatUint(id, 10), name)
	txnResp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(prefix, clientv3.WithPrefix())).Commit()
	switch {
	case err != nil:
		return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
	case !txnResp.Succeeded:
		return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
	case len(txnResp.Responses) == 0:
		// NOTE(review): this counts per-operation responses, not key/value
		// pairs, so it presumably does not fire when the prefix is merely
		// empty — confirm whether an empty range should be an error.
		return nil, errors.Errorf("no member found for service %s", name)
	}
	return txnResp, nil
}

func DeleteMemberByName(service, ip string, client *clientv3.Client) error {
resps, err := GetMembers(service, client)
if err != nil {
return err
}
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
var entry ServiceRegistryEntry
if err = entry.Deserialize(keyValue.Value); err != nil {
log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), zap.String("ip", ip), zap.Error(err))
return err
}

log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(entry.ServiceAddr)), zap.String("ip", ip))

Check failure on line 123 in pkg/mcs/discovery/discover.go

View workflow job for this annotation

GitHub Actions / statics

unnecessary conversion (unconvert)
if ip == entry.ServiceAddr {
// delete member
_, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
return nil
}
}
}
return errors.Errorf("no ip %s found for service %s", ip, service)
}

// TransferPrimaryLeader replaces the primary of the given microservice with
// newLeader.
//
// newLeader must be byte-for-byte equal to the value of one of the registry
// entries returned by GetMembers. The lookup happens before the current
// primary key is touched, so an unknown newLeader leaves the existing primary
// in place. Once the target is confirmed, the old primary key is deleted and
// the matching registry value is written in its place.
// NOTE(review): DeleteMemberByName matches on the deserialized ServiceAddr
// while this function matches on the raw registry value — confirm which form
// callers are expected to pass.
//
// Errors:
//   - errors from getMCSPrimaryPath and GetMembers are returned unchanged;
//   - if no registry value equals newLeader, a "no new leader found" error is
//     returned;
//   - a failed delete is wrapped in errs.ErrEtcdKVDelete and a failed write in
//     errs.ErrEtcdKVPut.
func TransferPrimaryLeader(name, newLeader string, keyspaceGroupID uint32, client *clientv3.Client) error {
	primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client)
	if err != nil {
		return err
	}

	// Validate the target first: removing the primary and then failing to find
	// the new one would leave the service without any primary.
	resps, err := GetMembers(name, client)
	if err != nil {
		return err
	}
	found := false
lookup:
	for _, resp := range resps.Responses {
		for _, keyValue := range resp.GetResponseRange().GetKvs() {
			if newLeader == string(keyValue.Value) {
				found = true
				break lookup
			}
		}
	}
	if !found {
		return errors.Errorf("no new leader found for service %s", name)
	}

	// delete old leader
	resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
	if err != nil {
		return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
	}
	if !resp.Succeeded {
		return errs.ErrEtcdTxnConflict.FastGenByArgs()
	}

	// set new leader; the matched registry value equals newLeader, so it is
	// written directly.
	if _, err := kv.NewSlowLogTxn(client).Then(clientv3.OpPut(primaryPath, newLeader)).Commit(); err != nil {
		return errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause()
	}
	return nil
}
6 changes: 3 additions & 3 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
const (
// maxRetryTimes is the max retry times for initializing the cluster ID.
maxRetryTimes = 5
// clusterIDPath is the path to store cluster id
clusterIDPath = "/pd/cluster_id"
// ClusterIDPath is the path to store cluster id
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
)
Expand All @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
Expand Down
6 changes: 3 additions & 3 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func (m *Participant) PreCheckLeader() error {
return nil
}

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) GetPersistentLeader() (participant, int64, error) {
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
Expand All @@ -238,7 +238,7 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) {
return nil, true
}

leader, revision, err := m.getPersistentLeader()
leader, revision, err := m.GetPersistentLeader()
if err != nil {
log.Error("getting the leader meets error", errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand Down
91 changes: 90 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand Down Expand Up @@ -52,6 +54,28 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /members [get]
func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) {
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("get members", zap.String("service", service))
resps, err := discovery.GetMembers(service, h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if resps == nil {
h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service))
return
}

var apis []string
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
apis = append(apis, string(keyValue.Value))
}
}
h.rd.JSON(w, http.StatusOK, apis)
return
}

members, err := getMembers(h.svr)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -167,6 +191,24 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", name))
}

// @Tags     member
// @Summary  Remove a microservice member from the service registry.
// @Param    service  query  string  true  "Microservice name"
// @Param    ip       query  string  true  "Address of the member to remove"
// @Produce  json
// @Success  200  {string}  string  "The member is removed."
// @Failure  400  {string}  string  "The ip parameter is missing."
// @Failure  500  {string}  string  "PD server failed to proceed the request."
// @Router   /members/mcs [delete]
func (h *memberHandler) DeleteMemberByMCS(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()

	service := query.Get("service")
	if len(service) == 0 {
		h.rd.JSON(w, http.StatusInternalServerError, "not support service")
		return
	}

	// Both parameters are required: without an address there is nothing to
	// delete, so never report success in that case.
	ip := query.Get("ip")
	if len(ip) == 0 {
		h.rd.JSON(w, http.StatusBadRequest, "missing ip")
		return
	}

	if err := discovery.DeleteMemberByName(service, ip, h.svr.GetClient()); err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
	h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", service))
}

// @Tags member
// @Summary Remove a PD server from the cluster.
// @Param id path integer true "PD server Id"
Expand Down Expand Up @@ -277,9 +319,35 @@ func newLeaderHandler(svr *server.Server, rd *render.Render) *leaderHandler {
// @Success 200 {object} pdpb.Member
// @Router /leader [get]
func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) {
	// Without a "service" query parameter the handler reports the leader known
	// to this server; with it, the primary of that microservice is looked up.
	query := r.URL.Query()
	service := query.Get("service")
	if len(service) == 0 {
		h.rd.JSON(w, http.StatusOK, h.svr.GetLeader())
		return
	}

	// "group_id" is optional and falls back to the default keyspace group.
	keyspaceGroupID := utils.DefaultKeyspaceGroupID
	if raw := query.Get("group_id"); len(raw) > 0 {
		// Parse with a 32-bit limit so that an oversized id is rejected instead
		// of being silently truncated by the uint32 conversion below.
		id, err := strconv.ParseUint(raw, 10, 32)
		if err != nil {
			h.rd.JSON(w, http.StatusBadRequest, err.Error())
			return
		}
		// Range validation is reported separately: err is nil on this path, so
		// it must not be dereferenced.
		if !isValid(uint32(id)) {
			h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("invalid keyspace group id %d", id))
			return
		}
		keyspaceGroupID = uint32(id)
	}

	leader, _, err := discovery.GetMCSPrimary(service, keyspaceGroupID, h.svr.GetClient())
	if err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
	if leader == nil {
		h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service))
		return
	}
	h.rd.JSON(w, http.StatusOK, leader)
}

// isValid reports whether id lies within the closed interval
// [utils.DefaultKeyspaceGroupID, utils.MaxKeyspaceGroupCountInUse].
// NOTE(review): the upper bound is inclusive here — confirm that an id equal
// to MaxKeyspaceGroupCountInUse is really meant to be accepted.
func isValid(id uint32) bool {
	if id < utils.DefaultKeyspaceGroupID {
		return false
	}
	return id <= utils.MaxKeyspaceGroupCountInUse
}

// @Tags leader
// @Summary Transfer etcd leadership to another PD server.
// @Produce json
Expand All @@ -304,7 +372,28 @@ func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /leader/transfer/{nextLeader} [post]
func (h *leaderHandler) TransferLeader(w http.ResponseWriter, r *http.Request) {
err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), mux.Vars(r)["next_leader"])
newLeader := mux.Vars(r)["next_leader"]
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("transfer leader", zap.String("service", service), zap.String("nextLeader", newLeader))
keyspaceGroupID := utils.DefaultKeyspaceGroupID
if id := r.URL.Query().Get("group_id"); len(id) > 0 {
id, err := strconv.ParseUint(id, 10, 64)
if err != nil || !isValid(uint32(id)) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
keyspaceGroupID = uint32(id)
}
err := discovery.TransferPrimaryLeader(service, newLeader, keyspaceGroupID, h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("The transfer command is submitted. %s", newLeader))
return
}

err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), newLeader)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(apiRouter, "/members/name/{name}", memberHandler.DeleteMemberByName, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/members/id/{id}", memberHandler.DeleteMemberByID, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/members/name/{name}", memberHandler.SetMemberPropertyByName, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/members/mcs", memberHandler.DeleteMemberByMCS, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))

leaderHandler := newLeaderHandler(svr, rd)
registerFunc(apiRouter, "/leader", leaderHandler.GetLeader, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down

0 comments on commit 041c6ba

Please sign in to comment.