Blame view

src/components/etcd/etcd.go 2.46 KB
286f6dbe   zhangqijia   etcd 保存服务信息
1
2
3
4
5
6
7
  package etcd
  
  import (
  	"context"
  	"fmt"
  	clientv3 "go.etcd.io/etcd/client/v3"
  	"pro2d/conf"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
8
  	"pro2d/src/components/logger"
286f6dbe   zhangqijia   etcd 保存服务信息
9
10
11
12
13
14
15
16
17
18
19
  	"time"
  )
  
  type EtcdClient struct {
  	etcd *clientv3.Client
  }
  
  func NewEtcdClient(conf *conf.Etcd) *EtcdClient {
  	cli, err := clientv3.New(clientv3.Config{
  		Endpoints:   conf.Endpoints,
  		DialTimeout:  time.Duration(conf.DialTimeout) * time.Second,
286f6dbe   zhangqijia   etcd 保存服务信息
20
21
  	})
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
22
  		logger.Error("etcd init err: %v", err)
286f6dbe   zhangqijia   etcd 保存服务信息
23
24
25
26
27
28
29
30
31
32
  		return nil
  	}
  	return &EtcdClient{
  		etcd: cli,
  	}
  }
  
  func (e *EtcdClient)PutWithPrefix(prefix, key, val string)  {
  	_, err := e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val)
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
33
  		logger.Error("PutWithPrefix err: %v", err)
286f6dbe   zhangqijia   etcd 保存服务信息
34
35
36
37
38
39
40
41
  		return
  	}
  }
  
  func (e *EtcdClient)PutWithLeasePrefix(prefix, key, val string, ttl int64) error {
  	lease := clientv3.NewLease(e.etcd)
  	leaseResp, err := lease.Grant(context.TODO(), ttl)
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
42
  		logger.Error("PutWithLeasePrefix 设置租约时间失败:%v\n", err)
286f6dbe   zhangqijia   etcd 保存服务信息
43
44
45
46
47
  		return err
  	}
  
  	_, err = e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val, clientv3.WithLease(leaseResp.ID))
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
48
  		logger.Error("PutWithLeasePrefix err: %v", err)
286f6dbe   zhangqijia   etcd 保存服务信息
49
50
51
  		return err
  	}
  
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
52
  	keepRespChan, err := lease.KeepAlive(context.TODO(), leaseResp.ID)
286f6dbe   zhangqijia   etcd 保存服务信息
53
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
54
  		logger.Error("keepalive err: %v", err)
286f6dbe   zhangqijia   etcd 保存服务信息
55
56
  		return err
  	}
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
57
58
59
60
61
62
63
64
65
66
67
68
69
70
  	go func() {
  		for {
  			select {
  			case _ = <-keepRespChan:
  				if keepRespChan == nil {
  					fmt.Println("租约已经失效")
  					goto END
  				} else { //每秒会续租一次,所以就会受到一次应答
  					//fmt.Println("收到自动续租应答:", keepResp.ID)
  				}
  			}
  		}
  		END:
  	}()
286f6dbe   zhangqijia   etcd 保存服务信息
71
72
73
74
75
76
77
78
  	return nil
  }
  
  func (e *EtcdClient)Get(key string) map[string]string {
  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  	resp, err := e.etcd.Get(ctx, fmt.Sprintf("/%s/", key))
  	cancel()
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
79
  		logger.Error("etcd get key: %s, err: %v", key, err)
286f6dbe   zhangqijia   etcd 保存服务信息
80
81
82
83
84
85
86
87
88
89
90
91
92
93
  		return nil
  	}
  	m := make(map[string]string)
  	for _, v := range resp.Kvs {
  		m[string(v.Key)] = string(v.Value)
  	}
  	return m
  }
  
  func (e *EtcdClient)GetByPrefix(prefix string) map[string]string {
  	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  	resp, err := e.etcd.Get(ctx, fmt.Sprintf("/%s/", prefix), clientv3.WithPrefix())
  	cancel()
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
94
  		logger.Error("etcd get prefix: %s, err: %v", prefix, err)
286f6dbe   zhangqijia   etcd 保存服务信息
95
96
97
98
99
100
101
  		return nil
  	}
  	m := make(map[string]string)
  	for _, v := range resp.Kvs {
  		m[string(v.Key)] = string(v.Value)
  	}
  	return m
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
102
103
104
105
  }
  
  func (e *EtcdClient)Close()  {
  	e.etcd.Close()
286f6dbe   zhangqijia   etcd 保存服务信息
106
  }