package components

import (
	"bufio"
	"fmt"
	"net"
	"pro2d/common"
	"pro2d/common/logger"
	"sync/atomic"
	"time"
)

// Connection wraps a single net.Conn together with the goroutines and
// channels used to read from it, write to it and dispatch callbacks.
//
// Message callbacks and timer callbacks are both executed by the listen
// goroutine so that user code does not need its own locking.
type Connection struct {
	IConnection
	net.Conn
	Server IServer
	Id     int

	scanner *bufio.Scanner
	writer  *bufio.Writer

	// WBuffer queues packed messages for the write goroutine. It is closed
	// when the connection shuts down.
	WBuffer chan []byte
	// Quit is used to request shutdown (see Stop) and is closed once the
	// connection has shut down.
	Quit chan *Connection

	// readFunc and timerFunc carry closures to the listen goroutine.
	// Neither channel is ever closed.
	readFunc  chan func()
	timerFunc chan func()

	messageCallback    MessageCallback
	connectionCallback ConnectionCallback
	closeCallback      CloseCallback
	timerCallback      TimerCallback

	// Status is 1 while the connection is running and 0 otherwise.
	// It is accessed with sync/atomic.
	Status uint32
}

// NewConn builds a Connection around conn with the given id, owned by
// server s. All callbacks start out as no-ops; the connection does nothing
// until Start is called.
func NewConn(id int, conn net.Conn, s IServer) *Connection {
	c := &Connection{
		Id:        id,
		Conn:      conn,
		Server:    s,
		scanner:   bufio.NewScanner(conn),
		writer:    bufio.NewWriter(conn),
		WBuffer:   make(chan []byte, common.MaxMsgChan),
		Quit:      make(chan *Connection),
		readFunc:  make(chan func(), 10),
		timerFunc: make(chan func(), 10),
		Status:    0,
	}
	c.connectionCallback = c.defaultConnectionCallback
	c.messageCallback = c.defaultMessageCallback
	c.closeCallback = c.defaultCloseCallback
	c.timerCallback = c.defaultTimerCallback
	return c
}

// GetID returns the connection id passed to NewConn.
func (c *Connection) GetID() int {
	return c.Id
}

// SetConnectionCallback sets the callback invoked by Start.
func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
	c.connectionCallback = cb
}

// SetMessageCallback sets the callback invoked for every unpacked message.
func (c *Connection) SetMessageCallback(cb MessageCallback) {
	c.messageCallback = cb
}

// SetCloseCallback sets the callback invoked once when the connection shuts
// down.
func (c *Connection) SetCloseCallback(cb CloseCallback) {
	c.closeCallback = cb
}

// SetTimerCallback sets the callback invoked on every timer tick.
func (c *Connection) SetTimerCallback(cb TimerCallback) {
	c.timerCallback = cb
}

// Start marks the connection as running, launches the write, read and
// listen goroutines, fires the connection callback and arms the timer.
func (c *Connection) Start() {
	// Mark the connection as running BEFORE any goroutine starts: each of
	// them defers quitting, which is a no-op while Status is 0, so a
	// goroutine failing immediately would otherwise skip the cleanup.
	atomic.StoreUint32(&c.Status, 1)

	go c.write()
	go c.read()
	go c.listen()

	c.connectionCallback(c)
	c.handleTimeOut()
}

// Stop asks the connection to shut down by handing it to the Quit channel.
// It gives up silently after 5ms, and is a no-op when the connection has
// already shut down.
func (c *Connection) Stop() {
	// quitting closes Quit; sending on a closed channel panics. In that
	// case the connection is already down, so the panic is swallowed.
	defer func() { _ = recover() }()

	sendTimeout := time.NewTimer(5 * time.Millisecond)
	defer sendTimeout.Stop()

	// Send timeout.
	select {
	case <-sendTimeout.C:
	case c.Quit <- c:
	}
}

// Send packs data with the server's splitter and queues it for the write
// goroutine. It returns the pack error, a timeout error if the queue does
// not accept the message within 5ms, or an error if the connection has
// already shut down.
func (c *Connection) Send(errCode int32, cmd uint32, data []byte) (err error) {
	buf, packErr := c.Server.GetSplitter().Pack(cmd, data, errCode, 0)
	if packErr != nil {
		return packErr
	}

	// quitting closes WBuffer; sending on a closed channel panics. Turn
	// that panic into an ordinary error instead of crashing the process.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("send buff msg fail: connection closed")
		}
	}()

	sendTimeout := time.NewTimer(5 * time.Millisecond)
	defer sendTimeout.Stop()

	// Send timeout.
	select {
	case <-sendTimeout.C:
		return fmt.Errorf("send buff msg timeout")
	case c.WBuffer <- buf:
		return nil
	}
}

