etcd.go 3.31 KB
package etcd

import (
	"context"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"pro2d/common"
	"pro2d/common/logger"
	"time"
)

// EtcdClient bundles an etcd v3 client together with a lease handle that is
// created from that client (see NewEtcdClient).
type EtcdClient struct {
	// etcd is the underlying client used for Put/Get calls and closed by Close.
	etcd  *clientv3.Client
	// lease is used to grant leases and to keep them alive.
	lease clientv3.Lease
}

// GlobalEtcd is the package-wide client. It is assigned by NewEtcdClient and
// is nil until that function has succeeded.
var GlobalEtcd *EtcdClient

// NewEtcdClient connects to etcd using the endpoints and dial timeout (in
// seconds) from conf and stores the resulting client in GlobalEtcd.
//
// It returns an error, leaving GlobalEtcd untouched, when conf is nil or the
// client cannot be created.
func NewEtcdClient(conf *common.Etcd) error {
	// Guard against a missing configuration instead of panicking on the
	// field accesses below.
	if conf == nil {
		err := fmt.Errorf("etcd init: nil config")
		logger.Error("etcd init err: %v", err)
		return err
	}

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   conf.Endpoints,
		DialTimeout: time.Duration(conf.DialTimeout) * time.Second,
	})
	if err != nil {
		logger.Error("etcd init err: %v", err)
		return err
	}

	GlobalEtcd = &EtcdClient{
		etcd:  cli,
		lease: clientv3.NewLease(cli),
	}
	return nil
}

// GEtcdClient returns the package-wide client stored in GlobalEtcd. The result
// is nil if GlobalEtcd has not been assigned yet.
func GEtcdClient() *EtcdClient {
	return GlobalEtcd
}

// PutWithPrefix writes val under the key "/<prefix>/<key>/". A failed write is
// logged and otherwise ignored; nothing is reported to the caller.
func (e *EtcdClient) PutWithPrefix(prefix, key, val string) {
	fullKey := fmt.Sprintf("/%s/%s/", prefix, key)
	if _, putErr := e.etcd.Put(context.TODO(), fullKey, val); putErr != nil {
		logger.Error("PutWithPrefix err: %v", putErr)
	}
}

// leaseKey grants a new lease with the given ttl (seconds) and writes val
// under "/<prefix>/<key>/" attached to that lease.
//
// It returns the ID of the new lease on success. If the lease is granted but
// the write fails, the lease is revoked (best effort) so that it does not
// linger on the server until its TTL runs out, and (0, err) is returned.
func (e *EtcdClient) leaseKey(prefix, key, val string, ttl int64) (clientv3.LeaseID, error) {
	leaseResp, err := e.lease.Grant(context.TODO(), ttl)
	if err != nil {
		logger.Error("PutWithLeasePrefix leaseKey fail:%v\n", err)
		return 0, err
	}

	_, err = e.etcd.Put(context.TODO(), fmt.Sprintf("/%s/%s/", prefix, key), val, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		logger.Error("PutWithLeasePrefix err: %v", err)

		// The lease is unused now; release it. Bound the call so an
		// unreachable server cannot block the caller here.
		revokeCtx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		if _, revokeErr := e.lease.Revoke(revokeCtx, leaseResp.ID); revokeErr != nil {
			logger.Error("PutWithLeasePrefix revoke lease err: %v", revokeErr)
		}
		return 0, err
	}
	logger.Debug("lease key successful。。。")
	return leaseResp.ID, nil
}

// PutWithLeasePrefix registers val under "/<prefix>/<key>/" with a lease of
// ttl seconds and starts a background goroutine that keeps the lease alive,
// re-registering the key with a fresh lease whenever the keep-alive stream
// ends.
//
// The initial registration is synchronous: if it fails, the error is returned
// (wrapped) and no goroutine is started. The background goroutine stops once
// the underlying etcd client has been closed.
func (e *EtcdClient) PutWithLeasePrefix(prefix, key, val string, ttl int64) error {
	logger.Debug("etcd register...")
	leaseID, err := e.leaseKey(prefix, key, val, ttl)
	if err != nil {
		// leaseKey has already logged the failure; report it to the caller
		// instead of panicking.
		return fmt.Errorf("etcd register /%s/%s/: %w", prefix, key, err)
	}

	go e.keepLeaseAlive(prefix, key, val, ttl, leaseID)

	return nil
}

