用Golang构建gRPC

gRPC 是什么?

gRPC是Google公司基于Protobuf开发的跨语言的开源RPC框架。gRPC基于HTTP/2协议设计,可以基于一个HTTP/2链接提供多个服务,对于移动设备更加友好。

在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

调用过程如下:

  • 客户端(gRPC Sub)调用 A 方法,发起 RPC 调用

  • 对请求信息使用 Protobuf 进行对象序列化压缩(IDL)

  • 服务端(gRPC Server)接收到请求后,解码请求体,进行业务逻辑处理并返回

  • 对响应结果使用 Protobuf 进行对象序列化压缩(IDL)

  • 客户端接受到服务端响应,解码请求体。回调被调用的 A 方法,唤醒正在等待响应(阻塞)的客户端调用并返回响应结果

Go语言的gRPC技术栈如图所示:

最底层为TCP或Unix Socket协议,在此之上是HTTP/2协议的实现,然后在HTTP/2协议之上又构建了针对Go语言的gRPC核心库。应用程序通过gRPC插件生产的Stub代码和gRPC核心库通信,也可以直接和gRPC核心库通信。

服务的定义

gRPC 基于如下思想:定义一个服务, 指定其可以被远程调用的方法及其参数和返回类型。

RPC是远程函数调用,因此每次调用的函数参数和返回值不能太大,否则将严重影响每次调用的响应时间。因此传统的RPC方法调用对于上传和下载较大数据量场景并不适合。同时传统RPC模式也不适用于对时间不确定的订阅和发布模式。为此,gRPC框架针对服务器端和客户端分别提供了流特性。

单项 RPC

客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。

1
rpc SayHello(HelloRequest) returns (HelloResponse){}

服务端流式 RPC

客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。

1
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){}

客户端流式 RPC

客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。

1
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {}

双向流式 RPC

客户端服务器两端都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。

1
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){}

生成服务端客户端接口

如果从 Protobuf 的角度看,gRPC 只不过是一个针对 service 接口生成代码的生成器。

我们先来实现一个简单的服务,新建一个.proto文件hello.proto,定义HelloService接口:

1
2
3
4
5
6
7
8
9
10
11
syntax = "proto3";

package hellogrpc;

message String {
string value = 1;
}

service HelloService {
rpc Hello (String) returns (String);
}

我们需要从 .proto 的服务定义中生成 gRPC 客户端和服务器端的接口,使用protoc-gen-go内置的gRPC插件来生成gRPC代码:

1
protoc --go_out=plugins=grpc:. hello.proto

运行这个命令可以在当前目录中生成下面的文件:

1
hello.pb.go

这个文件包括:

  • 所有用于填充,序列化和获取我们请求和响应消息类型的 protocol buffer 代码
  • 一个为客户端调用定义在RouteGuide服务的方法的接口类型(或者 存根
  • 一个为服务器使用定义在RouteGuide服务的方法去实现的接口类型(或者 存根

gRPC插件会为服务端和客户端生成不同的接口:

1
2
3
4
5
6
7
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
}

type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
}

gRPC通过context.Context参数,为每个方法调用提供了上下文支持。客户端在调用方法的时候,可以通过可选的grpc.CallOption类型的参数提供额外的上下文信息。

创建gRPC服务器

基于服务端的HelloServiceServer接口可以重新实现HelloService服务,比如我们创建一个HelloService结构体,这是我们自己定义的结构,然后我们实现HelloServiceServer接口:

1
2
3
4
5
6
7
type HelloService struct {}

func (p *HelloService) Hello(ctx context.Context, args *hellogrpc.String) (*hellogrpc.String, error) {
reply := &hellogrpc.String{Value: "hello" + args.GetValue()}

return reply, nil
}

然后我们运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。

1
2
3
4
5
6
7
8
9
10
11
func main() {
grpcServer := grpc.NewServer()
hellogrpc.RegisterHelloServiceServer(grpcServer, new(HelloService))

listener, err := net.Listen("tcp", ":1107")
if err != nil {
log.Fatal(err)
}

_ = grpcServer.Serve(listener)
}

这里我们通过grpc.NewServer()构造一个gRPC服务对象,然后通过gRPC插件生成的RegisterHelloServiceServer函数注册我们实现的HelloServiceImpl服务。然后通过grpcServer.Serve(listener)在一个监听端口上提供gRPC服务。即:

  1. 使用 lis, err := net.Listen("tcp", ":1107") 指定我们期望客户端请求的监听端口。
  2. 使用grpc.NewServer()创建 gRPC 服务器的一个实例。
  3. 在 gRPC 服务器注册我们的服务实现。
  4. 用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待。

