您现在的位置是:首页 > 技术教程 正文

go-zero的服务发现源码阅读

admin 阅读: 2024-03-18
后台-插件-广告管理-内容页头部广告(手机)

服务发现原理与grpc源码解析_wangxiaoangg的博客-CSDN博客

 

go-zero rpc demo官方文档:rpc编写与调用 | go-zero

目录

一 服务注册

1. 创建rpc服务

2. 启动rpc服务

3. registerEtcd做了什么

4. discov.NewPublisher 服务发布者

二 服务发现

1.定义&注册resolver

2.解析etcd地址&创建链接

3.update方法


一 服务注册

在看rpc服务端服务注册前,可以先看下go-zero的官方的 user rpc服务 demo。

在rpc的配置文件中配置了Etcd信息,以及服务对应的key,如下:user.yaml

  1. Name: user.rpc
  2. ListenOn: 127.0.0.1:8080
  3. Etcd:
  4. Hosts:
  5. - $etcdHost
  6. Key: user.rpc


1. 创建rpc服务


创建rpc服务调用了 zrpc/internal/rpcpubserver.go 中 NewRpcPubServer方法。

该方法返回一个server对象,并将registerEtcd方法注入到该sever。

  1. // NewRpcPubServer returns a Server.
  2. func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, middlewares ServerMiddlewaresConf,
  3. opts ...ServerOption) (Server, error) {
  4. registerEtcd := func() error {
  5. pubListenOn := figureOutListenOn(listenOn)
  6. var pubOpts []discov.PubOption
  7. if etcd.HasAccount() {
  8. pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
  9. }
  10. if etcd.HasTLS() {
  11. pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
  12. etcd.CACertFile, etcd.InsecureSkipVerify))
  13. }
  14. if etcd.HasID() {
  15. pubOpts = append(pubOpts, discov.WithId(etcd.ID))
  16. }
  17. pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
  18. return pubClient.KeepAlive()
  19. }
  20. server := keepAliveServer{
  21. registerEtcd: registerEtcd,
  22. Server: NewRpcServer(listenOn, middlewares, opts...),
  23. }
  24. return server, nil
  25. }

2. 启动rpc服务

在启动Server的时候,调用Start方法,在Start方法中会调用registerEtcd进行真正的服务注册。
go-zerozrpc/internal/rpcpubserver.go

  1. type keepAliveServer struct {
  2. registerEtcd func() error
  3. Server
  4. }
  5. func (s keepAliveServer) Start(fn RegisterFn) error {
  6. if err := s.registerEtcd(); err != nil {
  7. return err
  8. }
  9. return s.Server.Start(fn)
  10. }

3. registerEtcd做了什么

  1. registerEtcd := func() error {
  2. //解析服务监听的地址
  3. pubListenOn := figureOutListenOn(listenOn)
  4. var pubOpts []discov.PubOption
  5. //etcd的链接方式
  6. if etcd.HasAccount() {
  7. pubOpts = append(pubOpts, discov.WithPubEtcdAccount(etcd.User, etcd.Pass))
  8. }
  9. if etcd.HasTLS() {
  10. pubOpts = append(pubOpts, discov.WithPubEtcdTLS(etcd.CertFile, etcd.CertKeyFile,
  11. etcd.CACertFile, etcd.InsecureSkipVerify))
  12. }
  13. if etcd.HasID() {
  14. pubOpts = append(pubOpts, discov.WithId(etcd.ID))
  15. }
  16. //新建puslisher
  17. pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
  18. //异步etcd 保活
  19. return pubClient.KeepAlive()
  20. }

4. discov.NewPublisher 服务发布者

代码路径core/discov/publisher.go

