jinzh notes
jinzh notes

Go操作etcd v3

Go操作etcd v3
内容纲要

采用etc官方提供的go包来对etcd进行操作

注意
本文采用的是etcd go v3.5文档

v3文档

etcd特性

包安装

go get go.etcd.io/etcd/client/v3
import clientv3 "go.etcd.io/etcd/client/v3"

etcd官方提供的包分为v2和v3,v2采用的是restful api,v3采用的是grpc方案,因此v2和v3数据存储访问并不通用,同时v3版本增加了持久化存储功能

Etcd v2 和 v3 本质上是共享同一套 raft 协议代码的两个独立的应用,接口不一样,存储不一样,数据互相隔离。也就是说如果从 Etcd v2 升级到 Etcd v3,原来 v2 的数据还是只能用 v2 的接口访问,v3 的接口创建的数据也只能访问通过 v3 的接口访问。所以我们按照 v2 和 v3 分别分析。

值得注意的是,etcd的go包采用的是grpc-go这个包


连接etcd

以下的其他操作前提均为已连接etcd

cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"127.0.0.1:2379"},
    DialTimeout: 5 * time.Second,
})
if err != nil {
    // handle error!
    fmt.Printf("connect to etcd failed, err:%v\n", err)
    return
}
fmt.Println("connect to etcd success")
defer cli.Close()

这里采用grpc-go连接etcd,返回的err错误类型,不能通过它是否为nil来判断连接是否建立成功,因为如果etcd服务期并没有上线,返回的err也是nil类型

连接使用完成后一定要进行close操作,不然会造成goroutine泄露,如果想要指定请求超时结束的话,可以使用context的WithTimeout函数

错误处理

etcd可能返回两种错误,一种是上下文错误(即context包的错误),另一种是grpc错误,也就是请求错误

以下是一种简短的操作方式:

resp, err := cli.Put(ctx, "", "")
if err != nil {
    switch err {
    case context.Canceled:
        log.Fatalf("ctx is canceled by another routine: %v", err)
    case context.DeadlineExceeded:
        log.Fatalf("ctx is attached with a deadline is exceeded: %v", err)
    case rpctypes.ErrEmptyKey:
        log.Fatalf("client-side error: %v", err)
    default:
        log.Fatalf("bad cluster endpoints, which are not etcd servers: %v", err)
    }
}

在官方文档中,额外提出了一种处理错误的方法:

_, err := kvc.Get(ctx, "a")
if err != nil {
    // with etcd clientv3 >= v3.4
    if clientv3.IsConnCanceled(err) {
        // gRPC client connection is closed
    }
}

命名空间 namespace

包 namespace 是一个 clientv3 包装器,它将所有键转换为以给定前缀开头。

官方文档

其实我更习惯将这个功能称之为拦截器,因为它实际就是将你所有发出的请求添加前缀,实际作用和axios的拦截器机制是一样的,不过作用范围明显比axios的拦截器小了很多

//这里操作前提是你已经进行过最初的etcd连接
//保留未封装/覆盖的client 接口
unprefixedKV := cli.KV
//对接口进行封装/覆盖
cli.KV = namespace.NewKV(cli.KV, "my-prefix/")
cli.Watcher = namespace.NewWatcher(cli.Watcher, "my-prefix/")
cli.Lease = namespace.NewLease(cli.Lease, "my-prefix/")
//封装完成后的接口和封装之前是一样的,只是多了自动添加前缀prefix,我们也就不需要自己添加前缀了
cli.Put(context.TODO(), "abc", "123")
resp, _ := unprefixedKV.Get(context.TODO(), "my-prefix/abc")
fmt.Printf("%s\n", resp.Kvs[0].Value)
// Output: 123
unprefixedKV.Put(context.TODO(), "my-prefix/abc", "456")
resp, _ = cli.Get(context.TODO(), "abc")
fmt.Printf("%s\n", resp.Kvs[0].Value)
// Output: 456

请求大小限制

客户端可以设定请求的开销数据大小,通过clientv3.Config.MaxCallSendMsgSizeMaxCallRecvMsgSize设定,单位为字节,

前缀/范围操作

在相应的函数中使用clientv3.WithPrefix()clientv3.WithRange()两个函数,其中范围函数需要参数end

使用示例:

rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())

PUT操作

// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "q1mi", "dsb")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
    return
}

GET操作

// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "q1mi")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
    return
}
for _, ev := range resp.Kvs {
    fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}

Watch操作

// watch key:q1mi change
rch := cli.Watch(context.Background(), "q1mi") // <-chan WatchResponse
for wresp := range rch {
    for _, ev := range wresp.Events {
        fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        //这里获取到的是操作类型,操作键,操作值
    }
}

Lease续约

// 创建一个5秒的租约
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
    log.Fatal(err)
}

// 5秒钟之后, /nazha/ 这个key就会被移除
_, err = cli.Put(context.TODO(), "/nazha/", "dsb", clientv3.WithLease(resp.ID))
if err != nil {
    log.Fatal(err)
}

keepAlive

// 键 key 将会一直有效
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
    log.Fatal(kaerr)
}
for {
    ka := <-ch
    fmt.Println("ttl:", ka.TTL)
}