现在我们就把客户端搭起来了,简单吧!那我们开始创建客户端吧!

创建gRPC客户端

现在我们通过客户端来连接grpc服务了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
conn, err := grpc.Dial("localhost:1107", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

client := hellogrpc.NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &hellogrpc.String{Value: " grpc"})
if err != nil {
log.Fatal(err)
}

fmt.Println(reply)
}

grpc.Dial负责和gRPC服务建立链接,然后NewHelloServiceClient函数基于已经建立的链接构造HelloServiceClient对象。返回的client其实是一个HelloServiceClient接口对象,通过接口定义的方法就可以调用服务端对应的gRPC服务提供的方法。

gRPC和标准库的RPC框架有一个区别,gRPC生成的接口并不支持异步调用。不过我们可以在多个Goroutine之间安全地共享gRPC底层的HTTP/2链接,因此可以通过在另一个Goroutine阻塞调用的方式模拟异步调用。

这样,我们就完成了简单的grpc调用了

gRPC流

RPC是远程函数调用,因此每次调用的函数参数和返回值不能太大,否则将严重影响每次调用的响应时间。因此传统的RPC方法调用对于上传和下载较大数据量场景并不适合。同时传统RPC模式也不适用于对时间不确定的订阅和发布模式。为此,gRPC框架针对服务器端和客户端分别提供了流特性。

为什么要有流式grpc呢?

  • 数据包过大造成的瞬时压力
  • 接收数据包时,需要所有数据包都接受成功且正确后,才能够回调响应,进行业务处理(无法客户端边发送,服务端边处理)

就像上面我们介绍的那样,流分为客户端流服务器流双向流。服务端或客户端的单向流是双向流的特例,我们在HelloService增加一个支持双向流的Channel方法:

1
2
3
4
5
service HelloService {
rpc Hello (String) returns (String);

rpc Channel (stream String) returns (stream String);
}

关键字stream指定启用流特性,参数部分是接收客户端参数的流,返回值是返回给客户端的流。

重新生成hello.pb.go文件可以看到接口中新增加的Channel方法的定义:

1
2
3
4
5
6
7
8
9
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
}

type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error)
}

在服务端的Channel方法参数是一个新的HelloService_ChannelServer类型的参数,可以用于和客户端双向通信。客户端的Channel方法返回一个HelloService_ChannelClient类型的返回值,可以用于和服务端进行双向通信。

我们来看一下HelloService_ChannelServer和HelloService_ChannelClient,它们是接口:

1
2
3
4
5
6
7
8
9
10
11
type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}

type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}

可以发现服务端和客户端的流辅助接口均定义了Send和Recv方法用于流数据的双向通信。

那我们来实现流的服务吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (p *HelloService) Channel(stream hellogrpc.HelloService_ChannelServer) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

reply := &hellogrpc.String{Value: "hello " + args.GetValue()}

err = stream.Send(reply)
if err != nil {
return err
}
}
}

服务端在循环中接收客户端发来的数据,如果遇到io.EOF表示客户端流被关闭,如果函数退出表示服务端流关闭。生成返回的数据通过流发送给客户端,双向流数据的发送和接收都是完全独立的行为。需要注意的是,发送和接收的操作并不需要一一对应,用户可以根据真实场景进行组织代码。

这里我们就实现了带有流功能的grpc服务端了,现在我们来看一下客户端怎么向服务端流发送和接收数据吧!

客户端需要先调用Channel方法获取返回的流对象:

1
stream, err := client.Channel(context.Background())

在客户端我们将发送和接收操作放到两个独立的Goroutine。首先是向服务端发送数据:

1
2
3
4
5
6
7
8
9
go func() {
for {
num := rand.Intn(100)
if err := stream.Send(&hellogrpc.String{Value: "grpc" + strconv.Itoa(num)}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()

发送数据使用Send方法,这里我们每隔一秒发送一次数据。

然后在循环中接收服务端返回的数据:

1
2
3
4
5
6
7
8
9
10
11
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}

fmt.Println(reply.GetValue())
}

这样就完成了完整的流接收和发送支持。怎么样,简单吧!

发布和订阅模式

发布订阅是一个常见的设计模式,开源社区中已经存在很多该模式的实现。其中docker项目中提供了一个pubsub的极简实现,下面是基于pubsub包实现的本地发布订阅代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import (
"github.com/docker/docker/pkg/pubsub"
)

func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)

golang := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "golang:") {
return true
}
}
return false
})
docker := p.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "docker:") {
return true
}
}
return false
})

go p.Publish("hi")
go p.Publish("golang: https://golang.org")
go p.Publish("docker: https://www.docker.com/")
time.Sleep(1)

