agent.go 4.68 KB
package service

import (
	"fmt"
	"github.com/golang/protobuf/proto"
	"math"
	"pro2d/common"
	"pro2d/common/components"
	"pro2d/common/logger"
	"pro2d/models"
	"pro2d/pb"
	"sync"
	"sync/atomic"
)

type Agent struct {
	components.IConnection
	Server components.IServer
	components.IAgent

	Role               *models.RoleModel
	nextCheckTime      int64 //下一次检查的时间
	lastHeartCheckTime int64
	heartTimeoutCount  int //超时次数
}

var agentPool = sync.Pool{New: func() interface{} { return new(Agent) }}

func NewAgent(s components.IServer) *Agent {
	a := agentPool.Get().(*Agent)
	a.Server = s

	a.nextCheckTime = 0
	a.lastHeartCheckTime = common.Timex()
	a.heartTimeoutCount = 0
	return a
}

func (c *Agent) SetSchema(schema components.ISchema) {
	c.Role = schema.(*models.RoleModel)
	c.Role.SetConn(c)

	c.Server.GetConnManage().AddRID(c.Role.Data.Id, c.IConnection.GetID())
	c.Server.GetConnManage().AddUID(c.Role.Data.Uid, c.IConnection.GetID())
}

func (c *Agent) GetSchema() components.ISchema {
	return c.Role
}

func (c *Agent) SetServer(server components.IServer) {
	c.Server = server
}

func (c *Agent) GetServer() components.IServer {
	return c.Server
}

func (c *Agent) OnConnection(conn components.IConnection) {
	c.IConnection = conn
}

/*OnLoginQuery 登录请求
2 角色不存在
*/
func (c *Agent) OnLoginQuery(msg components.IMessage) (int32, proto.Message) {
	//logger.Debug("11111111cmd: %v, msg: %s", msg.GetHeader().GetMsgID(), msg.GetData())
	req := pb.LoginReq{}
	if err := proto.Unmarshal(msg.GetData(), &req); err != nil {
		logger.Error("loginRpc err: %v", err)
		return 1, nil
	}

	uid := req.Token

	// 判断是否已经登录 ,挤掉,断开连接
	conn := c.Server.GetConnManage().GetConnByUID(uid)
	if conn != nil {
		logger.Debug("挤掉。。。。。。。。")
		conn.SendSuccess(uint32(pb.ProtoCode_DisConnectNty), nil)
		conn.Stop()
	}

	role := models.RoleExistByUid(uid)
	if role == nil {
		return 2, nil
	}
	role.SetProperty("Device", req.Device)
	protoMsg := &pb.RoleRsp{
		Role:       role.Data,
		Hero:       role.GetAllHero(),
		Team:       role.GetAllTeam(),
		Equipments: role.GetEquipments(),
	}
	//登录成功,存储agent role
	c.SetSchema(role)
	return 0, protoMsg
}

func (c *Agent) SendMsg(errCode int32, cmd uint32, msg interface{}) error {
	if msg == nil || errCode != 0 {
		return c.Send(errCode, cmd, nil)
	}
	rsp, err := proto.Marshal(msg.(proto.Message))
	if err != nil {
		return c.Send(-100, cmd, nil)
	}
	return c.Send(errCode, cmd, rsp)
}

func (c *Agent) OnMessage(msg components.IMessage) error {
	atomic.StoreInt64(&c.lastHeartCheckTime, common.Timex())
	logger.Debug("req protocolID: %d, %s", msg.GetHeader().GetMsgID(), msg.GetData())
	//heart
	if msg.GetHeader().GetMsgID() == uint32(pb.ProtoCode_HeartRpc) {
		return nil
	}

	//login
	if msg.GetHeader().GetMsgID() == uint32(pb.ProtoCode_LoginRpc) {
		code, protoMsg := c.OnLoginQuery(msg)
		return c.SendMsg(code, msg.GetHeader().GetMsgID(), protoMsg)
	}

	//get handler by msgid
	md := c.Server.GetAction(msg.GetHeader().GetMsgID())
	if md == nil {
		return fmt.Errorf("cmd: %d, handler is nil", msg.GetHeader().GetMsgID())
	}

	if msg.GetHeader().GetMsgID() != uint32(pb.ProtoCode_CreateRpc) && c.Role == nil {
		return c.Send(-101, msg.GetHeader().GetMsgID(), nil)
	}

	//调用协议号对应的逻辑函数
	f := md.(func(role *models.RoleModel, msg components.IMessage) (int32, interface{}))
	errCode, protoMsg := f(c.Role, msg)
	logger.Debug("rsp errcode: %d, protocode: %d", errCode, msg.GetHeader().GetMsgID())
	return c.SendMsg(errCode, msg.GetHeader().GetMsgID(), protoMsg)
}

func (c *Agent) OnTimer() {
	nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
	now := common.Timex()
	if now >= nextCheckTime {
		//检查心跳
		c.checkHeartBeat(now)
		nextCheckTime = now + common.HeartTimerInterval
		atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
	}

	if c.Role != nil {
		//role 恢复数据
		c.Role.OnRecoverTimer(now)
	}
}

func (c *Agent) OnClose() {
	c.IConnection = nil
	c.Role = nil
	agentPool.Put(c)

	if c.Role == nil {
		return
	}

	logger.Debug("ID: %d close, roleid", c.IConnection.GetID(), c.Role.Data.Id)
	c.Server.GetConnManage().DelRID(c.Role.Data.Id)
	c.Server.GetConnManage().DelUID(c.Role.Data.Uid)
	c.Role.OnOfflineEvent()
}

func (c *Agent) checkHeartBeat(now int64) {
	lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
	//logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), lastHeartCheckTime, now)
	if math.Abs(float64(lastHeartCheckTime-now)) > common.HeartTimerInterval {
		c.heartTimeoutCount++
		if c.heartTimeoutCount >= common.HeartTimeoutCountMax {
			c.Stop()
			return
		}
		//logger.Debug("timeout count: %d", c.heartTimeoutCount)
	} else {
		c.heartTimeoutCount = 0
	}
}