实现简单的分布式锁

etcd官方提供了concurrency包来实现分布式锁,地址:godoc

思路

简单使用就是首先使用创建好的etcd连接创建新的会话,然后通过会话对路径加锁,然后等待获取锁的会话会一直处于阻塞状态

主要原理还是通过etcd的craft算法的强一致性实现

代码

// 创建两个单独的会话用来演示锁竞争
s1, err := concurrency.NewSession(cli)
if err != nil {
    log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli)
if err != nil {
    log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")

// 会话s1获取锁
if err := m1.Lock(context.TODO()); err != nil {
    log.Fatal(err)
}
fmt.Println("acquired lock for s1")

m2Locked := make(chan struct{})
//这里单独起一个m2Locked是为了阻塞主进程,让其等待协程执行完任务
go func() {
    defer close(m2Locked)
    // 等待直到会话s1释放了/my-lock/的锁
    if err := m2.Lock(context.TODO()); err != nil {
        log.Fatal(err)
    }
}()

if err := m1.Unlock(context.TODO()); err != nil {
    log.Fatal(err)
}
fmt.Println("released lock for s1")

<-m2Locked
fmt.Println("acquired lock for s2")

服务注册与发现

思路

etcd作为注册中心,然后本身支持watch和put以及存活时间(lease),因此服务发现作为一个watcher存在,而服务注册者作为一个putter存在

  1. putter向etcd(注册中心)不停地put有效的数据,并且为了保持服务注册者灵活的上下线,可以让注册者维护一个租约,定时进行租约的续订
  2. watcher则只需要进行watch操作即可

服务注册代码

package main

import (
    "context"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
    cli     *clientv3.Client //etcd client
    leaseID clientv3.LeaseID //租约ID
    //租约keepalieve相应chan
    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
    key           string //key
    val           string //value
}

//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    ser := &ServiceRegister{
        cli: cli,
        key: key,
        val: val,
    }

    //申请租约设置时间keepalive
    if err := ser.putKeyWithLease(lease); err != nil {
        return nil, err
    }

    return ser, nil
}

//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    //设置租约时间
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    //注册服务并绑定租约
    _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    //设置续租 定期发送需求请求
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

    if err != nil {
        return err
    }
    s.leaseID = resp.ID
    log.Println(s.leaseID)
    s.keepAliveChan = leaseRespChan
    log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
    return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
    for leaseKeepResp := range s.keepAliveChan {
        log.Println("续约成功", leaseKeepResp)
    }
    log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
    //撤销租约
    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
        return err
    }
    log.Println("撤销租约")
    return s.cli.Close()
}

func main() {
    var endpoints = []string{"localhost:2379"}
    ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
    if err != nil {
        log.Fatalln(err)
    }
    //监听续租相应chan
    go ser.ListenLeaseRespChan()
    select {
    // case <-time.After(20 * time.Second):
    //  ser.Close()
    }
}

服务发现代码

package main

import (
    "context"
    "log"
    "sync"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
    cli        *clientv3.Client  //etcd client
    serverList map[string]string //服务列表
    lock       sync.Mutex
}

//NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    return &ServiceDiscovery{
        cli:        cli,
        serverList: make(map[string]string),
    }
}

//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
    //根据前缀获取现有的key
    resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
    if err != nil {
        return err
    }

    for _, ev := range resp.Kvs {
        s.SetServiceList(string(ev.Key), string(ev.Value))
    }

    //监视前缀,修改变更的server
    go s.watcher(prefix)
    return nil
}

//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
    rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
    log.Printf("watching prefix:%s now...", prefix)
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case clientv3.EventTypePut: //修改或者新增
                s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
            case clientv3.EventTypeDelete: //删除
                s.DelServiceList(string(ev.Kv.Key))
            }
        }
    }
}

//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
    s.lock.Lock()
    defer s.lock.Unlock()
    s.serverList[key] = string(val)
    log.Println("put key :", key, "val:", val)
}

//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
    s.lock.Lock()
    defer s.lock.Unlock()
    delete(s.serverList, key)
    log.Println("del key:", key)
}

//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
    s.lock.Lock()
    defer s.lock.Unlock()
    addrs := make([]string, 0)

    for _, v := range s.serverList {
        addrs = append(addrs, v)
    }
    return addrs
}

//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
    return s.cli.Close()
}

func main() {
    var endpoints = []string{"localhost:2379"}
    ser := NewServiceDiscovery(endpoints)
    defer ser.Close()
    ser.WatchService("/web/")
    ser.WatchService("/gRPC/")
    for {
        select {
        case <-time.Tick(10 * time.Second):
            log.Println(ser.GetServices())
        }
    }
}

影翼

文章作者

发表回复

textsms
account_circle
email

jinzh notes

Go操作etcd v3
etcd(读作 et-see-dee)是一种开源的分布式统一键值存储,用于分布式系统或计算机集群的共享配置、服务发现和的调度协调。etcd 有助于促进更加安全的自动更新,协调向主机调度的工作,并帮助设置容器的覆盖网络。
扫描二维码继续阅读
2022-03-29