Blame view

components/etcd/etcd.go 2.51 KB
286f6dbe   zhangqijia   etcd 保存服务信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  package etcd
  
  import (
  	"context"
  	"fmt"
  	clientv3 "go.etcd.io/etcd/client/v3"
  	"pro2d/conf"
  	"pro2d/utils"
  	"time"
  )
  
  type EtcdClient struct {
  	etcd *clientv3.Client
  }
  
  func NewEtcdClient(conf *conf.Etcd) *EtcdClient {
  	cli, err := clientv3.New(clientv3.Config{
  		Endpoints:   conf.Endpoints,
  		DialTimeout:  time.Duration(conf.DialTimeout) * time.Second,
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
20
  		Logger: utils.Logger,
286f6dbe   zhangqijia   etcd 保存服务信息
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
  	})
  	if err != nil {
  		utils.Sugar.Errorf("etcd init err: %v", err)
  		return nil
  	}
  	return &EtcdClient{
  		etcd: cli,
  	}
  }
  
  func (e *EtcdClient)PutWithPrefix(prefix, key, val string)  {
  	_, err := e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val)
  	if err != nil {
  		utils.Sugar.Errorf("PutWithPrefix err: %v", err)
  		return
  	}
  }
  
  func (e *EtcdClient)PutWithLeasePrefix(prefix, key, val string, ttl int64) error {
  	lease := clientv3.NewLease(e.etcd)
  	leaseResp, err := lease.Grant(context.TODO(), ttl)
  	if err != nil {
  		utils.Sugar.Errorf("PutWithLeasePrefix 设置租约时间失败:%v\n", err)
  		return err
  	}
  
  	_, err = e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val, clientv3.WithLease(leaseResp.ID))
  	if err != nil {
  		utils.Sugar.Errorf("PutWithLeasePrefix err: %v", err)
  		return err
  	}
  
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
53
  	keepRespChan, err := lease.KeepAlive(context.TODO(), leaseResp.ID)
286f6dbe   zhangqijia   etcd 保存服务信息
54
55
56
57
  	if err != nil {
  		utils.Sugar.Errorf("keepalive err: %v", err)
  		return err
  	}
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  	go func() {
  		for {
  			select {
  			case _ = <-keepRespChan:
  				if keepRespChan == nil {
  					fmt.Println("租约已经失效")
  					goto END
  				} else { //每秒会续租一次,所以就会受到一次应答
  					//fmt.Println("收到自动续租应答:", keepResp.ID)
  				}
  			}
  		}
  		END:
  	}()
286f6dbe   zhangqijia   etcd 保存服务信息
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
  	return nil
  }
  
  func (e *EtcdClient)Get(key string) map[string]string {
  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  	resp, err := e.etcd.Get(ctx, fmt.Sprintf("/%s/", key))
  	cancel()
  	if err != nil {
  		utils.Sugar.Errorf("etcd get key: %s, err: %v", key, err)
  		return nil
  	}
  	m := make(map[string]string)
  	for _, v := range resp.Kvs {
  		m[string(v.Key)] = string(v.Value)
  	}
  	return m
  }
  
  func (e *EtcdClient)GetByPrefix(prefix string) map[string]string {
  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  	resp, err := e.etcd.Get(ctx, fmt.Sprintf("/%s/", prefix), clientv3.WithPrefix())
  	cancel()
  	if err != nil {
  		utils.Sugar.Errorf("etcd get prefix: %s, err: %v", prefix, err)
  		return nil
  	}
  	m := make(map[string]string)
  	for _, v := range resp.Kvs {
  		m[string(v.Key)] = string(v.Value)
  	}
  	return m
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
103
104
105
106
  }
  
  func (e *EtcdClient)Close()  {
  	e.etcd.Close()
286f6dbe   zhangqijia   etcd 保存服务信息
107
  }