Blame view

common/components/conn.go 5.09 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
6
7
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
8
  	"pro2d/common/logger"
58e37bfe   zhangqijia   add sync.Pool to ...
9
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
10
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
11
12
13
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
14
15
16
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
17
  	splitter ISplitter
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
18
  	Id       uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
19
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
20
21
22
23
24
25
26
  	scanner       *bufio.Scanner
  	writer        *bufio.Writer
  	WBuffer       chan []byte
  	Quit          chan *Connection
  	readFunc      chan func()
  	timerFunc     chan func()
  	customizeFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
27
28
29
30
31
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
32
33
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
34
35
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
36
  var connectionPool = &sync.Pool{
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
37
  	New: func() interface{} { return new(Connection) },
58e37bfe   zhangqijia   add sync.Pool to ...
38
39
  }
  
54b3f133   zhangqijia   add connector int...
40
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
41
  	c := connectionPool.Get().(*Connection)
495e9142   zhangqijia   fix: 增加DisConnect...
42
43
  	status := atomic.LoadUint32(&c.Status)
  	if status != 0 {
58e37bfe   zhangqijia   add sync.Pool to ...
44
45
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
46
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
47
48
49
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
50
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
51
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
52
53
  	c.scanner = bufio.NewScanner(conn)
  	c.writer = bufio.NewWriter(conn)
58e37bfe   zhangqijia   add sync.Pool to ...
54
55
56
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
57
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
58
59
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
60
  func (c *Connection) reset() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
61
62
  	c.WBuffer = make(chan []byte, common.MaxMsgChan)
  	c.Quit = make(chan *Connection)
58e37bfe   zhangqijia   add sync.Pool to ...
63
64
  
  	if c.readFunc == nil {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
65
  		c.readFunc = make(chan func(), 10)
58e37bfe   zhangqijia   add sync.Pool to ...
66
67
68
69
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
70
71
72
  	if c.customizeFunc == nil {
  		c.customizeFunc = make(chan func(), 10)
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
73
74
75
76
77
78
79
80
81
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
96
97
98
99
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
100
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
101
102
103
104
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
105
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
106
107
108
109
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
110
  func (c *Connection) Stop() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
111
112
113
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
114
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
115
116
117
118
119
120
121
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
122
123
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
124
125
  }
  
8568cf44   zhangqijia   update preserve
126
127
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte, preserve uint32) error {
  	buf, err := c.splitter.Pack(cmd, data, errCode, preserve)
77f5eec7   zhangqijia   plugin 插件热更 接口
128
129
130
131
132
133
134
135
136
137
138
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
c8509ef6   zhangqijia   fix: notify equip...
139
140
141
142
  		return nil
  	}
  }
  
8568cf44   zhangqijia   update preserve
143
144
  func (c *Connection) SendSuccess(cmd uint32, data []byte, preserve uint32) error {
  	buf, err := c.splitter.Pack(cmd, data, 0, preserve)
c8509ef6   zhangqijia   fix: notify equip...
145
146
147
148
149
150
151
152
153
154
155
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
77f5eec7   zhangqijia   plugin 插件热更 接口
156
157
158
159
  		return nil
  	}
  }
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
160
161
162
163
  func (c *Connection) CustomChan() chan<- func() {
  	return c.customizeFunc
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
164
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
165
166
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
167
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
168
169
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
170
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
171
172
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
173
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
174
175
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
176
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
177
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
178
  		//logger.Debug("write close")
58e37bfe   zhangqijia   add sync.Pool to ...
179
180
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
181
182
183
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
184
185
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
186
187
188
189
190
191
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
f74e34e3   zhangqijia   fix: 批量删除装备
192
  		logger.Debug("write n: %d", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
193
194
195
196
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
197
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
198
  		//logger.Debug("read close")
58e37bfe   zhangqijia   add sync.Pool to ...
199
200
201
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
202
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
203
204
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
205
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
206
207
208
209
  		if err != nil {
  			return
  		}
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
210
  		req.SetSID(c.GetID())
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
211
212
213
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
214
215
216
217
218
219
220
221
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
222
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
223
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
224
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
225
  		//logger.Debug("listen close")
58e37bfe   zhangqijia   add sync.Pool to ...
226
227
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
228
  
cd2f96ab   zhangqijia   fix: 优化连接管理
229
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
230
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
231
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
232
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
233
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
234
  			readFunc()
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
235
236
  		case customizeFunc := <-c.customizeFunc:
  			customizeFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
237
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
238
239
240
241
242
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
243
  func (c *Connection) handleTimeOut() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
244
245
246
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
247
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
248
249
250
251
252
253
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
254
  func (c *Connection) quitting() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
255
256
257
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
765431a4   zhangqijia   增加schema接口, 抽象 mo...
258
259
  	atomic.StoreUint32(&c.Status, 0)
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
260
261
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
262
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
263
264
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
265
266
267
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
268
  }