gRPC server 端请求响应处理分析

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"fmt"
"log"
"net"

"google.golang.org/grpc"

pb "github.com/zhiruchen/grpc-pool/example/app"
)

type app struct{}

func (a *app) Hello(ctx context.Context, req *pb.AppReq) (*pb.AppResp, error) {
return &pb.AppResp{
Msg: req.Hello + " My Friend ",
}, nil
}

func main() {
lis, err := net.Listen("tcp", ":8989")
if err != nil {
log.Println(err)
return
}

s := grpc.NewServer()
pb.RegisterAppServer(s, &app{})

if err = s.Serve(lis); err != nil {
fmt.Println("serve error: %v", err)
}
}

在main.go中主要做了三件事

  • 实现pb中的接口
  • 注册实现了接口的struct
  • 启动gRPC Server

pb接口

由protoc生成的pb中定义的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
type AppReq struct {
Hello string `protobuf:"bytes,1,opt,name=hello" json:"hello,omitempty"`
}

type AppResp struct {
Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"`
}

// Server API for App service

type AppServer interface {
Hello(context.Context, *AppReq) (*AppResp, error)
}

注册实现了接口的struct app

1
2
s := grpc.NewServer()
pb.RegisterAppServer(s, &app{})

grpc.NewServer 主要是初始化了一个Server实例。

pb.RegisterAppServer 调用了Server的 RegisterService方法。
RegisterService 主要是将服务于实现注册到 gRPC Server.

如何注册实现

在pb.go文件中的RegisterAppServer
参数是grpc server实例s, 以及实现了AppServer接口的app
这个方法又调用server的RegisterService实现注册。
传入_App_serviceDesc参数。

1
2
3
func RegisterAppServer(s *grpc.Server, srv AppServer) {
s.RegisterService(&_App_serviceDesc, srv)
}

App_serviceDesc

1
2
3
4
5
6
7
8
9
10
11
12
var _App_serviceDesc = grpc.ServiceDesc{
ServiceName: "app.App",
HandlerType: (*AppServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Hello",
Handler: _App_Hello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "test.proto",
}

里面有一个_App_Hello_Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func _App_Hello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AppReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AppServer).Hello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/app.App/Hello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AppServer).Hello(ctx, req.(*AppReq))
}
return interceptor(ctx, in, info, handler)
}

grpc server 的RegisterService方法

1
2
3
4
5
6
7
8
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) { // 检查类型st有没有实现ht中的接口
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
s.register(sd, ss)
}

register

在service的md中保存普通的rpc方法,在sd中保存流rpc方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
...
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}

// 遍历sd(也就是pb中定义的_App_serviceDesc)的Methods方法列表
// 将实现的方法名称和方法handler关联到srv.md
// 在这个例子就是srv.md["Hello"] = _App_Hello_Handler
// _App_Hello_Handler 中完成了对Server的Hello方法的调用
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}

// 关联service和srv, 也就是一个grpc server可以关联多个service
// 这个例子就是 s.m["app.App"] = srv
s.m[sd.ServiceName] = srv
...
}

s.Serve 方法

Serve方法接受来自lis上的连接,为每个连接创建一个ServerTransport和service goroutine
service goroutine 读取rpc 的请求,并调用register方法中注册的handler完成rpc调用请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for {
rawConn, err := lis.Accept()
...
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}

handleRawConn 创建一个goroutine处理这个连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *Server) handleRawConn(rawConn net.Conn) {
...

// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}

rawConn.SetDeadline(time.Time{})
if !s.addConn(st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}

serveStreams

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}

handleStream 根据method从srv.md中获取MethodDesc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
...
service := sm[:pos]
method := sm[pos+1:]
srv, ok := s.m[service]
...
// Unary RPC or Streaming RPC?
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
...
}

processUnaryRPC 主要完成

  • 从stream中读取请求的payload
  • 创建用来解码payload的方法df
  • 调用注册的Handler, 在这个例子中如果Method是Hello,那handler就是: _App_Hello_Handler
  • 发送Handler的处理结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