// defaultConnectionCallback is the no-op connection callback.
func (c *Connection) defaultConnectionCallback(conn IConnection) {
}

// defaultMessageCallback is the no-op message callback.
func (c *Connection) defaultMessageCallback(msg IMessage) {
}

// defaultCloseCallback is the no-op close callback.
func (c *Connection) defaultCloseCallback(conn IConnection) {
}

// defaultTimerCallback is the no-op timer callback.
func (c *Connection) defaultTimerCallback(conn IConnection) {
}

// write drains WBuffer, writing and flushing each message to the peer. It
// returns when WBuffer is closed or a write fails, and shuts the connection
// down on exit.
func (c *Connection) write() {
	defer c.quitting()

	for msg := range c.WBuffer {
		n, err := c.writer.Write(msg)
		if err != nil {
			logger.Error("write fail err: "+err.Error(), "n: ", n)
			return
		}
		if err := c.writer.Flush(); err != nil {
			logger.Error("write Flush fail err: " + err.Error())
			return
		}
	}
}

// read splits the incoming stream into frames with the server's splitter,
// unpacks each frame and hands the message callback to the listen
// goroutine. It shuts the connection down on exit.
func (c *Connection) read() {
	defer c.quitting()

	c.scanner.Split(c.Server.GetSplitter().ParseMsg)
	for c.scanner.Scan() {
		// Scanner.Bytes may be overwritten by the next Scan, while the
		// message is handled later on the listen goroutine. Copy the frame
		// in case UnPack keeps a reference to it.
		frame := append([]byte(nil), c.scanner.Bytes()...)

		req, err := c.Server.GetSplitter().UnPack(frame)
		if err != nil {
			logger.Error("unpack fail err: %s", err.Error())
			return
		}
		req.SetSession(c)

		select {
		case c.readFunc <- func() { c.messageCallback(req) }:
		case <-c.Quit:
			// Shutdown was requested or already happened; the deferred
			// quitting finishes (or no-ops) the cleanup.
			return
		}
	}

	if err := c.scanner.Err(); err != nil {
		logger.Error("scanner.err: %s", err.Error())
	}
}

// listen runs message callbacks and timer callbacks on one goroutine, so
// that network data and timer handling never run concurrently and callback
// code needs no locking. It returns when Quit delivers a value or is
// closed, and shuts the connection down on exit.
func (c *Connection) listen() {
	defer c.quitting()

	for {
		select {
		case fn := <-c.timerFunc:
			fn()
		case fn, ok := <-c.readFunc:
			// Defensive: never call the nil func a closed channel yields.
			if !ok || fn == nil {
				return
			}
			fn()
		case <-c.Quit:
			return
		}
	}
}

// handleTimeOut queues one timer callback for the listen goroutine and
// re-arms itself through TimeOut with a one second delay. It stops
// re-arming once the connection is no longer running.
func (c *Connection) handleTimeOut() {
	// After shutdown nobody drains timerFunc; re-arming forever would
	// eventually block on a full channel.
	if atomic.LoadUint32(&c.Status) == 0 {
		return
	}

	c.timerFunc <- func() {
		c.timerCallback(c)
	}
	TimeOut(1*time.Second, c.handleTimeOut)
}

// quitting shuts the connection down exactly once: it closes WBuffer and
// Quit, closes the underlying net.Conn and fires the close callback. Calls
// made while the connection is not running are no-ops.
func (c *Connection) quitting() {
	// write, read and listen all defer quitting. Swap makes the
	// "was running -> now closed" transition atomic so that only one
	// caller performs the cleanup and no channel is closed twice.
	if atomic.SwapUint32(&c.Status, 0) == 0 {
		return
	}

	logger.Debug("ID: %d close", c.Id)

	close(c.WBuffer)
	close(c.Quit)
	// readFunc is deliberately left open: closing it would make listen
	// receive nil funcs and make a concurrent send in read panic.

	c.Conn.Close()
	c.closeCallback(c)
}