采用etc官方提供的go包来对etcd进行操作
注意
本文采用的是etcd go v3.5文档

包安装
go get go.etcd.io/etcd/client/v3
import clientv3 "go.etcd.io/etcd/client/v3"
etcd官方提供的包分为v2和v3,v2采用的是restful api,v3采用的是grpc方案,因此v2和v3数据存储访问并不通用,同时v3版本增加了持久化存储功能
Etcd v2 和 v3 本质上是共享同一套 raft 协议代码的两个独立的应用,接口不一样,存储不一样,数据互相隔离。也就是说如果从 Etcd v2 升级到 Etcd v3,原来 v2 的数据还是只能用 v2 的接口访问,v3 的接口创建的数据也只能访问通过 v3 的接口访问。所以我们按照 v2 和 v3 分别分析。
值得注意的是,etcd的go包采用的是grpc-go
这个包
连接etcd
以下的其他操作前提均为已连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
这里采用grpc-go
连接etcd,返回的err错误类型,不能通过它是否为nil来判断连接是否建立成功,因为如果etcd服务期并没有上线,返回的err也是nil类型
连接使用完成后一定要进行close操作,不然会造成goroutine泄露,如果想要指定请求超时结束的话,可以使用context的WithTimeout函数
错误处理
etcd可能返回两种错误,一种是上下文错误(即context包的错误),另一种是grpc错误,也就是请求错误
以下是一种简短的操作方式:
resp, err := cli.Put(ctx, "", "")
if err != nil {
switch err {
case context.Canceled:
log.Fatalf("ctx is canceled by another routine: %v", err)
case context.DeadlineExceeded:
log.Fatalf("ctx is attached with a deadline is exceeded: %v", err)
case rpctypes.ErrEmptyKey:
log.Fatalf("client-side error: %v", err)
default:
log.Fatalf("bad cluster endpoints, which are not etcd servers: %v", err)
}
}
在官方文档中,额外提出了一种处理错误的方法:
_, err := kvc.Get(ctx, "a")
if err != nil {
// with etcd clientv3 >= v3.4
if clientv3.IsConnCanceled(err) {
// gRPC client connection is closed
}
}
命名空间 namespace
包 namespace 是一个 clientv3 包装器,它将所有键转换为以给定前缀开头。
其实我更习惯将这个功能称之为拦截器,因为它实际就是将你所有发出的请求添加前缀,实际作用和axios的拦截器机制是一样的,不过作用范围明显比axios的拦截器小了很多
//这里操作前提是你已经进行过最初的etcd连接
//保留未封装/覆盖的client 接口
unprefixedKV := cli.KV
//对接口进行封装/覆盖
cli.KV = namespace.NewKV(cli.KV, "my-prefix/")
cli.Watcher = namespace.NewWatcher(cli.Watcher, "my-prefix/")
cli.Lease = namespace.NewLease(cli.Lease, "my-prefix/")
//封装完成后的接口和封装之前是一样的,只是多了自动添加前缀prefix,我们也就不需要自己添加前缀了
cli.Put(context.TODO(), "abc", "123")
resp, _ := unprefixedKV.Get(context.TODO(), "my-prefix/abc")
fmt.Printf("%s\n", resp.Kvs[0].Value)
// Output: 123
unprefixedKV.Put(context.TODO(), "my-prefix/abc", "456")
resp, _ = cli.Get(context.TODO(), "abc")
fmt.Printf("%s\n", resp.Kvs[0].Value)
// Output: 456
请求大小限制
客户端可以设定请求的开销数据大小,通过clientv3.Config.MaxCallSendMsgSize
和MaxCallRecvMsgSize
设定,单位为字节,
前缀/范围操作
在相应的函数中使用clientv3.WithPrefix()
和clientv3.WithRange()
两个函数,其中范围函数需要参数end
使用示例:
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
PUT操作
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "q1mi", "dsb")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
GET操作
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "q1mi")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
Watch操作
// watch key:q1mi change
rch := cli.Watch(context.Background(), "q1mi") // <-chan WatchResponse
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//这里获取到的是操作类型,操作键,操作值
}
}
Lease续约
// 创建一个5秒的租约
resp, err := cli.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// 5秒钟之后, /nazha/ 这个key就会被移除
_, err = cli.Put(context.TODO(), "/nazha/", "dsb", clientv3.WithLease(resp.ID))
if err != nil {
log.Fatal(err)
}
keepAlive
// 键 key 将会一直有效
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
for {
ka := <-ch
fmt.Println("ttl:", ka.TTL)
}
实现简单的分布式锁
etcd官方提供了concurrency包来实现分布式锁,地址:godoc
思路
简单使用就是首先使用创建好的etcd连接创建新的会话,然后通过会话对路径加锁,然后等待获取锁的会话会一直处于阻塞状态
主要原理还是通过etcd的craft算法的强一致性实现
代码
// 创建两个单独的会话用来演示锁竞争
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")
// 会话s1获取锁
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
m2Locked := make(chan struct{})
//这里单独起一个m2Locked是为了阻塞主进程,让其等待协程执行完任务
go func() {
defer close(m2Locked)
// 等待直到会话s1释放了/my-lock/的锁
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
<-m2Locked
fmt.Println("acquired lock for s2")
服务注册与发现
思路
etcd作为注册中心,然后本身支持watch和put以及存活时间(lease),因此服务发现作为一个watcher存在,而服务注册者作为一个putter存在
- putter向etcd(注册中心)不停地put有效的数据,并且为了保持服务注册者灵活的上下线,可以让注册者维护一个租约,定时进行租约的续订
- watcher则只需要进行watch操作即可
服务注册代码
package main
import (
"context"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //租约ID
//租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}
//申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
if err != nil {
log.Fatalln(err)
}
//监听续租相应chan
go ser.ListenLeaseRespChan()
select {
// case <-time.After(20 * time.Second):
// ser.Close()
}
}
服务发现代码
package main
import (
"context"
"log"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
serverList map[string]string //服务列表
lock sync.Mutex
}
//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}
//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
//监视前缀,修改变更的server
go s.watcher(prefix)
return nil
}
//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case clientv3.EventTypePut: //修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case clientv3.EventTypeDelete: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}
//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}
//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
s.lock.Lock()
defer s.lock.Unlock()
addrs := make([]string, 0)
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
ser.WatchService("/web/")
ser.WatchService("/gRPC/")
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
}
}
}
发表评论