Skip to content

Commit

Permalink
add api
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Nov 15, 2023
1 parent 7dbe607 commit 156e924
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 14 deletions.
106 changes: 106 additions & 0 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
package discovery

import (
"path"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
)
Expand All @@ -35,3 +45,99 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}
return values, nil
}

func GetMCSPrimary(name string, client *clientv3.Client) (*pdpb.Member, int64, error) {
switch name {
case utils.TSOServiceName, utils.APIServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, 0, err
}
primaryPath := path.Join(endpoint.SvcRootPath(clusterID, name), utils.PrimaryKey)
return election.GetLeader(client, primaryPath)
default:
}
return nil, 0, errors.Errorf("unknown service name %s", name)
}

func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) {
switch name {
case utils.TSOServiceName, utils.APIServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
}
if !resps.Succeeded {
return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
}
if len(resps.Responses) == 0 {
return nil, errors.Errorf("no member found for service %s", name)
}
return resps, nil
}
return nil, errors.Errorf("unknown service name %s", name)
}

func DeleteMemberByName(name string, client *clientv3.Client) error {
resps, err := GetMembers(name, client)
if err != nil {
return err
}
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
if name == string(keyValue.Value) {
// delete member
_, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
break
}
}
}
return nil
}

func TransferPrimaryLeader(name, newLeader string, client *clientv3.Client) error {
switch name {
case utils.TSOServiceName, utils.APIServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return err
}
// delete leader
primaryPath := path.Join(endpoint.SvcRootPath(clusterID, name), utils.PrimaryKey)
resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
// set new leader
resps, err := GetMembers(name, client)
if err != nil {
return err
}
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
if newLeader == string(keyValue.Value) {
// set new leader
_, err := kv.NewSlowLogTxn(client).Then(clientv3.OpPut(primaryPath, string(keyValue.Value))).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
return nil
}
}
}

default:
}
return errors.Errorf("unknown service name %s", name)
}
6 changes: 3 additions & 3 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
const (
// maxRetryTimes is the max retry times for initializing the cluster ID.
maxRetryTimes = 5
// clusterIDPath is the path to store cluster id
clusterIDPath = "/pd/cluster_id"
// ClusterIDPath is the path to store cluster id
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
)
Expand All @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
Expand Down
6 changes: 3 additions & 3 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func (m *Participant) PreCheckLeader() error {
return nil
}

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) GetPersistentLeader() (participant, int64, error) {
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
Expand All @@ -238,7 +238,7 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) {
return nil, true
}

leader, revision, err := m.getPersistentLeader()
leader, revision, err := m.GetPersistentLeader()
if err != nil {
log.Error("getting the leader meets error", errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,22 +291,22 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
// ResourceManagerSvcRootPath returns the root path of resource manager service.
// Path: /ms/{cluster_id}/resource_manager
func ResourceManagerSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.ResourceManagerServiceName)
return SvcRootPath(clusterID, utils.ResourceManagerServiceName)
}

// SchedulingSvcRootPath returns the root path of scheduling service.
// Path: /ms/{cluster_id}/scheduling
func SchedulingSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.SchedulingServiceName)
return SvcRootPath(clusterID, utils.SchedulingServiceName)
}

// TSOSvcRootPath returns the root path of tso service.
// Path: /ms/{cluster_id}/tso
func TSOSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.TSOServiceName)
return SvcRootPath(clusterID, utils.TSOServiceName)
}

func svcRootPath(clusterID uint64, svcName string) string {
func SvcRootPath(clusterID uint64, svcName string) string {
c := strconv.FormatUint(clusterID, 10)
return path.Join(utils.MicroserviceRootPath, c, svcName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand Down
61 changes: 60 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package api
import (
"context"
"fmt"
"github.com/tikv/pd/pkg/mcs/discovery"
"net/http"
"strconv"

Expand Down Expand Up @@ -52,6 +53,28 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /members [get]
func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) {
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("get leader", zap.String("service", service))
resps, err := discovery.GetMembers(service, h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if resps == nil {
h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service))
return
}

var apis []string
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
apis = append(apis, string(keyValue.Value))
}
}
h.rd.JSON(w, http.StatusOK, apis)
return
}

members, err := getMembers(h.svr)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -128,6 +151,16 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques
// Get etcd ID by name.
var id uint64
name := mux.Vars(r)["name"]
if service := r.URL.Query().Get("service"); len(service) > 0 {
err := discovery.DeleteMemberByName(service, client)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", service))
return
}

listResp, err := etcdutil.ListEtcdMembers(client)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -277,6 +310,20 @@ func newLeaderHandler(svr *server.Server, rd *render.Render) *leaderHandler {
// @Success 200 {object} pdpb.Member
// @Router /leader [get]
func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) {
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("get leader", zap.String("service", service))
leader, _, err := discovery.GetMCSPrimary(service, h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if leader == nil {
h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service))
return
}
h.rd.JSON(w, http.StatusOK, leader)
return
}
h.rd.JSON(w, http.StatusOK, h.svr.GetLeader())
}

Expand Down Expand Up @@ -304,7 +351,19 @@ func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /leader/transfer/{nextLeader} [post]
func (h *leaderHandler) TransferLeader(w http.ResponseWriter, r *http.Request) {
err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), mux.Vars(r)["next_leader"])
newLeader := mux.Vars(r)["next_leader"]
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("transfer leader", zap.String("service", service), zap.String("nextLeader", newLeader))
err := discovery.TransferPrimaryLeader(service, newLeader, h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("The transfer command is submitted. %s", newLeader))
return
}

err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), newLeader)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down

0 comments on commit 156e924

Please sign in to comment.