package service

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"

	"github.com/golang/protobuf/proto"

	"pro2d/common"
	"pro2d/common/components"
	"pro2d/common/logger"
	"pro2d/models"
	"pro2d/pb"
)

// Agent binds one client connection to the server it belongs to and, once the
// client has logged in, to the role it plays.
type Agent struct {
	components.IConnection
	Server components.IServer
	components.IAgent

	// Role is nil until SetSchema is called (OnLoginQuery does so on success).
	Role *models.RoleModel

	nextCheckTime      int64 // time at which OnTimer runs the next heartbeat check
	lastHeartCheckTime int64 // refreshed by OnMessage for every inbound message
	heartTimeoutCount  int   // consecutive heartbeat checks that found a timeout
}

// agentPool recycles Agent values; NewAgent takes from it and OnClose gives back.
var agentPool = sync.Pool{New: func() interface{} { return new(Agent) }}

// NewAgent takes an Agent from agentPool, attaches it to s and resets its
// heartbeat bookkeeping.
func NewAgent(s components.IServer) *Agent {
	a := agentPool.Get().(*Agent)
	a.Server = s
	a.nextCheckTime = 0
	a.lastHeartCheckTime = common.Timex()
	a.heartTimeoutCount = 0
	return a
}

// SetSchema stores schema as the agent's role, points the role back at this
// agent, and registers the role id and uid against the connection id in the
// server's connection manager. schema must be a *models.RoleModel; any other
// dynamic type makes the type assertion panic.
func (c *Agent) SetSchema(schema components.ISchema) {
	c.Role = schema.(*models.RoleModel)
	c.Role.SetConn(c)

	c.Server.GetConnManage().AddRID(c.Role.Data.Id, c.IConnection.GetID())
	c.Server.GetConnManage().AddUID(c.Role.Data.Uid, c.IConnection.GetID())
}

// GetSchema returns the agent's role.
func (c *Agent) GetSchema() components.ISchema {
	return c.Role
}

// SetServer replaces the server this agent belongs to.
func (c *Agent) SetServer(server components.IServer) {
	c.Server = server
}

// GetServer returns the server this agent belongs to.
func (c *Agent) GetServer() components.IServer {
	return c.Server
}

// OnConnection stores conn as the agent's underlying connection.
func (c *Agent) OnConnection(conn components.IConnection) {
	c.IConnection = conn
}

// OnLoginQuery handles a login request.
//
// Result codes:
//
//	0  success; the returned message is a *pb.LoginRsp
//	1  the request body could not be unmarshalled
//	2  no role exists for the uid
func (c *Agent) OnLoginQuery(msg components.IMessage) (int32, proto.Message) {
	req := pb.LoginReq{}
	if err := proto.Unmarshal(msg.GetData(), &req); err != nil {
		logger.Error("loginRpc err: %v", err)
		return 1, nil
	}

	// The request token is used as the uid.
	uid := req.Token

	// If this uid already has a connection, notify it and stop it so that the
	// new login takes over.
	conn := c.Server.GetConnManage().GetConnByUID(uid)
	if conn != nil {
		logger.Debug("挤掉。。。。。。。。")
		conn.SendSuccess(uint32(pb.ProtoCode_DisConnectNty), nil, msg.GetHeader().GetPreserve())
		conn.Stop()
	}

	role := models.RoleExistByUid(uid)
	if role == nil {
		return 2, nil
	}
	role.SetProperty("Device", req.Device)

	protoMsg := &pb.LoginRsp{
		Role:       role.Data,
		Hero:       role.GetAllHero(),
		Team:       role.GetAllTeam(),
		Equipments: role.GetEquipments(),
	}

	// Login succeeded: bind the role to this agent.
	c.SetSchema(role)

	return 0, protoMsg
}

// SendMsg sends a response for cmd. When msg is nil or errCode is non-zero only
// the code is sent. Otherwise msg is marshalled as a proto.Message and sent as
// the body; if msg is not a proto.Message or marshalling fails, code -100 is
// sent with no body.
func (c *Agent) SendMsg(errCode int32, cmd uint32, msg interface{}, preserve uint32) error {
	if msg == nil || errCode != 0 {
		return c.Send(errCode, cmd, nil, preserve)
	}

	// Checked assertion: a handler returning a non-proto payload is reported to
	// the client the same way as a marshal failure instead of panicking.
	pm, ok := msg.(proto.Message)
	if !ok {
		return c.Send(-100, cmd, nil, preserve)
	}

	rsp, err := proto.Marshal(pm)
	if err != nil {
		return c.Send(-100, cmd, nil, preserve)
	}
	return c.Send(errCode, cmd, rsp, preserve)
}

