Skip to content

Commit

Permalink
ch4-04: 完善
Browse files Browse the repository at this point in the history
  • Loading branch information
chai2010 committed Jul 9, 2018
1 parent 504b58f commit 9c90459
Showing 1 changed file with 159 additions and 16 deletions.
175 changes: 159 additions & 16 deletions ch4-rpc/ch4-04-grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,33 +206,176 @@ for {

这样就完成了完整的流接收和发送支持。

## 发布和订阅模式

<!--
Publish
Watch
在前一节中,我们基于Go内置的RPC库实现了一个简化版的Watch方法。基于Watch的思路虽然也可以构造发布和订阅系统,但是因为RPC缺乏流机制导致每次只能返回一个结果。在发布和订阅模式中,由调用者主动发起的发布行为类似一个普通函数调用,而被动的订阅者则类似GRPC客户端单向流中的接收者。现在我们可以尝试基于GRPC的流特性构造一个发布和订阅系统。

TODO
发布订阅是一个常见的设计模式,开源社区中已经存在很多该模式的实现。其中docker项目中提供了一个pubsub的极简实现,下面是基于pubsub包实现的本地发布订阅代码:

## 认证
```go
import (
"github.com/docker/docker/pkg/pubsub"
)

TODO
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)

golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.Hasprefix("golang:") {
return true
}
}
return false
})
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.Hasprefix("docker:") {
return true
}
}
return false
})

go p.Publish("hi")
go p.Publish("golang: https://golang.org")
go p.Publish("docker: https://www.docker.com/")
time.Sleep(1)

go func () {
fmt.Println("golang topic:", <-golang)
} ()
go func () {
fmt.Println("docker topic:", <-docker)
} ()

<-make(chan bool)
}
```

其中`pubsub.NewPublisher`构造一个发布对象,`p.SubscribeTopic()`可以通过函数筛选感兴趣的主题进行订阅。

入门/流/认证
现在尝试基于gRPC和pubsub包,提供一个跨网络的发布和订阅系统。首先通过Protobuf定义一个发布订阅服务接口:

--
```protobuf
service PubsubService {
rpc Publish (String) returns (String);
rpc SubscribeTopic (String) returns (stream String);
}
```

简单介绍
其中Publish是普通的RPC方法,SubscribeTopic则是一个单向的流服务。然后grpc插件会为服务端和客户端生成对应的接口:

同步/异步
```go
type PubsubServiceServer interface {
Publish(context.Context, *String) (*String, error)
Subscribe(*String, PubsubService_SubscribeServer) error
}
type PubsubServiceClient interface {
Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubsubService_SubscribeClient, error)
}

type HelloService_SubscribeServer interface {
Send(*String) error
grpc.ServerStream
}
```

然后就可以实现发布和订阅服务了:

验证/密码
```go
type PubsubService struct {
pub *pubsub.Publisher
}

日志截取器,panic 捕获
func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
```

然后是实现发布方法和订阅方法:

```go
func (p *PubsubService) Publish(ctx context.Context, arg *String) (*String, error) {
p.pub.Publish(arg.GetValue())
return &String{}, nil
}

func (p *PubsubService) Subscribe(arg *String, stream PubsubService_SubscribeServer) error {
ch := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.Hasprefix(arg.GetValue()) {
return true
}
}
return false
})

for v := range ch {
if err := stream.Send(&String{Value: v.(string)}); err != nil {
return err
}
}

return nil
}
```

这样就可以从客户端向服务器发布信息了:

```go
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

client := NewPubsubServiceClient(conn)

reply, err := client.Publish(context.Background(), &String{Value: "golang: hello Go"})
if err != nil {
log.Fatal(err)
}
reply, err := client.Publish(context.Background(), &String{Value: "docker: hello Docker"})
if err != nil {
log.Fatal(err)
}
}
```

然后就可以在新的客户端进行订阅信息了:

```go
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

client := NewPubsubServiceClient(conn)
stream, err := client.Channel(context.Background(), &String{Value: "golang:"})
if err != nil {
log.Fatal(err)
}

for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}

fmt.Println(reply.GetValue())
}
}
```

gtpc到rest扩展
到此我们就基于GRPC简单实现了一个跨网络的发布和订阅服务。

参数的自动验证,在截取器进行
-->

0 comments on commit 9c90459

Please sign in to comment.