// Package etcd wraps an etcd v3 client with helpers for writing keys,
// registering keys bound to a self-renewing lease, and reading keys back.
package etcd

import (
	"context"
	"errors"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"

	"pro2d/common"
	"pro2d/common/logger"
)

const (
	// readTimeout bounds every Get / GetByPrefix request.
	readTimeout = time.Second

	// leaseRetryInterval is the pause between attempts to re-register a
	// leased key or restart keep-alive after a failure. Without it a
	// persistent etcd outage would turn the keep-alive loop into a busy loop.
	leaseRetryInterval = time.Second
)

// ErrNotInitialized is returned by the package-level helpers when
// NewEtcdClient has not successfully created the global client yet.
var ErrNotInitialized = errors.New("etcd: global client is not initialized")

// EtcdClient bundles an etcd v3 client with the lease handle used to keep
// registered keys alive.
type EtcdClient struct {
	etcd  *clientv3.Client
	lease clientv3.Lease
}

// GlobalEtcd is the process-wide client set by NewEtcdClient.
var GlobalEtcd *EtcdClient

// NewEtcdClient connects to the endpoints listed in conf and stores the
// resulting client in GlobalEtcd. conf.DialTimeout is interpreted as a number
// of seconds. It returns an error if conf is nil or the client cannot be
// created; GlobalEtcd is left untouched in that case.
func NewEtcdClient(conf *common.Etcd) error {
	if conf == nil {
		return errors.New("etcd: nil config")
	}

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   conf.Endpoints,
		DialTimeout: time.Duration(conf.DialTimeout) * time.Second,
	})
	if err != nil {
		logger.Error("etcd init err: %v", err)
		return err
	}

	GlobalEtcd = &EtcdClient{
		etcd:  cli,
		lease: clientv3.NewLease(cli),
	}
	return nil
}

// GEtcdClient returns the global client, or nil if NewEtcdClient has not
// succeeded yet.
func GEtcdClient() *EtcdClient {
	return GlobalEtcd
}

// PutWithPrefix stores val under the key "/<prefix>/<key>/". Failures are
// logged and otherwise ignored; nothing is returned to the caller.
func (e *EtcdClient) PutWithPrefix(prefix, key, val string) {
	_, err := e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val)
	if err != nil {
		logger.Error("PutWithPrefix err: %v", err)
	}
}

// leaseKey grants a new lease with the given ttl (passed straight to
// Lease.Grant) and stores val under "/<prefix>/<key>/" attached to that lease.
// It returns the ID of the new lease. Errors are logged here, so callers
// should not log them a second time.
func (e *EtcdClient) leaseKey(prefix, key, val string, ttl int64) (clientv3.LeaseID, error) {
	leaseResp, err := e.lease.Grant(context.TODO(), ttl)
	if err != nil {
		logger.Error("PutWithLeasePrefix leaseKey fail:%v\n", err)
		return 0, err
	}

	_, err = e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		logger.Error("PutWithLeasePrefix err: %v", err)
		return 0, err
	}

	logger.Debug("lease key successful。。。")
	return leaseResp.ID, nil
}

// PutWithLeasePrefix registers val under "/<prefix>/<key>/" bound to a lease
// with the given ttl, then starts a background goroutine that keeps the lease
// alive and re-registers the key whenever the keep-alive stream ends.
//
// It returns an error when the initial registration fails (in which case no
// goroutine is started). The background goroutine stops once the underlying
// client's context is cancelled, which Close triggers.
func (e *EtcdClient) PutWithLeasePrefix(prefix, key, val string, ttl int64) error {
	logger.Debug("etcd register...")

	leaseID, err := e.leaseKey(prefix, key, val, ttl)
	if err != nil {
		// leaseKey has already logged the failure; report it to the caller
		// instead of panicking so the caller decides how to react.
		return fmt.Errorf("register /%s/%s/: %w", prefix, key, err)
	}

	go e.keepLeaseAlive(prefix, key, val, ttl, leaseID)
	return nil
}