// OnMessage processes one inbound message: it refreshes the heartbeat
// timestamp, swallows heartbeat messages, handles login inline, and dispatches
// everything else to the handler the server has registered for the message id.
// Messages other than CreateRpc are answered with code -101 while no role is
// bound to the agent.
func (c *Agent) OnMessage(msg components.IMessage) error {
	atomic.StoreInt64(&c.lastHeartCheckTime, common.Timex())
	logger.Debug("req protocolID: %d, %s", msg.GetHeader().GetMsgID(), msg.GetData())

	// Heartbeat: refreshing the timestamp above is all that is needed.
	if msg.GetHeader().GetMsgID() == uint32(pb.ProtoCode_HeartRpc) {
		return nil
	}

	// Login is handled here rather than through the handler table.
	if msg.GetHeader().GetMsgID() == uint32(pb.ProtoCode_LoginRpc) {
		code, protoMsg := c.OnLoginQuery(msg)
		return c.SendMsg(code, msg.GetHeader().GetMsgID(), protoMsg, msg.GetHeader().GetPreserve())
	}

	// Look up the handler by message id.
	method := c.Server.GetAction(msg.GetHeader().GetMsgID())
	if method == nil {
		return fmt.Errorf("cmd: %d, handler is nil", msg.GetHeader().GetMsgID())
	}

	if msg.GetHeader().GetMsgID() != uint32(pb.ProtoCode_CreateRpc) && c.Role == nil {
		return c.Send(-101, msg.GetHeader().GetMsgID(), nil, msg.GetHeader().GetPreserve())
	}

	// Invoke the handler registered for this protocol id. The assertion is
	// checked so that a handler registered with the wrong signature yields an
	// error instead of a panic.
	f, ok := method.(func(role *models.RoleModel, msg components.IMessage) (int32, interface{}))
	if !ok {
		return fmt.Errorf("cmd: %d, handler has unexpected type %T", msg.GetHeader().GetMsgID(), method)
	}
	errCode, protoMsg := f(c.Role, msg)
	logger.Debug("rsp errcode: %d, protocode: %d", errCode, msg.GetHeader().GetMsgID())
	return c.SendMsg(errCode, msg.GetHeader().GetMsgID(), protoMsg, msg.GetHeader().GetPreserve())
}

// OnTimer runs the heartbeat check once the scheduled check time has been
// reached, schedules the next check common.HeartTimerInterval later, and, when
// a role is bound, calls the role's recover timer.
func (c *Agent) OnTimer() {
	nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
	now := common.Timex()
	if now >= nextCheckTime {
		// Heartbeat check.
		c.checkHeartBeat(now)
		nextCheckTime = now + common.HeartTimerInterval
		atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
	}

	if c.Role != nil {
		// Role data recovery.
		c.Role.OnRecoverTimer(now)
	}
}

// OnClose tears the agent down: when a role is bound it removes the role's
// RID/UID entries from the connection manager and fires the role's offline
// event; then it clears the agent's connection and role and returns the agent
// to agentPool.
func (c *Agent) OnClose() {
	// Cleanup must run before the fields are cleared; otherwise the role is
	// already nil and nothing is unregistered.
	if c.Role != nil {
		logger.Debug("ID: %d close, roleid: %v", c.IConnection.GetID(), c.Role.Data.Id)

		c.Server.GetConnManage().DelRID(c.Role.Data.Id)
		c.Server.GetConnManage().DelUID(c.Role.Data.Uid)
		c.Role.OnOfflineEvent()
	}

	// Recycle last: the agent must not be touched after it is back in the pool.
	c.IConnection = nil
	c.Role = nil
	agentPool.Put(c)
}

// checkHeartBeat compares the last message time with now. A gap larger than
// common.HeartTimerInterval counts as one timeout, and the connection is
// stopped once common.HeartTimeoutCountMax timeouts have accumulated; a gap
// within the interval resets the counter.
func (c *Agent) checkHeartBeat(now int64) {
	lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
	if math.Abs(float64(lastHeartCheckTime-now)) > common.HeartTimerInterval {
		c.heartTimeoutCount++
		if c.heartTimeoutCount >= common.HeartTimeoutCountMax {
			c.Stop()
			return
		}
	} else {
		c.heartTimeoutCount = 0
	}
}