go-zero的服务发现源码阅读
后台-插件-广告管理-内容页头部广告(手机) |
服务发现原理与grpc源码解析_wangxiaoangg的博客-CSDN博客
go-zero rpc demo官方文档:rpc编写与调用 | go-zero
目录
一 服务注册
1. 创建rpc服务
2. 启动rpc服务
3. registerEtcd做了什么
4. discov.NewPublisher 服务发布者
二 服务发现
1.定义&注册resolver
2.解析etcd地址&创建链接
3.update方法
一 服务注册
在看rpc服务端服务注册前,可以先看下go-zero的官方的 user rpc服务 demo。
在rpc的配置文件中配置了Etcd信息,以及服务对应的key,如下:user.yaml
- Name: user.rpc
- ListenOn: 127.0.0.1:8080
- Etcd:
- Hosts:
- - $etcdHost
- Key: user.rpc
1. 创建rpc服务
创建rpc服务调用了 zrpc/internal/rpcpubserver.go 中 NewRpcPubServer方法。
该方法返回一个server对象,并将registerEtcd方法注入到该sever。
- // NewRpcPubServer returns a Server.
- func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMiddlewaresConf,
- opts ...ServerOption) (Server, error) {
- registerEtcd := func() error {
- pubListenOn := figureOutListenOn(listenOn)
- var pubOpts []discov.PubOption
- if etcd.HasAccount() {
- pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
- }
- if etcd.HasTLS() {
- pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
- etcd.CACertFile, etcd.InsecureSkipVerify))
- }
- if etcd.HasID() {
- pubOpts = append(pubOpts, discov.WithId(etcd.ID))
- }
- pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
- return pubClient.KeepAlive()
- }
- server := keepAliveServer{
- registerEtcd: registerEtcd,
- Server: NewRpcServer(listenOn, middlewares, opts...),
- }
- return server, nil
- }
2. 启动rpc服务
在启动Server的时候,调用Start方法,在Start方法中会调用registerEtcd进行真正的服务注册。
go-zerozrpc/internal/rpcpubserver.go
- type keepAliveServer struct {
- registerEtcd func() error
- Server
- }
- func (s keepAliveServer) Start(fn RegisterFn) error {
- if err := s.registerEtcd(); err != nil {
- return err
- }
- return s.Server.Start(fn)
- }
3. registerEtcd做了什么
- registerEtcd := func() error {
- //解析服务监听的地址
- pubListenOn := figureOutListenOn(listenOn)
- var pubOpts []discov.PubOption
- //etcd的链接方式
- if etcd.HasAccount() {
- pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
- }
- if etcd.HasTLS() {
- pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
- etcd.CACertFile, etcd.InsecureSkipVerify))
- }
- if etcd.HasID() {
- pubOpts = append(pubOpts, discov.WithId(etcd.ID))
- }
- //新建puslisher
- pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
- //异步etcd 保活
- return pubClient.KeepAlive()
- }
4. discov.NewPublisher 服务发布者
代码路径core/discov/publisher.go
在KeepAlive方法中,
1.首先创建etcd连接,
2.用register方法进行服务注册。
3.register创建租约,租约默认时间为10秒钟
4.最后通过Put方法进行注册。
5.调用 keepAliveAsync 进行租约的续期,保证服务一直是存活的状态,如果服务异常退出了,那么也就无法进行续期,服务发现也就能自动识别到该服务异常下线了。
- // KeepAlive keeps key:value alive.
- func (p *Publisher) KeepAlive() error {
- cli, err := p.doRegister()
- if err != nil {
- return err
- }
- proc.AddWrapUpListener(func() {
- p.Stop()
- })
- return p.keepAliveAsync(cli)
- }
- func (p *Publisher) doRegister() (internal.EtcdClient, error) {
- //链接etcd
- cli, err := internal.GetRegistry().GetConn(p.endpoints)
- if err != nil {
- return nil, err
- }
- p.lease, err = p.register(cli)
- return cli, err
- }
- func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
- //创建租约
- resp, err := client.Grant(client.Ctx(), TimeToLive)
- if err != nil {
- return clientv3.NoLease, err
- }
- lease := resp.ID
- if p.id > 0 {
- p.fullKey = makeEtcdKey(p.key, p.id)
- } else {
- p.fullKey = makeEtcdKey(p.key, int64(lease))
- }
- //put key 注册
- _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
- return lease, err
- }
- //异步续租 保活
- func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
- ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
- if err != nil {
- return err
- }
- threading.GoSafe(func() {
- for {
- select {
- case _, ok := <-ch:
- if !ok {
- p.revoke(cli)
- if err := p.doKeepAlive(); err != nil {
- logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
- }
- return
- }
- case <-p.pauseChan:
- logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
- p.revoke(cli)
- select {
- case <-p.resumeChan:
- if err := p.doKeepAlive(); err != nil {
- logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
- }
- return
- case <-p.quit.Done():
- return
- }
- case <-p.quit.Done():
- p.revoke(cli)
- return
- }
- }
- })
- return nil
- }
二 服务发现
前面的已经介绍了,rpc服务启动时候是如何将服务注册到etcd中的。
在rpc的服务调用方 配置服务提供方的Etcd信息,以及服务对应的key,如下:user.yaml
- Name: search-api
- Host: 0.0.0.0
- Port: 8889
- Auth:
- AccessSecret: $AccessSecret
- AccessExpire: $AccessExpire
- UserRpc:
- Etcd:
- Hosts:
- - $etcdHost
- Key: user.rpc
1.定义&注册resolver
go-zero的服务发现是在客户端实现的。在创建zRPC客户端的时候,通过init方法进行了自定义Resolver的注册。
go-zero/zrpc/internal/client.go
- func init() {
- resolver.Register()
- }
zrpc/resolver/internal/resolver.go
- // RegisterResolver registers the direct and discov schemes to the resolver.
- func RegisterResolver() {
- resolver.Register(&directResolverBuilder)
- resolver.Register(&discovResolverBuilder)
- resolver.Register(&etcdResolverBuilder)
- resolver.Register(&k8sResolverBuilder)
- }
gozero注册了四个revlover builder 这里我们只看etcd reslover。
2.解析etcd地址&创建链接
首先从target中解析出etcd的地址,和服务对应的key。然后创建etcd连接,接着执行update方法,在update方法中,通过调用cc.UpdateState方法进行服务状态的更新。
zrpc/resolver/internal/discovbuilder.go
- func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
- resolver.Resolver, error) {
- hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
- return r == EndpointSepChar
- })
- sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
- if err != nil {
- return nil, err
- }
- update := func() {
- var addrs []resolver.Address
- for _, val := range subset(sub.Values(), subsetSize) {
- addrs = append(addrs, resolver.Address{
- Addr: val,
- })
- }
- if err := cc.UpdateState(resolver.State{
- Addresses: addrs,
- }); err != nil {
- logx.Error(err)
- }
- }
- sub.AddListener(update)
- update()
- return &nopResolver{cc: cc}, nil
- }
3.update方法
update方法会被添加到事件监听中,当有PUT和DELETE事件触发,都会调用update方法进行服务状态的更新,事件监听是通过etcd的Watch机制实现,代码如下:
- func (c *cluster) watchStream(cli EtcdClient, key string) bool {
- rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
- for {
- select {
- case wresp, ok := <-rch:
- if !ok {
- logx.Error("etcd monitor chan has been closed")
- return false
- }
- if wresp.Canceled {
- logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
- return false
- }
- if wresp.Err() != nil {
- logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
- return false
- }
- c.handleWatchEvents(key, wresp.Events)
- case <-c.done:
- return true
- }
- }
- }
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。
在线投稿:投稿 站长QQ:1888636
后台-插件-广告管理-内容页尾部广告(手机) |