ETCD数据库源码分析——客户服务端通信serveCtx

网友投稿 493 2022-11-13

ETCD数据库源码分析——客户服务端通信serveCtx

ETCD数据库源码分析——客户服务端通信serveCtx

在etcd.go文件中的StartEtcd函数负责启动etcd的服务端,每个节点都会对外提供两组URL地址,一组是与集群中其他节点交互的URL地址(Peer URL),另一组是与客户端交互的URL地址(Client URL)。本篇博客就是讲解客户服务端通信,首先介绍一下服务端通信建立流程。

func StartEtcd(inCfg *Config) (e *Etcd, err error) { // 校验配置inCfg,其实就是config结构体的embed.Config类型的ec成员 serving := false // 标识是否正在提供服务 e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} // 创建Etcd // 初始化Etcd.Peers字段,其中为每个Peer URL创建相应的Listener实例 if e.Peers, err = configurePeerListeners(cfg); err != nil { return e, err } // 1. 初始化Etcd.sctxs字段,其中为每个Client URL创建相应的Listener实例 if e.sctxs, err = configureClientListeners(cfg); err != nil { return e, err } // 2. 将sctxs字段中记录的Listener实例添加到Clients字段中 for _, sctx := range e.sctxs { e.Clients = append(e.Clients, sctx.l) } ... // 3. 服务端服务启动 if err = e.serveClients(); err != nil { return e, err }

为每个Client URL创建相应的Listener实例

​​func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error)​​函数会为每个Client URL地址创建相应的serveCtx实例,也就是该函数返回的sctxs map。在serveCtx实例中记录了相应的Listener实例和用户自定义Handler等信息。

// server/embed/serve.gotype serveCtx struct { lg *zap.Logger l net.Listener // Listener实例 addr string // 主机address或套接字路径 network string // "tcp" or "unix" secure bool // "or "unixs"设置为true insecure bool // 其他设置为true ctx context.Context cancel context.CancelFunc userHandlers map[string] // 用户自定义Handler serviceRegister func(*grpc.Server) serversC chan *servers // make(chan *servers, 2) 如果sctx.insecure,sctx.secure都为true,需要两个servers通道}type servers struct { secure bool grpc *grpc.Server *Url都创建一个serveCtx实例,并根据不同的系统和协议,对sctx.l中记录的Listerner进行调整,对sctx.userHandler设置用户自定义的Handler,对sctx.serviceRegister设置配置中的ServiceRegister。

