Skip to content

Commit

Permalink
ch4-04: done
Browse files Browse the repository at this point in the history
  • Loading branch information
chai2010 committed Jul 3, 2018
1 parent f227bb7 commit c3c6370
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 10 deletions.
2 changes: 1 addition & 1 deletion SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* [4.1. RPC入门](ch4-rpc/ch4-01-rpc-intro.md)
* [4.2. Protobuf](ch4-rpc/ch4-02-pb-intro.md)
* [4.3. 玩转RPC](ch4-rpc/ch4-03-netrpc-hack.md)
* [4.4. GRPC入门(TODO)](ch4-rpc/ch4-04-grpc.md)
* [4.4. GRPC入门](ch4-rpc/ch4-04-grpc.md)
* [4.5. GRPC进阶(TODO)](ch4-rpc/ch4-05-grpc-hack.md)
* [4.6. Protobuf扩展语法和插件(TODO)](ch4-rpc/ch4-06-pb-option.md)
* [4.7. 其它RPC系统(TODO)](ch4-rpc/ch4-07-other-rpc.md)
Expand Down
118 changes: 117 additions & 1 deletion ch4-rpc/ch4-04-grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,129 @@ GRPC和标准库的RPC框架还有一个区别,GRPC生成的接口并不支持

## GRPC流

RPC是远程函数调用,因此每次调用的函数参数和返回值不能太大,负责将严重影响每次调用的性能。因此传统的RPC方法调用对于上传和下载较大数据量场景并不适合。同时传统RPC模式也不适用于对于时间不确定的订阅和发布模式。为此,GRPC框架分别提供了服务器端和客户端的流特性。

服务端或客户端的单向流是双向流的特例,我们在HelloService增加一个支持双向流的Channel方法:

```proto
service HelloService {
rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
}
```

关键字stream指定启用流特性,参数部分是接收客户端参数的流,返回值是返回给客户端的流。

重新生成代码可以可以看到接口中新增加的Channel方法的定义:

```go
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error)
}
```

在服务端的Channel方法参数是一个新的HelloService_ChannelServer类型的参数,可以用于和客户端双向通信。客户端的Channel方法返回一个HelloService_ChannelClient类型的返回值,可以用于和服务端进行双向通信。

HelloService_ChannelServer和HelloService_ChannelClient均为接口类型:

```go
type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}

type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}
```

可以发现服务端和客户端的流辅助接口均定义了Send和Recv方法用于流数据的双向通信。

现在我们可以实现流服务:

```go
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

reply := &String{Value: "hello:" + args.GetValue()}

err = stream.Send(reply)
if err != nil {
return err
}
}
}
```

服务端在循环中接收客户端发来的数据,如果遇到io.EOF表示客户端流被关闭,如果函数退出表示服务端流关闭。然后生成返回的数据通过流发送给客户端。需要主要的是,发送和接收的操作并不需要一一对应,用户可以根据真实场景进行组织代码。

客户端需要先调用Channel方法获取返回的流对象:

```go
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
```

在客户端我们将发送和接收操作放到两个独立的Goroutine。首先是向服务端发生数据:

```go
go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
```

然后在循环中接收服务端返回的数据:

```go
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
```

这样就完成了完整的流接收和发生支持。


<!--
Publish
Watch
TODO
## 认证
TODO
<!--
入门/流/认证
Expand Down
83 changes: 75 additions & 8 deletions examples/ch4-04-grpc/basic/client/hello.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions examples/ch4-04-grpc/basic/client/hello.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ message String {

service HelloService {
rpc Hello (String) returns (String);

rpc Channel (stream String) returns (stream String);
}
51 changes: 51 additions & 0 deletions examples/ch4-04-grpc/basic/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"fmt"
"io"
"log"
"net"
"time"

"google.golang.org/grpc"
)
Expand All @@ -16,6 +18,26 @@ func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, er
return reply, nil
}

func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
log.Println(111)
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

reply := &String{Value: "hello:" + args.GetValue()}

err = stream.Send(reply)
if err != nil {
return err
}
}
}

func main() {
go startGrpcServer()
doClientWork()
Expand Down Expand Up @@ -46,4 +68,33 @@ func doClientWork() {
log.Fatal(err)
}
fmt.Println(reply.GetValue())

stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}

go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}

time.Sleep(time.Second)
}
}()

for {
log.Println(222)

reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}

fmt.Println(reply.GetValue())
}
}

0 comments on commit c3c6370

Please sign in to comment.