// conn.go 3.73 KB

package net

import (
	"bufio"
	"fmt"
	"math"
	"net"
	"pro2d/src/common"
	"pro2d/src/components/logger"
	"pro2d/src/utils"
	"sync/atomic"
)

// Head is the fixed-size header carried by every message.
type Head struct {
	// Length is the total message length; SendMsgByCode sets it to
	// common.HEADLEN plus the body length.
	Length int32
	// Cmd identifies the command the message carries.
	Cmd int32
	// ErrCode is the error code reported with the message.
	ErrCode int32
	// PreField is written as 0 by SendMsgByCode.
	// NOTE(review): presumably a reserved field — confirm against the protocol.
	PreField int32
}


// Connection is one client connection owned by a Server. It embeds the
// underlying net.Conn and adds buffered framing, message channels and
// heartbeat bookkeeping.
type Connection struct {
	// The two int64 fields below are accessed with the sync/atomic 64-bit
	// functions. They are kept at the start of the struct because only the
	// first word of an allocated struct is guaranteed to be 64-bit aligned
	// on 32-bit platforms; a misaligned atomic access panics there.
	nextCheckTime      int64 // time at which the next heartbeat check is due
	lastHeartCheckTime int64 // time the last message was received

	net.Conn
	// Id identifies the connection; read uses it to pick a worker queue.
	Id int
	// Server is the owning server, notified through OnClose when the
	// connection quits.
	Server *Server

	scanner *bufio.Scanner // splits the inbound stream into messages
	writer  *bufio.Writer  // buffers outbound writes; flushed after every message

	// WBuffer carries encoded messages to the write goroutine.
	WBuffer chan []byte
	// RBuffer is created by NewConn and closed by Stop.
	// NOTE(review): no reader or writer of it is visible in this file.
	RBuffer chan *MsgPkg

	// Quit is created by NewConn.
	// NOTE(review): it is not used in this file — confirm whether it is still needed.
	Quit chan *Connection

	heartTimeoutCount int // number of consecutive heartbeat timeouts
}

// MsgPkg is a single protocol message: its header, its raw body, and the
// connection it belongs to.
type MsgPkg struct {
	// Head is the message header.
	Head *Head
	// Body is the raw payload that follows the header.
	Body []byte
	// Conn is set by Connection.read to the connection the message arrived
	// on; SendMsgByCode leaves it nil for outbound messages.
	Conn *Connection
}

// NewConn builds a Connection with the given id around conn, owned by s.
//
// The scanner and writer both wrap conn, all three channels are unbuffered,
// and the last-message time is initialised from utils.Timex().
// nextCheckTime and heartTimeoutCount start at their zero values.
// No goroutines are started here; call Start for that.
func NewConn(id int, conn net.Conn, s *Server) *Connection {
	c := new(Connection)

	c.Id = id
	c.Conn = conn
	c.Server = s

	c.scanner = bufio.NewScanner(conn)
	c.writer = bufio.NewWriter(conn)

	c.WBuffer = make(chan []byte)
	c.RBuffer = make(chan *MsgPkg)
	c.Quit = make(chan *Connection)

	c.lastHeartCheckTime = utils.Timex()

	return c
}

// write is the connection's send loop. It takes encoded messages from
// WBuffer, writes each one to the buffered writer and flushes immediately.
//
// The loop ends when WBuffer is closed or when a write or flush fails; in
// every case the deferred Quiting reports the connection as closing.
func (c *Connection) write() {
	defer c.Quiting()

	for msg := range c.WBuffer {
		// The logger takes a printf-style format, so the error is passed as
		// an argument rather than spliced into the format string.
		if n, err := c.writer.Write(msg); err != nil {
			logger.Error("write fail err: %v, n: %d", err, n)
			return
		}
		if err := c.writer.Flush(); err != nil {
			logger.Error("write Flush fail err: %v", err)
			return
		}
	}
}

