Blame view

common/components/conn.go 5.6 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
927b2652   zhangqijia   feat: EmailNewNty...
6
  	"github.com/golang/protobuf/proto"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
7
8
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
9
  	"pro2d/common/logger"
58e37bfe   zhangqijia   add sync.Pool to ...
10
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
11
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
12
13
14
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
15
16
17
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
18
  	splitter ISplitter
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
19
  	Id       uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
20
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
21
22
23
24
25
26
27
  	scanner       *bufio.Scanner
  	writer        *bufio.Writer
  	WBuffer       chan []byte
  	Quit          chan *Connection
  	readFunc      chan func()
  	timerFunc     chan func()
  	customizeFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
28
29
30
31
32
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
33
34
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
35
36
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
37
  var connectionPool = &sync.Pool{
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
38
  	New: func() interface{} { return new(Connection) },
58e37bfe   zhangqijia   add sync.Pool to ...
39
40
  }
  
54b3f133   zhangqijia   add connector int...
41
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
42
  	c := connectionPool.Get().(*Connection)
495e9142   zhangqijia   fix: 增加DisConnect...
43
44
  	status := atomic.LoadUint32(&c.Status)
  	if status != 0 {
58e37bfe   zhangqijia   add sync.Pool to ...
45
46
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
47
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
48
49
50
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
51
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
52
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
53
54
  	c.scanner = bufio.NewScanner(conn)
  	c.writer = bufio.NewWriter(conn)
58e37bfe   zhangqijia   add sync.Pool to ...
55
56
57
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
58
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
59
60
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
61
  func (c *Connection) reset() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
62
63
  	c.WBuffer = make(chan []byte, common.MaxMsgChan)
  	c.Quit = make(chan *Connection)
58e37bfe   zhangqijia   add sync.Pool to ...
64
65
  
  	if c.readFunc == nil {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
66
  		c.readFunc = make(chan func(), 10)
58e37bfe   zhangqijia   add sync.Pool to ...
67
68
69
70
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
71
72
73
  	if c.customizeFunc == nil {
  		c.customizeFunc = make(chan func(), 10)
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
74
75
76
77
78
79
80
81
82
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
97
98
99
100
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
101
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
102
103
104
105
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
106
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
107
108
109
110
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
111
  func (c *Connection) Stop() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
112
113
114
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
115
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
116
117
118
119
120
121
122
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
123
124
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
125
126
  }
  
8568cf44   zhangqijia   update preserve
127
128
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte, preserve uint32) error {
  	buf, err := c.splitter.Pack(cmd, data, errCode, preserve)
77f5eec7   zhangqijia   plugin 插件热更 接口
129
130
131
132
133
134
135
136
137
138
139
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
c8509ef6   zhangqijia   fix: notify equip...
140
141
142
143
  		return nil
  	}
  }
  
927b2652   zhangqijia   feat: EmailNewNty...
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
  func (c *Connection) SendPB(errCode int32, cmd uint32, data proto.Message, preserve uint32) error {
  	pbData, err := proto.Marshal(data)
  	if err != nil {
  		return err
  	}
  	buf, err := c.splitter.Pack(cmd, pbData, errCode, preserve)
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }
  
8568cf44   zhangqijia   update preserve
165
166
  func (c *Connection) SendSuccess(cmd uint32, data []byte, preserve uint32) error {
  	buf, err := c.splitter.Pack(cmd, data, 0, preserve)
c8509ef6   zhangqijia   fix: notify equip...
167
168
169
170
171
172
173
174
175
176
177
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
77f5eec7   zhangqijia   plugin 插件热更 接口
178
179
180
181
  		return nil
  	}
  }
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
182
183
184
185
  func (c *Connection) CustomChan() chan<- func() {
  	return c.customizeFunc
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
186
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
187
188
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
189
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
190
191
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
192
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
193
194
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
195
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
196
197
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
198
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
199
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
200
  		//logger.Debug("write close")
58e37bfe   zhangqijia   add sync.Pool to ...
201
202
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
203
204
205
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
206
207
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
208
209
210
211
212
213
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
f74e34e3   zhangqijia   fix: 批量删除装备
214
  		logger.Debug("write n: %d", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
215
216
217
218
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
219
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
220
  		//logger.Debug("read close")
58e37bfe   zhangqijia   add sync.Pool to ...
221
222
223
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
224
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
225
226
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
227
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
228
229
230
231
  		if err != nil {
  			return
  		}
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
232
  		req.SetSID(c.GetID())
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
233
234
235
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
236
237
238
239
240
241
242
243
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
244
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
245
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
246
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
247
  		//logger.Debug("listen close")
58e37bfe   zhangqijia   add sync.Pool to ...
248
249
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
250
  
cd2f96ab   zhangqijia   fix: 优化连接管理
251
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
252
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
253
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
254
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
255
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
256
  			readFunc()
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
257
258
  		case customizeFunc := <-c.customizeFunc:
  			customizeFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
259
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
260
261
262
263
264
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
265
  func (c *Connection) handleTimeOut() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
266
267
268
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
269
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
270
271
272
273
274
275
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
276
  func (c *Connection) quitting() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
277
278
279
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
765431a4   zhangqijia   增加schema接口, 抽象 mo...
280
281
  	atomic.StoreUint32(&c.Status, 0)
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
282
283
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
284
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
285
286
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
287
288
289
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
290
  }