一个 gRPC server 启动主要有以下几行代码,如下一个简单的 gRPC server 就启动起来了,但其流程可不简单,简单的背后意味着封装,一行一行来分析。
func StartServer() {
lis, _ := net.Listen("tcp", "127.0.0.1:8090")
//创建一个grpc服务器对象
gRpcServer := grpc.NewServer()
pb.RegisterHelloServiceServer(gRpcServer, &impl.HelloServiceServer{})
//开启服务端
gRpcServer.Serve(lis)
}
下面一行代码表面上看很简单,创建了一个grpcServer实例,但是这个实例的参数以及入参的参数是非常多了,弄明白了这些参数的含义,后面的代码会好读很多。
gRpcServer := grpc.NewServer()
// NewServer creates a gRPC server which has no service registered and has not started to accept requests yet.
//NewServer创建一个grpc服务器,该服务器没有注册服务,还不能开始接收请求
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
quit: grpcsync.NewEvent(),//退出事件
done: grpcsync.NewEvent(),//完成事件
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server"))
}
if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
return s
}
上面的代码,总体四个步骤,当然,其中也有诸多细节,暂不深追究,后面会说到
-
接受入参参数切片,并把默认参数全部赋值到入参上
-
构造 Server 实例
-
判断是否 tracing (链路跟踪),IsOn (返回 channelz 数据收集是否打开)
-
返回 server 实例
type serverOptions struct {
creds credentials.TransportCredentials //cred证书
codec baseCodec //序列化和反序列化
cp Compressor //压缩接口
dc Decompressor //解压缩接口
unaryInt UnaryServerInterceptor //一元拦截器
streamInt StreamServerInterceptor //流拦截器
inTapHandle tap.ServerInHandle //见下面文档
statsHandler stats.Handler //见下面文档
maxConcurrentStreams uint32 //http2中最大的并发流个数
maxReceiveMessageSize int //最大接受消息大小
maxSendMessageSize int //最大发送消息大小
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters //长连接的server参数
keepalivePolicy keepalive.EnforcementPolicy
//初始化stream的Window大小,下限值是64K,上限2^31
initialWindowSize int32
//初始化conn大小,一个conn会有多个stream, 等于上面的值 * 16 ,http2的限 制是大于0,默认一个连接有100个流,超过了就被拒绝
initialConnWindowSize int32
writeBufferSize int //写缓冲大小
readBufferSize int //读缓冲大小
connectionTimeout time.Duration //连接超时时间
maxHeaderListSize *uint32 //最大头部大小
}
inTapHandle tap.ServerInHandle 定义在服务器端创建新流之前运行的函数,如果你定义的这个 ServerInhandler 返回非 nil 错误,则服务端不会创建流,并将一个 rst_stream 流发送给具有 refused_stream 的客户端,它旨在用于您不想服务端创建新的流浪费资源来接受新客户端的情况,比如微服务常见的限流功能,注意,他是在每个 conn 的协程中执行,而不是在每个 rpc 的协程中执行中执行。
statsHandler stats.Handler 这个接口中定义的方法主要是为统计做处理的,比如一次调用中的 rpc 和 conn,默认的实现有如下
ClientHandler //主要是Client端服务的(rpc,conn)跟踪和状态统计
ServerHandler //主要是Server端服务的(rpc,conn)跟踪和状态统计
statshandler //grpc测试用的
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions //上面介绍的就是
mu sync.Mutex // guards following
lis map[net.Listener]bool //服务端的监听地址
//server transport是所有grpc服务器端传输实现的通用接口。
conns map[transport.ServerTransport]bool
serve bool //表示服务是否开启,在Serve()方法中赋值为true
drain bool //在调用GracefulStop(优雅的停止服务)方法被赋值为true
cv *sync.Cond // 当连接关闭以正常停止时发出信号
m map[string]*service // service name -> service info
events trace.EventLog //跟踪事件日志
quit *grpcsync.Event //同步退出事件
done *grpcsync.Event //同步完成事件
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup //counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
czData *channelzData //存储一些conn的自增id数据
}
我们看到最上面的 StartServer 代码中调用了 pb (proto buffer) 的代码,这是自动生成的,这个方法的作用要把 HelloService 的实现注册到 Server 上,我们看下面的代码,这个方法的入参有两个,一个是 NewServer 创建的 grpcServer 实例,一个是 HelloService 的实现类,然后调用 grpcServer 的 RegisterService 的方法。
func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) {
s.RegisterService(&_HelloService_serviceDesc, srv)
}
RegisterService 方法如下,registerservice 将服务及其实现注册到 grpc 服务器。它是从 idl (接口描述语言 Interface Description Lanauage)的代码中调用的。这必须是在调用 SERVE 方法之前调用。s.register(sd, ss) 方法最终是吧服务的名称的和服务的描述信息注册到上面 Server 中的 map[string]*service 中
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
grpclog.Fatalf("grpc: Server.RegisterService found")
}
//注册服务
s.register(sd, ss)
}
//删除了非核心代码
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
//构造服务的实例
srv := &service{
server: ss, md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc), mdata:sd.Metadata,
}
//放入方法
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
//放入流
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv
}
Server () 方法就正式开始监听客户端的连接,并开启协程处理客户端连接,方法核心步骤如下
-
加锁,初始化一些参数
-
defer 处理最后的资源情况
-
for 循环接受客户端的连接,每一个客户端的连接,开启一个协程处理
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.serve = true
s.serveWG.Add(1)
//优雅的停止服务
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
<-s.done.Done()
}
}()
//包装连接对象,并声明为true,代表有效
ls := &listenSocket{Listener: lis}
s.lis[ls] = true
s.mu.Unlock()
//清理资源
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
//监听客户端连接
for {
rawConn, err := lis.Accept()
s.serveWG.Add(1)
//处理客户端请求
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
我们可以看到 grpcServer 整体启动流程是非常清晰的,而且 go 的代码阅读起来也比较清爽,但命名上大多都是缩写,需要结合上下文以及注释来仔细观察,还是建议亲自阅读一下源码,这样记忆和理解也更深入,包括前面的一些基础练习也要自己实践,实践才是真理。