gRPC server 端请求响应处理分析 main.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport ( "context" "fmt" "log" "net" "google.golang.org/grpc" pb "github.com/zhiruchen/grpc-pool/example/app" ) type app struct {}func (a *app) Hello (ctx context.Context, req *pb.AppReq) (*pb.AppResp, error) { return &pb.AppResp{ Msg: req.Hello + " My Friend " , }, nil } func main () { lis, err := net.Listen("tcp" , ":8989" ) if err != nil { log.Println(err) return } s := grpc.NewServer() pb.RegisterAppServer(s, &app{}) if err = s.Serve(lis); err != nil { fmt.Println("serve error: %v" , err) } }
在main.go中主要做了三件事
实现pb 中的接口
注册实现了接口的struct
启动gRPC Server
pb接口 由protoc生成的pb中定义的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 type AppReq struct { Hello string `protobuf:"bytes,1,opt,name=hello" json:"hello,omitempty"` } type AppResp struct { Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"` } type AppServer interface { Hello(context.Context, *AppReq) (*AppResp, error) }
注册实现了接口的struct app 1 2 s := grpc.NewServer() pb.RegisterAppServer(s, &app{})
grpc.NewServer 主要是初始化了一个Server实例。
pb.RegisterAppServer 调用了Server的 RegisterService方法。 RegisterService 主要是将服务于实现注册到 gRPC Server.
如何注册实现 在pb.go文件中的RegisterAppServer 参数是grpc server实例s, 以及实现了AppServer接口的app 这个方法又调用server的RegisterService实现注册。 传入_App_serviceDesc参数。
1 2 3 func RegisterAppServer (s *grpc.Server, srv AppServer) { s.RegisterService(&_App_serviceDesc, srv) }
App_serviceDesc
1 2 3 4 5 6 7 8 9 10 11 12 var _App_serviceDesc = grpc.ServiceDesc{ ServiceName: "app.App" , HandlerType: (*AppServer)(nil ), Methods: []grpc.MethodDesc{ { MethodName: "Hello" , Handler: _App_Hello_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "test.proto" , }
里面有一个_App_Hello_Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func _App_Hello_Handler (srv interface {}, ctx context.Context, dec func (interface {}) error , interceptor grpc .UnaryServerInterceptor ) (interface {}, error) { in := new (AppReq) if err := dec(in); err != nil { return nil , err } if interceptor == nil { return srv.(AppServer).Hello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/app.App/Hello" , } handler := func (ctx context.Context, req interface {}) (interface {}, error) { return srv.(AppServer).Hello(ctx, req.(*AppReq)) } return interceptor(ctx, in, info, handler) }
grpc server 的RegisterService方法
1 2 3 4 5 6 7 8 func (s *Server) RegisterService (sd *ServiceDesc, ss interface {}) { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v" , st, ht) } s.register(sd, ss) }
register
在service的md中保存普通的rpc方法,在sd中保存流rpc方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (s *Server) register (sd *ServiceDesc, ss interface {}) { ... srv := &service{ server: ss, md: make (map [string ]*MethodDesc), sd: make (map [string ]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] srv.sd[d.StreamName] = d } s.m[sd.ServiceName] = srv ... }
s.Serve 方法 Serve方法接受来自lis上的连接,为每个连接创建一个ServerTransport和service goroutine service goroutine 读取rpc 的请求,并调用register方法中注册的handler完成rpc调用请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 for { rawConn, err := lis.Accept() ... s.serveWG.Add(1 ) go func () { s.handleRawConn(rawConn) s.serveWG.Done() }() }
handleRawConn 创建一个goroutine处理这个连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (s *Server) handleRawConn (rawConn net.Conn) { ... st := s.newHTTP2Transport(conn, authInfo) if st == nil { return } rawConn.SetDeadline(time.Time{}) if !s.addConn(st) { return } go func () { s.serveStreams(st) s.removeConn(st) }() }
serveStreams
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (s *Server) serveStreams (st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup st.HandleStreams(func (stream *transport.Stream) { wg.Add(1 ) go func () { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }, func (ctx context.Context, method string ) context .Context { if !EnableTracing { return ctx } tr := trace.New("grpc.Recv." +methodFamily(method), method) return trace.NewContext(ctx, tr) }) wg.Wait() }
handleStream 根据method从srv.md中获取MethodDesc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *Server) handleStream (t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { sm := stream.Method() if sm != "" && sm[0 ] == '/' { sm = sm[1 :] } pos := strings.LastIndex(sm, "/" ) ... service := sm[:pos] method := sm[pos+1 :] srv, ok := s.m[service] ... if md, ok := srv.md[method]; ok { s.processUnaryRPC(t, stream, srv, md, trInfo) return } if sd, ok := srv.sd[method]; ok { s.processStreamingRPC(t, stream, srv, sd, trInfo) return } ... }
processUnaryRPC 主要完成
从stream中读取请求的payload
创建用来解码payload的方法df
调用注册的Handler, 在这个例子中如果Method是Hello,那handler就是: _App_Hello_Handler
发送Handler的处理结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (s *Server) processUnaryRPC (t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { p := &parser{r: stream} pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) if err == io.EOF { return err } ... df := func (v interface {}) error {...} ctx := NewContextWithServerTransportStream(stream.Context(), stream) reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) ... if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {...} }
sendResponse 主要完成对handler返回的reply编码,压缩最后写入stream中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (s *Server) sendResponse (t transport.ServerTransport, stream *transport.Stream, msg interface {}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { data, err := encode(s.getCodec(stream.ContentSubtype()), msg) ... compData, err := compress(data, cp, comp) ... err = t.Write(stream, hdr, payload, opts) if err == nil && s.opts.statsHandler != nil { s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false , msg, data, payload, time.Now())) } return err }
调用总结 启动main.go Server 端打印日志
1 2 3 4 5 6 7 8 9 INFO[0000 ] ----------------Create New gRPC Server---------- name=grpc 调用分析 INFO[0000 ] Created a New Server s: &{{<nil > <nil > <nil > <nil > <nil > <nil > <nil > <nil > 0 4194304 2147483647 false <nil > {0 0 0 0 0 } {0 false } 0 0 32768 32768 120000000000 <nil >} {0 0 } map [] map [] false false 0xc42002e2c0 map [] <nil > 0xc420098120 0xc420098180 {{0 0 } 0 } {{0 0 } 0 } {{0 0 } 0 } {{} [0 0 0 0 0 0 0 0 0 0 0 0 ] 0 } 0 {{0 0 } 0 0 0 0 } 0 0 0 {0 0 <nil >}} name=grpc 调用分析 INFO[0000 ] ----------------Server:RegisterService-------------- name=grpc 调用分析 INFO[0000 ] Server:RegisterService: sd: &{app.App <nil > [{Hello 0x139c440 }] [] test.proto}, ss: &{} name=grpc 调用分析 INFO[0000 ] Server:RegisterService: ht: app.AppServer, st: *main.app name=grpc 调用分析 INFO[0000 ] -----------Server:register---------- name=grpc 调用分析 INFO[0000 ] Server:register registered server to s.m: map [app.App:0xc4200a3c80 ] name=grpc 调用分析 INFO[0000 ] --------------------Server:Serve------------------- name=grpc 调用分析 INFO[0000 ] Server:Serve start for loop name=grpc 调用分析
客户端发送请求后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 INFO[3278 ] Server:Serve: 开启一个goroutine, 调用s.handleRawConn处理rawConn name=grpc 调用分析 INFO[3278 ] --------------------Server:handleRawConn---------------- name=grpc 调用分析 INFO[3278 ] Server:handleRawConn 调用s.serveStreams处理ServerTransport name=grpc 调用分析 INFO[3278 ] ------------------Server:serveStreams------------- name=grpc 调用分析 INFO[3278 ] Server:serveStreams 调用s.handleStream name=grpc 调用分析 INFO[3278 ] ------------------Server:handleStream---------------------- name=grpc 调用分析 INFO[3278 ] Server:handleStream: stream.Method: /app.App/Hello name=grpc 调用分析 INFO[3278 ] Server:handleStream: service: app.App name=grpc 调用分析 INFO[3278 ] Server:handleStream: method: Hello name=grpc 调用分析 INFO[3278 ] Server:handleStream: srv: &{0x174b6b8 map [Hello:0x171d a70] map [] test.proto} name=grpc 调用分析 INFO[3278 ] Server:handleStream: 调用processUnaryRPC name=grpc 调用分析 INFO[3278 ] --------------------------------Server:processUnaryRPC------------------------------ name=grpc 调用分析 INFO[3278 ] Server:processUnaryRPC: 解析得到的req: Hello name=grpc 调用分析 INFO[3278 ] Server:processUnaryRPC: 开始调用注册的Handler: 0x139c440 name=grpc 调用分析 INFO[3278 ] ------------------------proto:_App_Hello_Handler----------------- name=grpc 调用分析 INFO[3278 ] _App_Hello_Handler: 调用AppServer Hello name=grpc 调用分析 INFO[3278 ] -------------------app:Hello----------------------- name=grpc 调用分析 INFO[3278 ] Server:processUnaryRPC: 调用s.sendResponse发送reply: msg:"Hello My Friend " name=grpc 调用分析 INFO[3278 ] --------------------Server:sendResponse----------------- name=grpc 调用分析 INFO[3278 ] Server:sendResponse: 发送数据: Hello My Friend name=grpc 调用分析
调用trace