func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { // 获取tls相关配置和certs(略) sctxs = make(map[string]*serveCtx) // 该map用来记录Client URL与对应serveCtx实例对应关系 for _, u := range cfg.LCUrls { // 每个Client Url都对应一个serveCtx实例 sctx := newServeCtx(cfg.logger) if u.Scheme == "|| u.Scheme == "unix" { // 通信协议scheme if !cfg.ClientTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) } if cfg.ClientTLSInfo.ClientCertAuth { cfg.logger.Warn("scheme is HTTP while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String())) } } if (u.Scheme == "|| u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() { return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String()) } // 调整proto和addr network := "tcp" addr := u.Host if u.Scheme == "unix" || u.Scheme == "unixs" { network = "unix" addr = u.Host + u.Path } sctx-work = network sctx.secure = u.Scheme == "|| u.Scheme == "unixs" sctx.insecure = !sctx.secure if oldctx := sctxs[addr]; oldctx != nil { oldctx.secure = oldctx.secure || sctx.secure oldctx.insecure = oldctx.insecure || sctx.insecure continue } // 根据不同的系统和协议,对sctx.l中记录的Listerner进行调整 // 创建Listener,transport.NewListenerWithOpts一直定位到最后就是net包的listen if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme, transport.WithSocketOpts(&cfg.SocketOpts), transport.WithSkipTLSInfoCheck(true),); err != nil { return nil, err } // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking hosts that disable ipv6. So, use the address given by the user. sctx.addr = addr // 更新addr字段 // 如果etcd进程的文件描述符数量限制太低,需要使用Limit Listener if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil { if fdLimit <= reservedInternalFDNum { cfg.logger.Fatal("file descriptor limit of etcd process is too low; please set higher", zap.Uint64("limit", fdLimit), zap.Int("recommended-limit", reservedInternalFDNum),) } sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } // 如果network是tcp,需要使用KeepAlive Listener if network == "tcp" { if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil { return nil, err } } defer func(u url.URL) { if err == nil { return } sctx.l.Close() cfg.logger.Warn("closing peer listener", zap.String("address", u.Host), zap.Error(err),) }(u) // 对sctx.userHandler设置用户自定义的Handler for k := range cfg.UserHandlers { sctx.userHandlers[k] = cfg.UserHandlers[k] } // 对sctx.serviceRegister设置配置中的ServiceRegister sctx.serviceRegister = cfg.ServiceRegister sctxs[addr] = sctx } return sctxs, nil}

Listener实例添加到Clients字段中

将每个Client URL创建相应的Listener实例添加到Clients列表

for _, sctx := range e.sctxs { e.Clients = append(e.Clients, sctx.l) }

服务端服务启动

serveClients函数为每个监听地址启动一个客户服务端协程,首先需要初始化Handler实例、根据配置为grpc服务端生成不同的选项,最后为每个serveCtx启动一个客户服务端协程。

// server/embed/etcd.gofunc (e *Etcd) serveClients() (err error) { // Start a client server goroutine for each listen address var h // 初始化v2版本API使用的Handler实例 if e.Config().EnableV2 { // 配置文件中指定v2协议 if e.Config().V2DeprecationEffective().IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) { return fmt.Errorf("--enable-v2 and --v2-deprecation=%s are mutually exclusive", e.Config().V2DeprecationEffective()) } e.cfg.logger.Warn("Flag `enable-v2` is deprecated and will get removed in etcd 3.6.") if len(e.Config().ExperimentalEnableV2V3) > 0 { e.cfg.logger.Warn("Flag `experimental-enable-v2v3` is deprecated and will get removed in etcd 3.6.") srv := v2v3.NewServer(e.cfg.logger, v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3) h = v2srv, e.Server.Cfg.ReqTimeout()) } else { // 注册完整的v2版本的Handler,可以正常响应Client v2的请求 h = v2e.Server, e.Server.Cfg.ReqTimeout()) } } else { // v3协议 mux := // Go语言中的ServeMux实现了Handler 接口,通过url找到对应的路由,然后在ServeHTTP中调用路由实现的ServeHTTP方法去真正处理对应的请求。 etcdmux, e.Server) // 只提供基本的查询功能 etcdmux, e.Server) h = mux } // 根据配置为grpc服务端生成不同的选项 gopts := []grpc.ServerOption{} if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) { gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: e.cfg.GRPCKeepAliveMinTime, PermitWithoutStream: false, }))} if e.cfg.GRPCKeepAliveInterval > time.Duration(0) && e.cfg.GRPCKeepAliveTimeout > time.Duration(0) { gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{ Time: e.cfg.GRPCKeepAliveInterval, Timeout: e.cfg.GRPCKeepAliveTimeout,}))} // start client servers in each goroutine 为每个serveCtx启动一个客户服务端协程 for _, sctx := range e.sctxs { go func(s *serveCtx) { e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) }(sctx) } return nil}

server/embed/server.go serve函数主要的功能就是启动客户服务端,serveCtx.serve方法在开始处就会阻塞监听该通道是否关闭,从而决定是否继续对外提供服务。

func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler errHandler func(error), gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(ioutil.Discard, "etcd0) <-s.ReadyNotify() // 阻塞监听EtcdServer.readych通道,在EtcdServer的Start函数中新建的协程会调用publish函数关闭EtcdServer.readych通道 sctx.lg.Info("ready to serve client requests") m := cmux.New(sctx.l) // soheilhy/cmux 是一个不错的选择用于同一个端口进行不同的操作还是很有用的,比如一个端口同时提供ssh,服务 v3c := v3client.New(s) servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) var gs *grpc.Server defer func() { if err != nil && gs != nil { gs.Stop() } }() if sctx.insecure { gs = v3rpc.Server(s, nil, nil, gopts...) // 在其中会完成GRPC服务的注册 v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { sctx.serviceRegister(gs) } grpcl := m.Match(cmux.HTTP2()) go func() { errHandler(gs.Serve(grpcl)) }() // 在单独后台的goroutine中启动GRPC服务 var gwmux *gw.ServeMux if s.Cfg.EnableGRPCGateway { gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()}) if err != nil { return err } } := sctx.createMux(gwmux, handler) srv:= & Handler: createAccessController(sctx.lg, s, ErrorLog: logger, // do not log user error } := m.Match(cmux.HTTP1()) go func() { errHandler(srv}() // 在单独的后台goroutine中启动一个V2请求 sctx.serversC <- &servers{grpc: gs, srv // 将注册之后的grpc.Server和 sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", zap.String("address", sctx.l.Addr().String()), ) } if sctx.secure { // "or "unixs"设置为true tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } gs = v3rpc.Server(s, tlscfg, nil, gopts...) // 在其中会完成GRPC服务的注册 v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) if sctx.serviceRegister != nil { sctx.serviceRegister(gs) } handler = grpcHandlerFunc(gs, handler) var gwmux *gw.ServeMux if s.Cfg.EnableGRPCGateway { dtls := tlscfg.Clone() // trust local server dtls.InsecureSkipVerify = true bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls}) opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())} gwmux, err = sctx.registerGateway(opts) if err != nil { return err } } var tlsl net.Listener tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) if err != nil { return err } // TODO: add debug flag; enable logging when debug flag is set := sctx.createMux(gwmux, handler) srv := & Handler: createAccessController(sctx.lg, s, TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, srv} sctx.lg.Info("serving client traffic securely", zap.String("address", sctx.l.Addr().String()), ) } close(sctx.serversC) // 关闭sctx.serversC通道 return m.Serve() }

上面函数中启动的协程外层的函数就是errHandler,用于监听Etcd的stopc通道,以及向Etcd的errc通道中写入协程执行函数的错误error。

func (e *Etcd) errHandler(err error) { select { case <-e.stopc: return default: } select { case <-e.stopc: case e.errc <- err: }}

关闭服务端

Etcd结构体的Close函数优雅地关闭所有服务器/侦听器,客户端请求将因请求超时而终止,超时后,强制残留请求立即关闭。如下所示的第388行的代码,从serveCtx的serversC通道里读取Server结构体指针,调用stopServers函数关闭server,调用cancel回调函数,遍历Clients中存放的Listener实例。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:从iRedMail 创建web服务学习Nginx
下一篇:Scrapy学习笔记-Anaconda下安装
相关文章

 发表评论

暂时没有评论,来抢沙发吧~