// keepLeaseAlive keeps the lease identified by leaseID alive. When the
// keep-alive stream ends it registers the key again under a fresh lease and
// resumes. It returns when the client's context is done.
func (e *EtcdClient) keepLeaseAlive(prefix, key, val string, ttl int64, leaseID clientv3.LeaseID) {
	// The client's own context is cancelled by Client.Close, which gives this
	// goroutine a well-defined end of life.
	ctx := e.etcd.Ctx()

	for ctx.Err() == nil {
		if leaseID == 0 {
			id, err := e.leaseKey(prefix, key, val, ttl)
			if err != nil {
				// Already logged by leaseKey; back off before retrying.
				if !waitOrDone(ctx, leaseRetryInterval) {
					return
				}
				continue
			}
			leaseID = id
		}

		keepRespChan, err := e.lease.KeepAlive(ctx, leaseID)
		if err != nil {
			logger.Error("keepalive err: %v", err)
			if !waitOrDone(ctx, leaseRetryInterval) {
				return
			}
			continue
		}

		// A response arrives each time the lease is renewed; the responses
		// carry nothing needed here, so they are simply drained. The loop
		// ends when the channel is closed, i.e. keep-alive has stopped.
		for range keepRespChan {
		}

		if ctx.Err() != nil {
			return
		}

		logger.Debug("lease expire")
		leaseID = 0 // force a fresh registration on the next iteration
		logger.Debug("exit keepRspChan for select")
	}
}

// waitOrDone blocks for d or until ctx is done, whichever comes first.
// It reports true if the full duration elapsed and false if ctx ended.
func waitOrDone(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// kvMap copies the key/value pairs of a Get response into a map.
func kvMap(resp *clientv3.GetResponse) map[string]string {
	m := make(map[string]string, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		m[string(kv.Key)] = string(kv.Value)
	}
	return m
}

// Get reads the exact key "/<key>/" with a one-second timeout. It returns the
// matching key/value pairs (an empty map if the key does not exist), or nil
// if the request fails; failures are logged.
func (e *EtcdClient) Get(key string) map[string]string {
	ctx, cancel := context.WithTimeout(context.Background(), readTimeout)
	defer cancel()

	resp, err := e.etcd.Get(ctx, fmt.Sprintf("/%s/", key))
	if err != nil {
		logger.Error("etcd get key: %s, err: %v", key, err)
		return nil
	}
	return kvMap(resp)
}

// GetByPrefix reads every key that starts with "/<prefix>/" with a one-second
// timeout. It returns the matching key/value pairs (an empty map if there are
// none), or nil if the request fails; failures are logged.
func (e *EtcdClient) GetByPrefix(prefix string) map[string]string {
	ctx, cancel := context.WithTimeout(context.Background(), readTimeout)
	defer cancel()

	resp, err := e.etcd.Get(ctx, fmt.Sprintf("/%s/", prefix), clientv3.WithPrefix())
	if err != nil {
		logger.Error("etcd get prefix: %s, err: %v", prefix, err)
		return nil
	}
	return kvMap(resp)
}

// Close releases the lease handle and closes the underlying client. Closing
// the client cancels its context, which stops any keep-alive goroutines
// started by PutWithLeasePrefix. Errors are logged.
func (e *EtcdClient) Close() {
	// The lease handle was created separately with clientv3.NewLease, so it
	// must be closed explicitly; Client.Close does not cover it.
	if e.lease != nil {
		if err := e.lease.Close(); err != nil {
			logger.Error("etcd lease close err: %v", err)
		}
	}
	if err := e.etcd.Close(); err != nil {
		logger.Error("etcd close err: %v", err)
	}
}

// PutWithLeasePrefix calls EtcdClient.PutWithLeasePrefix on the global client.
// It returns ErrNotInitialized if the global client has not been created.
func PutWithLeasePrefix(prefix, key, val string, ttl int64) error {
	cli := GEtcdClient()
	if cli == nil {
		return ErrNotInitialized
	}
	return cli.PutWithLeasePrefix(prefix, key, val, ttl)
}

// CloseEtcd closes the global client. It does nothing if the global client
// has not been created.
func CloseEtcd() {
	if cli := GEtcdClient(); cli != nil {
		cli.Close()
	}
}