From 156e9244ac9207a7dc5500a92b5bcef962af3af8 Mon Sep 17 00:00:00 2001
From: husharp
Date: Wed, 15 Nov 2023 18:08:29 +0800
Subject: [PATCH] add api

Signed-off-by: husharp
---
 pkg/mcs/discovery/discover.go    | 131 +++++++++++++++++++++++++++++++
 pkg/mcs/utils/util.go            |   6 +-
 pkg/member/participant.go        |   6 +-
 pkg/storage/endpoint/key_path.go |  10 ++-
 pkg/tso/allocator_manager.go     |   6 +-
 server/api/member.go             |  63 ++++++++++++++-
 6 files changed, 208 insertions(+), 14 deletions(-)

diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go
index 00e168114b06..a6e9569b1f03 100644
--- a/pkg/mcs/discovery/discover.go
+++ b/pkg/mcs/discovery/discover.go
@@ -15,6 +15,16 @@
 package discovery
 
 import (
+	"path"
+	"strconv"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/tikv/pd/pkg/election"
+	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/mcs/utils"
+	"github.com/tikv/pd/pkg/storage/endpoint"
+	"github.com/tikv/pd/pkg/storage/kv"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"go.etcd.io/etcd/clientv3"
 )
@@ -35,3 +45,124 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
 	}
 	return values, nil
 }