// keepLeaseAlive keeps leaseID alive and re-registers the key with a new lease
// when the keep-alive stream ends. It returns when the client's context is
// cancelled, which happens when the client is closed.
func (e *EtcdClient) keepLeaseAlive(prefix, key, val string, ttl int64, leaseID clientv3.LeaseID) {
	// Delay between attempts after a failure, so an unreachable server does
	// not turn this loop into a busy spin.
	const retryInterval = time.Second

	ctx := e.etcd.Ctx()

	// pause waits for retryInterval or until the client is closed.
	pause := func() {
		timer := time.NewTimer(retryInterval)
		defer timer.Stop()
		select {
		case <-ctx.Done():
		case <-timer.C:
		}
	}

	for ctx.Err() == nil {
		if leaseID == 0 {
			id, err := e.leaseKey(prefix, key, val, ttl)
			if err != nil {
				logger.Error("leaseKey err: %v", err)
				pause()
				continue
			}
			leaseID = id
		}

		keepRespChan, err := e.lease.KeepAlive(ctx, leaseID)
		if err != nil {
			logger.Error("keepalive err: %v", err)
			pause()
			continue
		}

	DRAIN:
		for {
			select {
			case <-ctx.Done():
				return
			case keepResp := <-keepRespChan:
				if keepResp == nil {
					// A closed channel yields nil: the keep-alive stream
					// ended, so register again with a new lease.
					logger.Debug("lease expire")
					leaseID = 0
					break DRAIN
				}
				// A response arrives each time the lease is renewed;
				// nothing needs to be done with it.
			}
		}

		logger.Debug("exit keepRspChan for select")
	}
}

// Get looks up the exact key "/<key>/" with a one-second timeout and returns
// the matching entries as a map from etcd key to value. The map is empty when
// nothing matches. On failure the error is logged and nil is returned.
func (e *EtcdClient) Get(key string) map[string]string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	path := fmt.Sprintf("/%s/", key)
	resp, getErr := e.etcd.Get(ctx, path)
	if getErr != nil {
		logger.Error("etcd get key: %s, err: %v", key, getErr)
		return nil
	}

	result := map[string]string{}
	for _, kv := range resp.Kvs {
		result[string(kv.Key)] = string(kv.Value)
	}
	return result
}

// GetByPrefix fetches every key that starts with "/<prefix>/" using a
// one-second timeout and returns the entries as a map from etcd key to value.
// The map is empty when nothing matches. On failure the error is logged and
// nil is returned.
func (e *EtcdClient) GetByPrefix(prefix string) map[string]string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	root := fmt.Sprintf("/%s/", prefix)
	resp, getErr := e.etcd.Get(ctx, root, clientv3.WithPrefix())
	if getErr != nil {
		logger.Error("etcd get prefix: %s, err: %v", prefix, getErr)
		return nil
	}

	entries := map[string]string{}
	for _, kv := range resp.Kvs {
		entries[string(kv.Key)] = string(kv.Value)
	}
	return entries
}

// Close releases the lease handle and then closes the underlying etcd client.
// Errors from either step are logged; nothing is returned to the caller.
func (e *EtcdClient) Close() {
	// The lease handle was created separately with clientv3.NewLease, so it
	// has to be closed explicitly to release the resources it holds.
	if e.lease != nil {
		if err := e.lease.Close(); err != nil {
			logger.Error("etcd lease close err: %v", err)
		}
	}
	if err := e.etcd.Close(); err != nil {
		logger.Error("etcd close err: %v", err)
	}
}

// PutWithLeasePrefix calls the method of the same name on the package-wide
// client. It returns an error if that client has not been set yet.
func PutWithLeasePrefix(prefix, key, val string, ttl int64) error {
	cli := GEtcdClient()
	if cli == nil {
		// Avoid a nil-pointer panic when NewEtcdClient has not run.
		return fmt.Errorf("etcd client not initialized")
	}
	return cli.PutWithLeasePrefix(prefix, key, val, ttl)
}

// CloseEtcd closes the package-wide client. It does nothing if that client
// has not been set.
func CloseEtcd() {
	if cli := GEtcdClient(); cli != nil {
		cli.Close()
	}
}