Blame view

common/components/conn.go 5.05 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
6
7
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
8
  	"pro2d/common/logger"
58e37bfe   zhangqijia   add sync.Pool to ...
9
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
10
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
11
12
13
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
14
15
16
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
17
  	splitter ISplitter
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
18
  	Id       uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
19
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
20
21
22
23
24
25
26
  	scanner       *bufio.Scanner
  	writer        *bufio.Writer
  	WBuffer       chan []byte
  	Quit          chan *Connection
  	readFunc      chan func()
  	timerFunc     chan func()
  	customizeFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
27
28
29
30
31
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
32
33
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
34
35
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
36
  var connectionPool = &sync.Pool{
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
37
  	New: func() interface{} { return new(Connection) },
58e37bfe   zhangqijia   add sync.Pool to ...
38
39
  }
  
54b3f133   zhangqijia   add connector int...
40
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
41
42
43
44
45
  	c := connectionPool.Get().(*Connection)
  	closed := atomic.LoadUint32(&c.Status)
  	if closed != 0 {
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
46
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
47
48
49
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
50
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
51
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
52
53
  	c.scanner = bufio.NewScanner(conn)
  	c.writer = bufio.NewWriter(conn)
58e37bfe   zhangqijia   add sync.Pool to ...
54
55
56
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
57
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
58
59
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
60
  func (c *Connection) reset() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
61
62
  	c.WBuffer = make(chan []byte, common.MaxMsgChan)
  	c.Quit = make(chan *Connection)
58e37bfe   zhangqijia   add sync.Pool to ...
63
64
  
  	if c.readFunc == nil {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
65
  		c.readFunc = make(chan func(), 10)
58e37bfe   zhangqijia   add sync.Pool to ...
66
67
68
69
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
70
71
72
  	if c.customizeFunc == nil {
  		c.customizeFunc = make(chan func(), 10)
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
73
74
75
76
77
78
79
80
81
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
96
97
98
99
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
100
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
101
102
103
104
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
105
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
106
107
108
109
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
110
  func (c *Connection) Stop() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
111
112
113
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
114
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
115
116
117
118
119
120
121
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
122
123
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
124
125
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
126
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
54b3f133   zhangqijia   add connector int...
127
  	buf, err := c.splitter.Pack(cmd, data, errCode, 0)
77f5eec7   zhangqijia   plugin 插件热更 接口
128
129
130
131
132
133
134
135
136
137
138
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
c8509ef6   zhangqijia   fix: notify equip...
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
  		return nil
  	}
  }
  
  func (c *Connection) SendSuccess(cmd uint32, data []byte) error {
  	buf, err := c.splitter.Pack(cmd, data, 0, 0)
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
77f5eec7   zhangqijia   plugin 插件热更 接口
156
157
158
159
  		return nil
  	}
  }
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
160
161
162
163
  func (c *Connection) CustomChan() chan<- func() {
  	return c.customizeFunc
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
164
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
165
166
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
167
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
168
169
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
170
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
171
172
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
173
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
174
175
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
176
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
177
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
178
  		//logger.Debug("write close")
58e37bfe   zhangqijia   add sync.Pool to ...
179
180
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
181
182
183
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
184
185
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
186
187
188
189
190
191
192
193
194
195
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
196
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
197
  		//logger.Debug("read close")
58e37bfe   zhangqijia   add sync.Pool to ...
198
199
200
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
201
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
202
203
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
204
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
205
206
207
208
  		if err != nil {
  			return
  		}
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
209
  		req.SetSID(c.GetID())
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
210
211
212
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
213
214
215
216
217
218
219
220
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
221
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
222
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
223
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
224
  		//logger.Debug("listen close")
58e37bfe   zhangqijia   add sync.Pool to ...
225
226
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
227
  
cd2f96ab   zhangqijia   fix: 优化连接管理
228
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
229
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
230
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
231
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
232
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
233
  			readFunc()
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
234
235
  		case customizeFunc := <-c.customizeFunc:
  			customizeFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
236
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
237
238
239
240
241
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
242
  func (c *Connection) handleTimeOut() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
243
244
245
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
246
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
247
248
249
250
251
252
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
253
  func (c *Connection) quitting() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
254
255
256
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
765431a4   zhangqijia   增加schema接口, 抽象 mo...
257
258
259
260
261
  	atomic.StoreUint32(&c.Status, 0)
  
  	logger.Debug("ID: %d close", c.Id)
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
262
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
263
264
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
265
266
267
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
268
  }