go func() {
fmt.Println("golang topic:", <-golang)
}()
go func() {
fmt.Println("docker topic:", <-docker)
}()

<-make(chan bool)
}

其中pubsub.NewPublisher构造一个发布对象,p.SubscribeTopic()可以通过函数筛选感兴趣的主题进行订阅。

现在尝试基于gRPC和pubsub包,提供一个跨网络的发布和订阅系统。首先通过Protobuf定义一个发布订阅服务接口:

1
2
3
4
5
service PubSubService {
rpc Publish (String) returns (String);

rpc Subscribe (String) returns (stream String);
}

其中Publish是普通的RPC方法,Subscribe则是一个单向的流服务。然后gRPC插件会为服务端和客户端生成对应的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type PubsubServiceServer interface {
Publish(context.Context, *String) (*String, error)
Subscribe(*String, PubsubService_SubscribeServer) error
}
type PubsubServiceClient interface {
Publish(context.Context, *String, ...grpc.CallOption) (*String, error)
Subscribe(context.Context, *String, ...grpc.CallOption) (
PubsubService_SubscribeClient, error,
)
}

type PubsubService_SubscribeServer interface {
Send(*String) error
grpc.ServerStream
}

因为Subscribe是服务端的单向流,因此生成的HelloService_SubscribeServer接口中只有Send方法。

然后就可以实现发布和订阅服务了:

1
2
3
4
5
6
7
8
9
type PubsubService struct {
pub *pubsub.Publisher
}

func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}

然后是实现发布方法和订阅方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (p *PubsubService) Publish(
ctx context.Context, arg *String,
) (*String, error) {
p.pub.Publish(arg.GetValue())
return &String{}, nil
}

func (p *PubsubService) Subscribe(
arg *String, stream PubsubService_SubscribeServer,
) error {
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key,arg.GetValue()) {
return true
}
}
return false
})

for v := range ch {
if err := stream.Send(&String{Value: v.(string)}); err != nil {
return err
}
}

return nil
}

这样就可以从客户端向服务器发布信息了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

client := NewPubsubServiceClient(conn)

_, err = client.Publish(
context.Background(), &String{Value: "golang: hello Go"},
)
if err != nil {
log.Fatal(err)
}
_, err = client.Publish(
context.Background(), &String{Value: "docker: hello Docker"},
)
if err != nil {
log.Fatal(err)
}
}

然后就可以在另一个客户端进行订阅信息了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

client := NewPubsubServiceClient(conn)
stream, err := client.Subscribe(
context.Background(), &String{Value: "golang:"},
)
if err != nil {
log.Fatal(err)
}

for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}

fmt.Println(reply.GetValue())
}
}

到此我们就基于gRPC简单实现了一个跨网络的发布和订阅服务!

gRPC Deadline

我们试想一下这个场景,如果客户端发生了阻塞,就会造成大量正在进行的请求都会被保留,并且所有请求都有可能达到最大超时,这会使服务面临资源耗尽的风险,例如内存,这会增加服务的延迟,或者在最坏的情况下可能导致整个进程崩溃,当未设置 Deadlines 时,将采用默认的 DEADLINE_EXCEEDED(这个时间非常大)。

因此,我们要为客户端添加超时时间限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
conn, err := grpc.Dial("localhost:1107", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
defer conn.Close()

d := time.Now().Add(5 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()

client := grpcDeadline.NewHelloServiceClient(conn)

stream, err := client.Channel(ctx)

...
}

对于 context 上下文管理,大家可以看我的一篇博客,这里就不详细解释 context 的用法了,这里我们设置了5秒的超时时间,如果 client 超过5秒没有返回,它就会通知 server 端的方法终止这次调用。

我们来看看 server 端的方法是怎么接受的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (p *HelloService) Channel(stream grpcDeadline.HelloService_ChannelServer) error {
for {
if stream.Context().Err() == context.Canceled {
return status.Errorf(codes.Canceled, "client context is canceled")
}

args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

reply := &grpcDeadline.String{Value: "hello " + args.GetValue()}

err = stream.Send(reply)
if err != nil {
return err
}
}
}

这里是判断 ctx.Err() 是否为 context.Canceled来终止上下文的,我们也可以使用select

1
2
3
4
5
6
7
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-stream.Context().Done():
fmt.Println(stream.Context().Err())
return status.Errorf(codes.Canceled, "client context is canceled")
}

以上就是gRPC的基础,需要好好学习消化~


参考资料:

Go语言高级编程(Advanced Go Programming)

gRPC - 官方文档

gRPC - 中文文档

欢迎关注我的其它发布渠道