package main import ( "fmt" "github.com/golang/protobuf/proto" "math" "pro2d/common" "pro2d/common/components" "pro2d/models" "pro2d/pb" "pro2d/utils" "pro2d/utils/logger" "sync/atomic" ) type Agent struct { components.IConnection Server components.IServer Role *models.RoleModel readFunc chan func() timerFunc chan func() Quit chan *Agent nextCheckTime int64 //下一次检查的时间 lastHeartCheckTime int64 heartTimeoutCount int //超时次数 } func NewAgent(s components.IServer) *Agent { return &Agent{ Server: s, readFunc: make(chan func(), 10), timerFunc: make(chan func(), 10), Quit: make(chan *Agent), nextCheckTime: 0, lastHeartCheckTime: utils.Timex(), heartTimeoutCount: 0, } } func (c *Agent) listen() { defer c.Close() for { select { case timerFunc := <- c.timerFunc: timerFunc() case readFunc := <- c.readFunc: readFunc() case <- c.Quit: return } } } func (c *Agent) OnConnection(conn components.IConnection) { c.IConnection = conn go c.listen() } func (c *Agent) OnMessage(msg components.IMessage) { f := func() { atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) if md, ok := components.ActionMap[pb.ProtoCode(msg.GetHeader().GetMsgID())]; ok { logger.Debug("protocode handler: %d", msg.GetHeader().GetMsgID()) errCode, protomsg := md(msg) rsp, err := proto.Marshal(protomsg) fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg) if err != nil { conn := c.Server.GetIConnection(msg.GetSessId()) if conn != nil { conn.Send(-100, msg.GetHeader().GetMsgID(), nil) } return } conn := c.Server.GetIConnection(msg.GetSessId()) if conn != nil { conn.Send(errCode, msg.GetHeader().GetMsgID(), rsp) } return } logger.Error("protocode not handler: %d", msg.GetHeader().GetMsgID()) } c.readFunc <- f } func (c *Agent) OnClose() { c.Quit <- c } func (c *Agent) Close() { if c.Role == nil { return } c.Role.OnOfflineEvent() } func (c *Agent) checkHeartBeat(now int64) { lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime) logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), lastHeartCheckTime, now) if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval { c.heartTimeoutCount++ if c.heartTimeoutCount >= common.HeartTimeoutCountMax { c.Stop() return } logger.Debug("timeout count: %d", c.heartTimeoutCount) }else { c.heartTimeoutCount = 0 } } func (c *Agent) update() { nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) now := utils.Timex() if now >= nextCheckTime { //检查心跳 c.checkHeartBeat(now) nextCheckTime = now + common.HeartTimerInterval atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) } c.timerFunc <- func() { if c.Role != nil { //role 恢复数据 c.Role.OnRecoverTimer(now) } } }