// agent.go 2.82 KB
package main

import (
	"fmt"
	"github.com/golang/protobuf/proto"
	"math"
	"pro2d/common"
	"pro2d/common/components"
	"pro2d/models"
	"pro2d/pb"
	"pro2d/utils"
	"pro2d/utils/logger"
	"sync/atomic"
)

// Agent couples one client connection with the role attached to it and
// serialises the work for that connection: message handlers and timer
// callbacks are queued as closures and executed one at a time by listen.
type Agent struct {
	// IConnection is the underlying connection; it is assigned in OnConnection.
	components.IConnection
	// Server is used by OnMessage to look up the connection for a session ID.
	Server components.IServer

	// Role is the role model bound to this agent. It may be nil; Close and
	// update both check for that. It is not assigned anywhere in this file.
	Role			*models.RoleModel

	// readFunc carries message-handling closures queued by OnMessage.
	readFunc  chan func()
	// timerFunc carries timer closures queued by update.
	timerFunc chan func()
	// Quit tells listen to return; OnClose sends the agent itself on it.
	Quit	  chan *Agent

	// nextCheckTime and lastHeartCheckTime hold values from utils.Timex and
	// are read and written with sync/atomic.
	nextCheckTime   	int64 // time at which the next heartbeat check is due
	lastHeartCheckTime 	int64 // time the last inbound message was handled
	heartTimeoutCount  	int   // number of consecutive heartbeat timeouts
}

// NewAgent returns an Agent bound to server s.
//
// The read and timer queues are buffered with room for 10 pending closures
// each; Quit is unbuffered. lastHeartCheckTime starts at the current
// utils.Timex() value, and nextCheckTime and heartTimeoutCount start at zero.
// The connection itself is attached later by OnConnection.
func NewAgent(s components.IServer) *Agent {
	// queueSize is the capacity of the read and timer closure queues.
	const queueSize = 10

	agent := new(Agent)
	agent.Server = s
	agent.readFunc = make(chan func(), queueSize)
	agent.timerFunc = make(chan func(), queueSize)
	agent.Quit = make(chan *Agent)

	agent.nextCheckTime = 0
	agent.lastHeartCheckTime = utils.Timex()
	agent.heartTimeoutCount = 0
	return agent
}

// listen is the agent's event loop. It runs queued read and timer closures
// one at a time until a value arrives on Quit, then returns. Close is
// deferred, so it runs when the loop exits.
func (c *Agent) listen() {
	defer c.Close()

	for {
		select {
		case <-c.Quit:
			return
		case task := <-c.readFunc:
			task()
		case task := <-c.timerFunc:
			task()
		}
	}
}

// OnConnection attaches conn to the agent as its embedded IConnection and
// starts the listen loop in a new goroutine.
func (c *Agent) OnConnection(conn components.IConnection) {
	c.IConnection = conn

	// From here on, queued closures are drained by the listen goroutine.
	go c.listen()
}

// OnMessage queues the handling of one inbound message on the agent's read
// queue; the closure is executed later by listen.
//
// When it runs, the closure refreshes lastHeartCheckTime and looks the
// message ID up in components.ActionMap. If no handler is registered the ID
// is logged and nothing is sent. Otherwise the handler is invoked and its
// result is marshalled and sent to the connection the server returns for the
// message's session ID; if that connection is nil nothing is sent. When
// marshalling fails, the error is logged and the reply carries error code
// -100 with a nil body.
//
// The send on readFunc blocks while the queue is full.
func (c *Agent) OnMessage(msg components.IMessage) {
	// marshalFailedCode is the error code sent when the reply cannot be encoded.
	const marshalFailedCode = -100

	task := func() {
		atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())

		msgID := msg.GetHeader().GetMsgID()
		handler, ok := components.ActionMap[pb.ProtoCode(msgID)]
		if !ok {
			logger.Error("protocode not handler: %d", msgID)
			return
		}

		logger.Debug("protocode handler: %d", msgID)
		errCode, protomsg := handler(msg)
		rsp, err := proto.Marshal(protomsg)
		fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
		if err != nil {
			// Previously swallowed: the client saw -100 but the cause was lost.
			logger.Error("marshal response for protocode %d: %v", msgID, err)
		}

		conn := c.Server.GetIConnection(msg.GetSessId())
		if conn == nil {
			return
		}
		if err != nil {
			conn.Send(marshalFailedCode, msgID, nil)
			return
		}
		conn.Send(errCode, msgID, rsp)
	}
	c.readFunc <- task
}

// OnClose asks the listen loop to stop by sending the agent on Quit.
// NewAgent creates Quit unbuffered, so this call blocks until listen
// receives the value.
func (c *Agent) OnClose() {
	c.Quit <- c
}

// Close notifies the attached role, if there is one, that it has gone
// offline. It does nothing when no role is attached.
func (c *Agent) Close() {
	if c.Role != nil {
		c.Role.OnOfflineEvent()
	}
}

// checkHeartBeat compares now with the time the last message was handled.
//
// If the absolute difference is within common.HeartTimerInterval the timeout
// counter is reset. Otherwise the counter is incremented, and once it reaches
// common.HeartTimeoutCountMax the connection is stopped.
func (c *Agent) checkHeartBeat(now int64) {
	last := atomic.LoadInt64(&c.lastHeartCheckTime)
	logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), last, now)

	elapsed := math.Abs(float64(last - now))
	if elapsed <= common.HeartTimerInterval {
		// Heard from the peer recently enough: clear any earlier misses.
		c.heartTimeoutCount = 0
		return
	}

	c.heartTimeoutCount++
	if c.heartTimeoutCount < common.HeartTimeoutCountMax {
		logger.Debug("timeout count: %d", c.heartTimeoutCount)
		return
	}
	c.Stop()
}

// update performs one periodic tick for the agent.
//
// When the current time has reached nextCheckTime it runs the heartbeat check
// and schedules the next one common.HeartTimerInterval later. On every call it
// then queues a timer closure that, if a role is attached, calls the role's
// OnRecoverTimer with the tick time. The send on timerFunc blocks while the
// queue is full.
func (c *Agent) update() {
	due := atomic.LoadInt64(&c.nextCheckTime)
	now := utils.Timex()
	if now >= due {
		// Check the heartbeat, then push the deadline forward.
		c.checkHeartBeat(now)
		atomic.StoreInt64(&c.nextCheckTime, now+common.HeartTimerInterval)
	}

	recoverRole := func() {
		if c.Role == nil {
			return
		}
		// Let the role recover its data for this tick.
		c.Role.OnRecoverTimer(now)
	}
	c.timerFunc <- recoverRole
}