Blame view

common/components/conn.go 2.29 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
  package components
  
  import (
  	"bufio"
  	"errors"
  	"net"
  	"pro2d/common"
  	"pro2d/utils/logger"
  	"time"
  )
  
  
  type Connection struct {
  	IConnection
  	net.Conn
  	Server IServer
  	Id     int
  
  	scanner 		*bufio.Scanner
  	writer 			*bufio.Writer
  	WBuffer 		chan []byte
  	Quit 			chan *Connection
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
  }
  
  func NewConn(id int, conn net.Conn, s IServer) *Connection {
  	return &Connection{
  		Id: id,
  		Conn: conn,
  		Server: s,
  
  		scanner:            bufio.NewScanner(conn),
  		writer:             bufio.NewWriter(conn),
  		WBuffer:            make(chan []byte, common.MaxMsgChan),
  		Quit:               make(chan *Connection),
  	}
  }
  
  func (c *Connection) GetID() int {
  	return c.Id
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
  func (c *Connection) write()  {
  	defer c.Stop()
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
  		if err != nil{
  			logger.Error("write fail err: " + err.Error(), "n: ", n)
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
  	defer c.Stop()
  	c.scanner.Split(c.Server.GetSplitter().ParseMsg)
  
  	for c.scanner.Scan() {
  		req, err := c.Server.GetSplitter().UnPack(c.scanner.Bytes())
  		if err != nil {
  			return
  		}
  
  		req.SetSessId(c.Id)
  		c.messageCallback(req)
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
  func (c *Connection) Start()  {
  	c.connectionCallback(c)
  	go c.write()
  	go c.read()
  }
  
  func (c *Connection) Stop()  {
  	logger.Debug("ID: %d close", c.Id)
  	c.Conn.Close()
  	c.closeCallback(c)
  }
  
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{
  	buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0)
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return errors.New("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }