0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
1
2
3
4
|
package components
import (
"bufio"
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
5
|
"fmt"
|
927b2652
zhangqijia
feat: EmailNewNty...
|
6
|
"github.com/golang/protobuf/proto"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
7
8
|
"net"
"pro2d/common"
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
9
|
"pro2d/common/logger"
|
aa487faa
zhangqijia
null
|
10
|
"pro2d/pb"
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
11
|
"sync"
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
12
|
"sync/atomic"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
13
14
15
|
"time"
)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
16
17
18
|
type Connection struct {
IConnection
net.Conn
|
54b3f133
zhangqijia
add connector int...
|
19
|
splitter ISplitter
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
20
|
Id uint32
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
21
|
|
8aaf28dd
zhangqijia
fix: 修复gm系统修改role...
|
22
23
24
25
26
27
28
|
scanner *bufio.Scanner
writer *bufio.Writer
WBuffer chan []byte
Quit chan *Connection
readFunc chan func()
timerFunc chan func()
customizeFunc chan func()
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
29
30
31
32
33
|
messageCallback MessageCallback
connectionCallback ConnectionCallback
closeCallback CloseCallback
timerCallback TimerCallback
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
34
35
|
Status uint32
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
36
37
|
}
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
38
|
var connectionPool = &sync.Pool{
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
39
|
New: func() interface{} { return new(Connection) },
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
40
41
|
}
|
54b3f133
zhangqijia
add connector int...
|
42
|
func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
43
|
c := connectionPool.Get().(*Connection)
|
495e9142
zhangqijia
fix: 增加DisConnect...
|
44
45
|
status := atomic.LoadUint32(&c.Status)
if status != 0 {
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
46
47
|
connectionPool.Put(c)
c = new(Connection)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
48
|
}
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
49
50
51
|
atomic.StoreUint32(&c.Id, uint32(id))
c.Conn = conn
|
54b3f133
zhangqijia
add connector int...
|
52
|
c.splitter = splitter
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
53
|
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
54
55
|
c.scanner = bufio.NewScanner(conn)
c.writer = bufio.NewWriter(conn)
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
56
57
58
|
c.reset()
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
59
|
return c
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
60
61
|
}
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
62
|
func (c *Connection) reset() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
63
64
|
c.WBuffer = make(chan []byte, common.MaxMsgChan)
c.Quit = make(chan *Connection)
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
65
66
|
if c.readFunc == nil {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
67
|
c.readFunc = make(chan func(), 10)
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
68
69
70
71
|
}
if c.timerFunc == nil {
c.timerFunc = make(chan func(), 10)
}
|
8aaf28dd
zhangqijia
fix: 修复gm系统修改role...
|
72
73
74
|
if c.customizeFunc == nil {
c.customizeFunc = make(chan func(), 10)
}
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
75
76
77
78
79
80
81
82
83
|
//c.connectionCallback = c.defaultConnectionCallback
//c.messageCallback = c.defaultMessageCallback
//c.closeCallback = c.defaultCloseCallback
//c.timerCallback = c.defaultTimerCallback
}
func (c *Connection) GetID() uint32 {
return atomic.LoadUint32(&c.Id)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
}
func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
c.connectionCallback = cb
}
func (c *Connection) SetMessageCallback(cb MessageCallback) {
c.messageCallback = cb
}
func (c *Connection) SetCloseCallback(cb CloseCallback) {
c.closeCallback = cb
}
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
98
99
100
101
|
func (c *Connection) SetTimerCallback(cb TimerCallback) {
c.timerCallback = cb
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
102
|
func (c *Connection) Start() {
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
103
104
105
106
|
go c.write()
go c.read()
go c.listen()
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
107
|
atomic.StoreUint32(&c.Status, 1)
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
108
109
110
111
|
c.connectionCallback(c)
c.handleTimeOut()
}
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
112
|
func (c *Connection) Stop() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
113
114
115
|
if atomic.LoadUint32(&c.Status) == 0 {
return
}
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
116
|
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
117
118
119
120
121
122
123
|
sendTimeout := time.NewTimer(5 * time.Millisecond)
defer sendTimeout.Stop()
// 发送超时
select {
case <-sendTimeout.C:
return
case c.Quit <- c:
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
124
125
|
return
}
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
126
127
|
}
|
8568cf44
zhangqijia
update preserve
|
128
129
|
func (c *Connection) Send(errCode int32, cmd uint32, data []byte, preserve uint32) error {
buf, err := c.splitter.Pack(cmd, data, errCode, preserve)
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
130
131
132
133
134
135
136
137
138
139
140
|
if err != nil {
return err
}
sendTimeout := time.NewTimer(5 * time.Millisecond)
defer sendTimeout.Stop()
// 发送超时
select {
case <-sendTimeout.C:
return fmt.Errorf("send buff msg timeout")
case c.WBuffer <- buf:
|
c8509ef6
zhangqijia
fix: notify equip...
|
141
142
143
144
|
return nil
}
}
|
aa487faa
zhangqijia
null
|
145
|
func (c *Connection) SendPB(errCode int32, cmd pb.ProtoCode, data proto.Message, preserve uint32) error {
|
927b2652
zhangqijia
feat: EmailNewNty...
|
146
147
148
149
|
pbData, err := proto.Marshal(data)
if err != nil {
return err
}
|
aa487faa
zhangqijia
null
|
150
|
buf, err := c.splitter.Pack(uint32(cmd), pbData, errCode, preserve)
|
927b2652
zhangqijia
feat: EmailNewNty...
|
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
if err != nil {
return err
}
sendTimeout := time.NewTimer(5 * time.Millisecond)
defer sendTimeout.Stop()
// 发送超时
select {
case <-sendTimeout.C:
return fmt.Errorf("send buff msg timeout")
case c.WBuffer <- buf:
return nil
}
}
|
8568cf44
zhangqijia
update preserve
|
166
167
|
func (c *Connection) SendSuccess(cmd uint32, data []byte, preserve uint32) error {
buf, err := c.splitter.Pack(cmd, data, 0, preserve)
|
c8509ef6
zhangqijia
fix: notify equip...
|
168
169
170
171
172
173
174
175
176
177
178
|
if err != nil {
return err
}
sendTimeout := time.NewTimer(5 * time.Millisecond)
defer sendTimeout.Stop()
// 发送超时
select {
case <-sendTimeout.C:
return fmt.Errorf("send buff msg timeout")
case c.WBuffer <- buf:
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
179
180
181
182
|
return nil
}
}
|
8aaf28dd
zhangqijia
fix: 修复gm系统修改role...
|
183
184
185
186
|
func (c *Connection) CustomChan() chan<- func() {
return c.customizeFunc
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
187
|
func (c *Connection) defaultConnectionCallback(conn IConnection) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
188
189
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
190
|
func (c *Connection) defaultMessageCallback(msg IMessage) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
191
192
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
193
|
func (c *Connection) defaultCloseCallback(conn IConnection) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
194
195
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
196
|
func (c *Connection) defaultTimerCallback(conn IConnection) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
197
198
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
199
|
func (c *Connection) write() {
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
200
|
defer func() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
201
|
//logger.Debug("write close")
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
202
203
|
c.Stop()
}()
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
204
205
206
|
for msg := range c.WBuffer {
n, err := c.writer.Write(msg)
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
207
208
|
if err != nil {
logger.Error("write fail err: "+err.Error(), "n: ", n)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
209
210
211
212
213
214
|
return
}
if err := c.writer.Flush(); err != nil {
logger.Error("write Flush fail err: " + err.Error())
return
}
|
f74e34e3
zhangqijia
fix: 批量删除装备
|
215
|
logger.Debug("write n: %d", n)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
216
217
218
219
|
}
}
func (c *Connection) read() {
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
220
|
defer func() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
221
|
//logger.Debug("read close")
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
222
223
224
|
c.Stop()
}()
|
54b3f133
zhangqijia
add connector int...
|
225
|
c.scanner.Split(c.splitter.ParseMsg)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
226
227
|
for c.scanner.Scan() {
|
54b3f133
zhangqijia
add connector int...
|
228
|
req, err := c.splitter.UnPack(c.scanner.Bytes())
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
229
230
231
232
|
if err != nil {
return
}
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
233
|
req.SetSID(c.GetID())
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
234
235
236
|
c.readFunc <- func() {
c.messageCallback(req)
}
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
237
238
239
240
241
242
243
244
|
}
if err := c.scanner.Err(); err != nil {
logger.Error("scanner.err: %s", err.Error())
return
}
}
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
245
|
//此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
246
|
func (c *Connection) listen() {
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
247
|
defer func() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
248
|
//logger.Debug("listen close")
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
249
250
|
c.quitting()
}()
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
251
|
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
252
|
for {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
253
|
select {
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
254
|
case timerFunc := <-c.timerFunc:
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
255
|
timerFunc()
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
256
|
case readFunc := <-c.readFunc:
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
257
|
readFunc()
|
8aaf28dd
zhangqijia
fix: 修复gm系统修改role...
|
258
259
|
case customizeFunc := <-c.customizeFunc:
customizeFunc()
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
260
|
case <-c.Quit:
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
261
262
263
264
265
|
return
}
}
}
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
266
|
func (c *Connection) handleTimeOut() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
267
268
269
|
if atomic.LoadUint32(&c.Status) == 0 {
return
}
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
270
|
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
271
272
273
274
275
276
|
c.timerFunc <- func() {
c.timerCallback(c)
}
TimeOut(1*time.Second, c.handleTimeOut)
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
277
|
func (c *Connection) quitting() {
|
eadc9aff
zhangqijia
feat: 增加上阵下阵协议,增加...
|
278
279
280
|
if atomic.LoadUint32(&c.Status) == 0 {
return
}
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
281
282
|
atomic.StoreUint32(&c.Status, 0)
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
283
284
|
close(c.WBuffer)
close(c.Quit)
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
285
|
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
286
287
|
c.Conn.Close()
c.closeCallback(c)
|
58e37bfe
zhangqijia
add sync.Pool to ...
|
288
289
290
|
//放回到对象池
connectionPool.Put(c)
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
291
|
}
|