// 读取payload
p := &parser{r: stream}
pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
}

...

// 解码方法df
df := func(v interface{}) error {...}

// 调用注册的Handler, 将消息解码方法传入Handler
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
...

// 发送响应
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {...}
}

sendResponse 主要完成对handler返回的reply编码,压缩最后写入stream中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
...

compData, err := compress(data, cp, comp)

...

err = t.Write(stream, hdr, payload, opts)
if err == nil && s.opts.statsHandler != nil {
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
}

return err
}

调用总结

启动main.go Server 端打印日志

1
2
3
4
5
6
7
8
9
INFO[0000] ----------------Create New gRPC Server---------- name=grpc 调用分析
INFO[0000] Created a New Server s: &{{<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 4194304 2147483647 false <nil> {0 0 0 0 0} {0 false} 0 0 32768 32768 120000000000 <nil>} {0 0} map[] map[] false false 0xc42002e2c0 map[] <nil> 0xc420098120 0xc420098180 {{0 0} 0} {{0 0} 0} {{0 0} 0} {{} [0 0 0 0 0 0 0 0 0 0 0 0] 0} 0 {{0 0} 0 0 0 0} 0 0 0 {0 0 <nil>}} name=grpc 调用分析
INFO[0000] ----------------Server:RegisterService-------------- name=grpc 调用分析
INFO[0000] Server:RegisterService: sd: &{app.App <nil> [{Hello 0x139c440}] [] test.proto}, ss: &{} name=grpc 调用分析
INFO[0000] Server:RegisterService: ht: app.AppServer, st: *main.app name=grpc 调用分析
INFO[0000] -----------Server:register---------- name=grpc 调用分析
INFO[0000] Server:register registered server to s.m: map[app.App:0xc4200a3c80] name=grpc 调用分析
INFO[0000] --------------------Server:Serve------------------- name=grpc 调用分析
INFO[0000] Server:Serve start for loop name=grpc 调用分析

客户端发送请求后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
INFO[3278] Server:Serve: 开启一个goroutine, 调用s.handleRawConn处理rawConn name=grpc 调用分析
INFO[3278] --------------------Server:handleRawConn---------------- name=grpc 调用分析
INFO[3278] Server:handleRawConn 调用s.serveStreams处理ServerTransport name=grpc 调用分析
INFO[3278] ------------------Server:serveStreams------------- name=grpc 调用分析
INFO[3278] Server:serveStreams 调用s.handleStream name=grpc 调用分析
INFO[3278] ------------------Server:handleStream---------------------- name=grpc 调用分析
INFO[3278] Server:handleStream: stream.Method: /app.App/Hello name=grpc 调用分析
INFO[3278] Server:handleStream: service: app.App name=grpc 调用分析
INFO[3278] Server:handleStream: method: Hello name=grpc 调用分析
INFO[3278] Server:handleStream: srv: &{0x174b6b8 map[Hello:0x171da70] map[] test.proto} name=grpc 调用分析
INFO[3278] Server:handleStream: 调用processUnaryRPC name=grpc 调用分析
INFO[3278] --------------------------------Server:processUnaryRPC------------------------------ name=grpc 调用分析
INFO[3278] Server:processUnaryRPC: 解析得到的req:
Hello name=grpc 调用分析
INFO[3278] Server:processUnaryRPC: 开始调用注册的Handler: 0x139c440 name=grpc 调用分析
INFO[3278] ------------------------proto:_App_Hello_Handler----------------- name=grpc 调用分析
INFO[3278] _App_Hello_Handler: 调用AppServer Hello name=grpc 调用分析
INFO[3278] -------------------app:Hello----------------------- name=grpc 调用分析
INFO[3278] Server:processUnaryRPC: 调用s.sendResponse发送reply: msg:"Hello My Friend " name=grpc 调用分析
INFO[3278] --------------------Server:sendResponse----------------- name=grpc 调用分析
INFO[3278] Server:sendResponse: 发送数据:
Hello My Friend name=grpc 调用分析

调用trace

grpc server