package net import ( "bufio" "fmt" "math" "net" "pro2d/src/common" "pro2d/src/components/logger" "pro2d/src/models" "pro2d/src/utils" "sync/atomic" ) type Head struct { Length int32 Cmd int32 ErrCode int32 PreField int32 } type Connection struct { net.Conn Id int Server *Server scanner *bufio.Scanner writer *bufio.Writer WBuffer chan []byte updateFunc chan func() readFunc chan func() Quit chan *Connection Role *models.RoleModel nextCheckTime int64 //下一次检查的时间 lastHeartCheckTime int64 //最后收消息时间 heartTimeoutCount int //超时次数 } type MsgPkg struct { Head *Head Body []byte Conn *Connection } func NewConn(id int, conn net.Conn, s *Server) *Connection { return &Connection{ Id: id, Conn: conn, Server: s, scanner: bufio.NewScanner(conn), writer: bufio.NewWriter(conn), WBuffer: make(chan []byte), updateFunc: make(chan func()), readFunc: make(chan func()), Quit: make(chan *Connection), lastHeartCheckTime: utils.Timex(), heartTimeoutCount: 0, } } func (c *Connection) write() { defer c.Quiting() for msg := range c.WBuffer { n, err := c.writer.Write(msg) if err != nil{ logger.Error("write fail err: " + err.Error(), "n: ", n) return } if err := c.writer.Flush(); err != nil { logger.Error("write Flush fail err: " + err.Error()) return } } } func (c *Connection) read() { defer c.Quiting() c.scanner.Split(ParseMsg) for c.scanner.Scan() { req, err := DecodeMsg(c.scanner.Bytes()) if err != nil { return } req.Conn = c c.readFunc <- func() { c.Server.DoMsgHandler(req) } atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) //备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。 //c.Server.OnRecv(req) } if err := c.scanner.Err(); err != nil { fmt.Printf("scanner.err: %s\n", err.Error()) return } } func (c *Connection) checkHeartBeat(now int64) { lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime) logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.Id, lastHeartCheckTime, now) if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval { c.heartTimeoutCount++ if c.heartTimeoutCount >= common.HeartTimeoutCountMax { c.Quiting() return } logger.Debug("timeout count: %d", c.heartTimeoutCount) }else { c.heartTimeoutCount = 0 } } func (c *Connection) update() { nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) now := utils.Timex() if now >= nextCheckTime { //检查心跳 c.checkHeartBeat(now) nextCheckTime = now + common.HeartTimerInterval atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) } c.updateFunc <- func() { if c.Role != nil { //role 恢复数据 c.Role.OnRecoverTimer(now) } } } func (c *Connection) flush() { defer c.Stop() for { select { case rf := <- c.readFunc: rf() case uf := <- c.updateFunc: uf() case <- c.Quit: return } } } func (c *Connection) Start() { go c.write() go c.read() c.flush() } func (c *Connection) Stop() { logger.Debug("ID: %d close", c.Id) c.Conn.Close() if c.Role != nil { c.Role.OnOfflineEvent() } c.Server.OnClose(c) } func (c *Connection) Quiting() { c.Quit <- c } func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){ h := &Head{ Length: int32(common.HEADLEN + len(data)), Cmd: cmd, ErrCode: errCode, PreField: 0, } pkg := &MsgPkg{ Head: h, Body: data, } buf, err := EncodeMsg(pkg) if err != nil { logger.Error("SendMsg error: %v", err) return } c.WBuffer <- buf }