// read is the connection's receive loop. It splits the inbound stream into
// messages with ParseMsg, decodes each one and queues it on the server task
// queue selected by the connection id.
//
// The loop ends when the scanner stops (end of stream or scan error) or when
// a message fails to decode. The deferred Quiting is the only close
// notification issued from here, so it runs exactly once on every exit path.
func (c *Connection) read() {
	defer c.Quiting()
	c.scanner.Split(ParseMsg)

	for c.scanner.Scan() {
		// Scanner.Bytes may be overwritten by the next Scan, and the decoded
		// request is queued on a channel and may be consumed later, so decode
		// from a private copy of the frame.
		frame := append([]byte(nil), c.scanner.Bytes()...)

		req, err := DecodeMsg(frame)
		if err != nil {
			logger.Error("ID: %d decode msg fail err: %v", c.Id, err)
			return
		}

		req.Conn = c
		// Pick the worker that handles this connection.
		workerID := c.Id % c.Server.SConf.WorkerPoolSize
		// Hand the request to that worker's task queue.
		c.Server.TaskQueue[workerID] <- req

		// Any received message counts as a sign of life for the heartbeat check.
		atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())

		// Note: the request could instead be handled in this goroutine
		// (c.Server.OnRecv(req); simple, since removed) rather than handed to
		// the worker pool as above. The two approaches have not been compared.
	}

	if err := c.scanner.Err(); err != nil {
		fmt.Printf("scanner.err: %s\n", err.Error())
	}
}


// checkHeartBeat compares now with the time the last message was received.
//
// If the gap is larger than common.HEART_TIMER_INTERVAL the timeout counter
// is incremented, and once it reaches common.HEART_TIMEOUT_COUNT_MAX the
// connection is told to quit. A gap within the interval resets the counter.
// heartTimeoutCount is read and written here without synchronisation.
func (c *Connection) checkHeartBeat(now int64) {
	last := atomic.LoadInt64(&c.lastHeartCheckTime)
	logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.Id, last, now)

	gap := math.Abs(float64(last - now))
	if gap <= common.HEART_TIMER_INTERVAL {
		// Heard from the peer recently enough: start counting from scratch.
		c.heartTimeoutCount = 0
		return
	}

	c.heartTimeoutCount++
	if c.heartTimeoutCount >= common.HEART_TIMEOUT_COUNT_MAX {
		c.Quiting()
		return
	}
	logger.Debug("timeout count: %d", c.heartTimeoutCount)
}

// update runs a heartbeat check when one is due. If the current time has
// reached nextCheckTime it calls checkHeartBeat and schedules the next check
// common.HEART_TIMER_INTERVAL later; otherwise it does nothing.
func (c *Connection) update() {
	due := atomic.LoadInt64(&c.nextCheckTime)
	now := utils.Timex()
	if now < due {
		return
	}

	c.checkHeartBeat(now)
	atomic.StoreInt64(&c.nextCheckTime, now+common.HEART_TIMER_INTERVAL)
}
//
//func (c *Connection) SetLastHeartCheckTime()  {
//	now := utils.Timex()
//	lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
//	if now - lastHeartCheckTime < common.HEART_TIMER_INTERVAL {
//		logger.Debug("heart too quick")
//	}
//	atomic.StoreInt64(&c.lastHeartCheckTime, now)
//}

// Start launches the connection's write and read loops, each in its own
// goroutine, and returns immediately without waiting for either.
func (c *Connection) Start()  {
	go c.write()
	go c.read()
	// Disabled test loop that sent a message every two seconds:
	//for {
	//	c.SendMsgByCode(100, 1, nil)
	//	time.Sleep(2*time.Second)
	//}
}

// Stop closes RBuffer and then the underlying network connection, logging
// any error the connection reports on close.
//
// It must be called at most once per connection: closing RBuffer a second
// time panics. WBuffer is left open, so the write loop keeps waiting on it
// until a later write fails.
func (c *Connection) Stop() {
	close(c.RBuffer)
	if err := c.Conn.Close(); err != nil {
		logger.Error("ID: %d close conn err: %v", c.Id, err)
	}
}

// Quiting logs that the connection is closing and notifies the owning server
// through Server.OnClose.
//
// It is deferred by both the read and write loops and is also called by
// checkHeartBeat, so it can run more than once for the same connection.
// NOTE(review): OnClose is defined elsewhere — confirm it tolerates repeated calls.
func (c *Connection) Quiting()  {
	logger.Debug("ID: %d close", c.Id)
	c.Server.OnClose(c)
}

// SendMsgByCode builds a message with the given error code, command and body,
// encodes it and queues the bytes on WBuffer for the write loop.
//
// The header length is common.HEADLEN plus len(data) and PreField is 0.
// If encoding fails the error is logged and nothing is sent. WBuffer is
// unbuffered, so the send blocks until the write loop receives the message.
func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte) {
	encoded, err := EncodeMsg(&MsgPkg{
		Head: &Head{
			Length:   int32(common.HEADLEN + len(data)),
			Cmd:      cmd,
			ErrCode:  errCode,
			PreField: 0,
		},
		Body: data,
	})
	if err != nil {
		logger.Error("SendMsg error: %v", err)
		return
	}

	c.WBuffer <- encoded
}