在KeepAlive方法中,
1.首先创建etcd连接,
2.用register方法进行服务注册。
3.register创建租约,租约默认时间为10秒钟
4.最后通过Put方法进行注册。
5.调用 keepAliveAsync 进行租约的续期,保证服务一直是存活的状态,如果服务异常退出了,那么也就无法进行续期,服务发现也就能自动识别到该服务异常下线了。

  1. // KeepAlive keeps key:value alive.
  2. func (p *Publisher) KeepAlive() error {
  3. cli, err := p.doRegister()
  4. if err != nil {
  5. return err
  6. }
  7. proc.AddWrapUpListener(func() {
  8. p.Stop()
  9. })
  10. return p.keepAliveAsync(cli)
  11. }
  12. func (p *Publisher) doRegister() (internal.EtcdClient, error) {
  13. //链接etcd
  14. cli, err := internal.GetRegistry().GetConn(p.endpoints)
  15. if err != nil {
  16. return nil, err
  17. }
  18. p.lease, err = p.register(cli)
  19. return cli, err
  20. }
  21. func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
  22. //创建租约
  23. resp, err := client.Grant(client.Ctx(), TimeToLive)
  24. if err != nil {
  25. return clientv3.NoLease, err
  26. }
  27. lease := resp.ID
  28. if p.id > 0 {
  29. p.fullKey = makeEtcdKey(p.key, p.id)
  30. } else {
  31. p.fullKey = makeEtcdKey(p.key, int64(lease))
  32. }
  33. //put key 注册
  34. _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
  35. return lease, err
  36. }
  37. //异步续租 保活
  38. func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
  39. ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
  40. if err != nil {
  41. return err
  42. }
  43. threading.GoSafe(func() {
  44. for {
  45. select {
  46. case _, ok := <-ch:
  47. if !ok {
  48. p.revoke(cli)
  49. if err := p.doKeepAlive(); err != nil {
  50. logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
  51. }
  52. return
  53. }
  54. case <-p.pauseChan:
  55. logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
  56. p.revoke(cli)
  57. select {
  58. case <-p.resumeChan:
  59. if err := p.doKeepAlive(); err != nil {
  60. logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
  61. }
  62. return
  63. case <-p.quit.Done():
  64. return
  65. }
  66. case <-p.quit.Done():
  67. p.revoke(cli)
  68. return
  69. }
  70. }
  71. })
  72. return nil
  73. }

二 服务发现

前面的已经介绍了,rpc服务启动时候是如何将服务注册到etcd中的。

在rpc的服务调用方 配置服务提供方的Etcd信息,以及服务对应的key,如下:user.yaml

  1. Name: search-api
  2. Host: 0.0.0.0
  3. Port: 8889
  4. Auth:
  5. AccessSecret: $AccessSecret
  6. AccessExpire: $AccessExpire
  7. UserRpc:
  8. Etcd:
  9. Hosts:
  10. - $etcdHost
  11. Key: user.rpc

1.定义&注册resolver

go-zero的服务发现是在客户端实现的。在创建zRPC客户端的时候,通过init方法进行了自定义Resolver的注册。

go-zero/zrpc/internal/client.go

  1. func init() {
  2. resolver.Register()
  3. }

 zrpc/resolver/internal/resolver.go

  1. // RegisterResolver registers the direct and discov schemes to the resolver.
  2. func RegisterResolver() {
  3. resolver.Register(&directResolverBuilder)
  4. resolver.Register(&discovResolverBuilder)
  5. resolver.Register(&etcdResolverBuilder)
  6. resolver.Register(&k8sResolverBuilder)
  7. }

gozero注册了四个revlover builder 这里我们只看etcd reslover。

2.解析etcd地址&创建链接

首先从target中解析出etcd的地址,和服务对应的key。然后创建etcd连接,接着执行update方法,在update方法中,通过调用cc.UpdateState方法进行服务状态的更新。 

zrpc/resolver/internal/discovbuilder.go

  1. func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
  2. resolver.Resolver, error) {
  3. hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {
  4. return r == EndpointSepChar
  5. })
  6. sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))
  7. if err != nil {
  8. return nil, err
  9. }
  10. update := func() {
  11. var addrs []resolver.Address
  12. for _, val := range subset(sub.Values(), subsetSize) {
  13. addrs = append(addrs, resolver.Address{
  14. Addr: val,
  15. })
  16. }
  17. if err := cc.UpdateState(resolver.State{
  18. Addresses: addrs,
  19. }); err != nil {
  20. logx.Error(err)
  21. }
  22. }
  23. sub.AddListener(update)
  24. update()
  25. return &nopResolver{cc: cc}, nil
  26. }

3.update方法

update方法会被添加到事件监听中,当有PUT和DELETE事件触发,都会调用update方法进行服务状态的更新,事件监听是通过etcd的Watch机制实现,代码如下:

  1. func (c *cluster) watchStream(cli EtcdClient, key string) bool {
  2. rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
  3. for {
  4. select {
  5. case wresp, ok := <-rch:
  6. if !ok {
  7. logx.Error("etcd monitor chan has been closed")
  8. return false
  9. }
  10. if wresp.Canceled {
  11. logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err())
  12. return false
  13. }
  14. if wresp.Err() != nil {
  15. logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
  16. return false
  17. }
  18. c.handleWatchEvents(key, wresp.Events)
  19. case <-c.done:
  20. return true
  21. }
  22. }
  23. }

标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索
排行榜