Skip to content

Commit

Permalink
Merge pull request #34 from symsimmy/fix/namespace_kv
Browse files Browse the repository at this point in the history
feature registry namespace
  • Loading branch information
symsimmy authored Jun 25, 2024
2 parents 6600ec6 + f1b6fc2 commit 27b2ad4
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 16 deletions.
3 changes: 2 additions & 1 deletion cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package gate

import (
"context"
"fmt"
"github.com/symsimmy/due/cluster"
"github.com/symsimmy/due/common/link"
"github.com/symsimmy/due/errors"
Expand Down Expand Up @@ -281,7 +282,7 @@ func (g *Gate) stopCatServer() {
func (g *Gate) registerServiceInstance() {
g.instance = &registry.ServiceInstance{
ID: g.opts.id,
Name: string(cluster.Gate),
Name: fmt.Sprintf("%s%s", g.opts.namespace, string(cluster.Gate)),
Kind: cluster.Gate,
Alias: g.opts.name,
State: cluster.Work,
Expand Down
3 changes: 2 additions & 1 deletion cluster/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package master

import (
"context"
"fmt"
"github.com/symsimmy/due/cluster"
"github.com/symsimmy/due/common/endpoint"
xnet "github.com/symsimmy/due/common/net"
Expand Down Expand Up @@ -120,7 +121,7 @@ func (m *Master) registerServiceInstance() {

m.instance = &registry.ServiceInstance{
ID: m.opts.id,
Name: string(cluster.Master),
Name: fmt.Sprintf("%s%s", m.opts.namespace, string(cluster.Master)),
Kind: cluster.Node,
Alias: m.opts.name,
State: m.getState(),
Expand Down
9 changes: 8 additions & 1 deletion cluster/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"fmt"
"github.com/symsimmy/due/cluster"
"github.com/symsimmy/due/component"
"github.com/symsimmy/due/log"
Expand Down Expand Up @@ -223,9 +224,15 @@ func (n *Node) registerServiceInstance() {
events = append(events, event)
}

var name string
if n.opts.namespace != "" {
name = fmt.Sprintf("%s%s.%s", n.opts.namespace, string(cluster.Node), n.opts.name)
} else {
name = string(cluster.Node)
}
n.instance = &registry.ServiceInstance{
ID: n.opts.id,
Name: string(cluster.Node),
Name: name,
Kind: cluster.Node,
Alias: n.opts.name,
State: n.getState(),
Expand Down
15 changes: 9 additions & 6 deletions common/link/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package link

import (
"context"
"fmt"
"github.com/symsimmy/due/cluster"
"github.com/symsimmy/due/common/dispatcher"
"github.com/symsimmy/due/common/endpoint"
Expand Down Expand Up @@ -253,7 +252,7 @@ func (l *Link) FetchServiceAliasIDs(ctx context.Context, kind cluster.Kind, alia
}

func (l *Link) FetchServiceAliasList(ctx context.Context, kind cluster.Kind, alias string, states ...cluster.State) ([]*registry.ServiceInstance, error) {
services, err := l.opts.Registry.Services(ctx, fmt.Sprintf("%s%s", l.opts.Namespace, string(kind)))
services, err := l.opts.Registry.Services(ctx, l.opts.Namespace, kind, alias)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1023,14 +1022,19 @@ func (l *Link) getNodeClientByNID(nid string) (transport.NodeClient, error) {
// WatchServiceInstance 监听服务实例
func (l *Link) WatchServiceInstance(ctx context.Context, kinds ...cluster.Kind) {
for _, kind := range kinds {
l.watchServiceInstance(ctx, kind)
if kind == cluster.Node {
l.watchServiceInstance(ctx, kind, "center")
l.watchServiceInstance(ctx, kind, "game")
} else {
l.watchServiceInstance(ctx, kind, "")
}
}
}

// 监听服务实例
func (l *Link) watchServiceInstance(ctx context.Context, kind cluster.Kind) {
func (l *Link) watchServiceInstance(ctx context.Context, kind cluster.Kind, alias string) {
rctx, rcancel := context.WithTimeout(ctx, 10*time.Second)
watcher, err := l.opts.Registry.Watch(rctx, string(kind))
watcher, err := l.opts.Registry.Watch(rctx, l.opts.Namespace, kind, alias)
rcancel()
if err != nil {
log.Fatalf("the dispatcher instance watch failed: %v", err)
Expand All @@ -1049,7 +1053,6 @@ func (l *Link) watchServiceInstance(ctx context.Context, kind cluster.Kind) {
if err != nil {
continue
}

if kind == cluster.Node {
l.nodeDispatcher.ReplaceServices(services...)
} else {
Expand Down
2 changes: 1 addition & 1 deletion registry/consul/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *registrar) register(ctx context.Context, ins *registry.ServiceInstance)

registration := &api.AgentServiceRegistration{
ID: ins.ID,
Name: fmt.Sprintf("%s%s", ins.Namespace, ins.Name),
Name: ins.Name,
Tags: make([]string, 0, len(ins.Events)),
Meta: make(map[string]string, 3),
Address: overwriteHost,
Expand Down
14 changes: 12 additions & 2 deletions registry/consul/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func (r *Registry) Deregister(ctx context.Context, ins *registry.ServiceInstance
}

// Services 获取服务实例列表
func (r *Registry) Services(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
func (r *Registry) Services(ctx context.Context, namespace string, kind cluster.Kind, alias string) ([]*registry.ServiceInstance, error) {
serviceName := r.getServicesName(namespace, kind, alias)
if r.err != nil {
return nil, r.err
}
Expand All @@ -102,7 +103,8 @@ func (r *Registry) Services(ctx context.Context, serviceName string) ([]*registr
}

// Watch 监听服务
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
func (r *Registry) Watch(ctx context.Context, namespace string, kind cluster.Kind, alias string) (registry.Watcher, error) {
serviceName := r.getServicesName(namespace, kind, alias)
if r.err != nil {
return nil, r.err
}
Expand All @@ -121,6 +123,14 @@ func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watc
return w.fork(), nil
}

func (r *Registry) getServicesName(namespace string, kind cluster.Kind, alias string) string {
if alias == "" {
return fmt.Sprintf("%s%s", namespace, string(kind))
} else {
return fmt.Sprintf("%s%s.%s", namespace, string(kind), alias)
}
}

// 获取服务实体列表
func (r *Registry) services(ctx context.Context, serviceName string, waitIndex uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) {
opts := &api.QueryOptions{
Expand Down
8 changes: 4 additions & 4 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ type Registry interface {
// Deregister 解注册服务实例
Deregister(ctx context.Context, ins *ServiceInstance) error
// Watch 监听相同服务名的服务实例变化
Watch(ctx context.Context, serviceName string) (Watcher, error)
Watch(ctx context.Context, namespace string, kind cluster.Kind, alias string) (Watcher, error)
// Services 获取服务实例列表
Services(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
Services(ctx context.Context, namespace string, kind cluster.Kind, alias string) ([]*ServiceInstance, error)
}

type Discovery interface {
// Watch 监听相同服务名的服务实例变化
Watch(ctx context.Context, serviceName string) (Watcher, error)
Watch(ctx context.Context, namespace string, kind cluster.Kind, alias string) (Watcher, error)
// Services 获取服务实例列表
Services(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
Services(ctx context.Context, namespace string, kind cluster.Kind, alias string) ([]*ServiceInstance, error)
}

type Watcher interface {
Expand Down

0 comments on commit 27b2ad4

Please sign in to comment.