+
+// GetMCSPrimary returns the primary member of the microservice `name`, read
+// from the key {service root path}/{utils.PrimaryKey} of the current cluster,
+// together with the int64 reported by election.GetLeader (presumably the
+// revision of that key; confirm against the election package). Unknown service
+// names yield an error.
+func GetMCSPrimary(name string, client *clientv3.Client) (*pdpb.Member, int64, error) {
+	switch name {
+	case utils.TSOServiceName, utils.APIServiceName, utils.ResourceManagerServiceName:
+		clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
+		if err != nil {
+			return nil, 0, err
+		}
+		primaryPath := path.Join(endpoint.SvcRootPath(clusterID, name), utils.PrimaryKey)
+		return election.GetLeader(client, primaryPath)
+	default:
+		return nil, 0, errors.Errorf("unknown service name %s", name)
+	}
+}
+
+// GetMembers returns the etcd transaction response that holds every key-value
+// pair stored under the prefix ServicePath(clusterID, name) for the
+// microservice `name`. Unknown service names yield an error.
+func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) {
+	switch name {
+	case utils.TSOServiceName, utils.APIServiceName, utils.ResourceManagerServiceName:
+		clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
+		if err != nil {
+			return nil, err
+		}
+		servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
+		resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
+		if err != nil {
+			return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
+		}
+		if !resps.Succeeded {
+			return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
+		}
+		if len(resps.Responses) == 0 {
+			return nil, errors.Errorf("no member found for service %s", name)
+		}
+		return resps, nil
+	}
+	return nil, errors.Errorf("unknown service name %s", name)
+}
+
+// DeleteMemberByName removes the registration of member `name` from the
+// microservice `service`. A registration matches when its stored value equals
+// `name`; an error is returned when nothing matches, so callers never report a
+// removal that did not happen.
+func DeleteMemberByName(service, name string, client *clientv3.Client) error {
+	resps, err := GetMembers(service, client)
+	if err != nil {
+		return err
+	}
+	for _, resp := range resps.Responses {
+		for _, keyValue := range resp.GetResponseRange().GetKvs() {
+			if name != string(keyValue.Value) {
+				continue
+			}
+			_, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit()
+			if err != nil {
+				return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
+			}
+			return nil
+		}
+	}
+	return errors.Errorf("member %s not found in service %s", name, service)
+}
+
+// TransferPrimaryLeader makes the registered member `newLeader` the primary of
+// the microservice `name`: it deletes the current primary key and then writes
+// the member's registered value under that key. The target is resolved before
+// the primary key is touched, so an unknown target leaves the primary intact.
+// NOTE(review): the delete and the put are separate transactions; confirm that
+// another member campaigning in between is acceptable.
+func TransferPrimaryLeader(name, newLeader string, client *clientv3.Client) error {
+	switch name {
+	case utils.TSOServiceName, utils.APIServiceName, utils.ResourceManagerServiceName:
+	default:
+		return errors.Errorf("unknown service name %s", name)
+	}
+	clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
+	if err != nil {
+		return err
+	}
+	// Resolve the target first: a failure here must not cost the service its primary.
+	members, err := GetMembers(name, client)
+	if err != nil {
+		return err
+	}
+	registered := false
+	for _, resp := range members.Responses {
+		for _, keyValue := range resp.GetResponseRange().GetKvs() {
+			if newLeader == string(keyValue.Value) {
+				registered = true
+			}
+		}
+	}
+	if !registered {
+		return errors.Errorf("member %s not found in service %s", newLeader, name)
+	}
+	primaryPath := path.Join(endpoint.SvcRootPath(clusterID, name), utils.PrimaryKey)
+	// delete the current primary
+	resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
+	if err != nil {
+		return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
+	}
+	if !resp.Succeeded {
+		return errs.ErrEtcdTxnConflict.FastGenByArgs()
+	}
+	// set the new primary
+	resp, err = kv.NewSlowLogTxn(client).Then(clientv3.OpPut(primaryPath, newLeader)).Commit()
+	if err != nil {
+		return errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause()
+	}
+	if !resp.Succeeded {
+		return errs.ErrEtcdTxnConflict.FastGenByArgs()
+	}
+	return nil
+}
diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go
index 682e73f20ae3..a0708f9bf884 100644
--- a/pkg/mcs/utils/util.go
+++ b/pkg/mcs/utils/util.go
@@ -45,8 +45,8 @@ import (
 const (
 	// maxRetryTimes is the max retry times for initializing the cluster ID.
 	maxRetryTimes = 5
-	// clusterIDPath is the path to store cluster id
-	clusterIDPath = "/pd/cluster_id"
+	// ClusterIDPath is the path to store cluster id
+	ClusterIDPath = "/pd/cluster_id"
 	// retryInterval is the interval to retry.
 	retryInterval = time.Second
 )
@@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
 	ticker := time.NewTicker(retryInterval)
 	defer ticker.Stop()
 	for i := 0; i < maxRetryTimes; i++ {
-		if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
+		if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
 			return clusterID, nil
 		}
 		select {
diff --git a/pkg/member/participant.go b/pkg/member/participant.go
index b3034a868070..c6396fceffd1 100644
--- a/pkg/member/participant.go
+++ b/pkg/member/participant.go
@@ -215,8 +215,8 @@ func (m *Participant) PreCheckLeader() error {
 	return nil
 }
 
-// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
-func (m *Participant) getPersistentLeader() (participant, int64, error) {
+// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
+func (m *Participant) GetPersistentLeader() (participant, int64, error) {
 	leader := NewParticipantByService(m.serviceName)
 	ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
 	if err != nil {
@@ -238,7 +238,7 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) {
 		return nil, true
 	}
 
-	leader, revision, err := m.getPersistentLeader()
+	leader, revision, err := m.GetPersistentLeader()
 	if err != nil {
 		log.Error("getting the leader meets error", errs.ZapError(err))
 		time.Sleep(200 * time.Millisecond)
diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go
index cac40db29c54..2da8aabc2ca1 100644
--- a/pkg/storage/endpoint/key_path.go
+++ b/pkg/storage/endpoint/key_path.go
@@ -291,22 +291,24 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
 // ResourceManagerSvcRootPath returns the root path of resource manager service.
 // Path: /ms/{cluster_id}/resource_manager
 func ResourceManagerSvcRootPath(clusterID uint64) string {
-	return svcRootPath(clusterID, utils.ResourceManagerServiceName)
+	return SvcRootPath(clusterID, utils.ResourceManagerServiceName)
 }
 
 // SchedulingSvcRootPath returns the root path of scheduling service.
 // Path: /ms/{cluster_id}/scheduling
 func SchedulingSvcRootPath(clusterID uint64) string {
-	return svcRootPath(clusterID, utils.SchedulingServiceName)
+	return SvcRootPath(clusterID, utils.SchedulingServiceName)
 }
 
 // TSOSvcRootPath returns the root path of tso service.
 // Path: /ms/{cluster_id}/tso
 func TSOSvcRootPath(clusterID uint64) string {
-	return svcRootPath(clusterID, utils.TSOServiceName)
+	return SvcRootPath(clusterID, utils.TSOServiceName)
 }
 
-func svcRootPath(clusterID uint64, svcName string) string {
+// SvcRootPath returns the root path of the given microservice.
+// Path: /ms/{cluster_id}/{svcName}
+func SvcRootPath(clusterID uint64, svcName string) string {
 	c := strconv.FormatUint(clusterID, 10)
 	return path.Join(utils.MicroserviceRootPath, c, svcName)
 }
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go
index df0ca0affc97..7f21b8659079 100644
--- a/pkg/tso/allocator_manager.go
+++ b/pkg/tso/allocator_manager.go
@@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
 type ElectionMember interface {
 	// ID returns the unique ID in the election group. For example, it can be unique
 	// server id of a cluster or the unique keyspace group replica id of the election
-	// group comprised of the replicas of a keyspace group.
+	// group composed of the replicas of a keyspace group.
 	ID() uint64
-	// ID returns the unique name in the election group.
+	// Name returns the unique name in the election group.
 	Name() string
 	// MemberValue returns the member value.
 	MemberValue() string
-	// GetMember() returns the current member
+	// GetMember returns the current member
 	GetMember() interface{}
 	// Client returns the etcd client.
 	Client() *clientv3.Client
diff --git a/server/api/member.go b/server/api/member.go
index 3016b76088b6..88577f962fa7 100644
--- a/server/api/member.go
+++ b/server/api/member.go
@@ -17,6 +17,7 @@ package api
 import (
 	"context"
 	"fmt"
+	"github.com/tikv/pd/pkg/mcs/discovery"
 	"net/http"
 	"strconv"
 
@@ -52,6 +53,30 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler {
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /members [get]
 func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) {
+	if service := r.URL.Query().Get("service"); len(service) > 0 {
+		log.Info("get members", zap.String("service", service))
+		resps, err := discovery.GetMembers(service, h.svr.GetClient())
+		if err != nil {
+			h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+			return
+		}
+		if resps == nil {
+			h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service))
+			return
+		}
+
+		// Start from an empty, non-nil slice so that a service without any
+		// registered member is rendered as `[]` rather than `null`.
+		values := []string{}
+		for _, resp := range resps.Responses {
+			for _, keyValue := range resp.GetResponseRange().GetKvs() {
+				values = append(values, string(keyValue.Value))
+			}
+		}
+		h.rd.JSON(w, http.StatusOK, values)
+		return
+	}
+
 	members, err := getMembers(h.svr)
 	if err != nil {
 		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
@@ -128,6 +153,16 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques
 	// Get etcd ID by name.
 	var id uint64
 	name := mux.Vars(r)["name"]
+	if service := r.URL.Query().Get("service"); len(service) > 0 {
+		err := discovery.DeleteMemberByName(service, name, client)
+		if err != nil {
+			h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+			return
+		}
+		h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", name))
+		return
+	}
+
 	listResp, err := etcdutil.ListEtcdMembers(client)
 	if err != nil {
 		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
@@ -277,6 +312,20 @@ func newLeaderHandler(svr *server.Server, rd *render.Render) *leaderHandler {
 // @Success 200 {object} pdpb.Member
 // @Router /leader [get]
 func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) {
+	if service := r.URL.Query().Get("service"); len(service) > 0 {
+		log.Info("get leader", zap.String("service", service))
+		leader, _, err := discovery.GetMCSPrimary(service, h.svr.GetClient())
+		if err != nil {
+			h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+			return
+		}
+		if leader == nil {
+			h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service))
+			return
+		}
+		h.rd.JSON(w, http.StatusOK, leader)
+		return
+	}
 	h.rd.JSON(w, http.StatusOK, h.svr.GetLeader())
 }
 
@@ -304,7 +353,19 @@ func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) {
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /leader/transfer/{nextLeader} [post]
 func (h *leaderHandler) TransferLeader(w http.ResponseWriter, r *http.Request) {
-	err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), mux.Vars(r)["next_leader"])
+	newLeader := mux.Vars(r)["next_leader"]
+	if service := r.URL.Query().Get("service"); len(service) > 0 {
+		log.Info("transfer leader", zap.String("service", service), zap.String("nextLeader", newLeader))
+		err := discovery.TransferPrimaryLeader(service, newLeader, h.svr.GetClient())
+		if err != nil {
+			h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+			return
+		}
+		h.rd.JSON(w, http.StatusOK, fmt.Sprintf("The transfer command is submitted. %s", newLeader))
+		return
+	}
+
+	err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), newLeader)
 	if err != nil {
 		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
 		return