Blame view

common/components/conn.go 5.62 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
927b2652   zhangqijia   feat: EmailNewNty...
6
  	"github.com/golang/protobuf/proto"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
7
8
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
9
  	"pro2d/common/logger"
aa487faa   zhangqijia   null
10
  	"pro2d/pb"
58e37bfe   zhangqijia   add sync.Pool to ...
11
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
12
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
13
14
15
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
16
17
18
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
19
  	splitter ISplitter
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
20
  	Id       uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
21
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
22
23
24
25
26
27
28
  	scanner       *bufio.Scanner
  	writer        *bufio.Writer
  	WBuffer       chan []byte
  	Quit          chan *Connection
  	readFunc      chan func()
  	timerFunc     chan func()
  	customizeFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
29
30
31
32
33
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
34
35
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
36
37
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
38
  var connectionPool = &sync.Pool{
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
39
  	New: func() interface{} { return new(Connection) },
58e37bfe   zhangqijia   add sync.Pool to ...
40
41
  }
  
54b3f133   zhangqijia   add connector int...
42
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
43
  	c := connectionPool.Get().(*Connection)
495e9142   zhangqijia   fix: 增加DisConnect...
44
45
  	status := atomic.LoadUint32(&c.Status)
  	if status != 0 {
58e37bfe   zhangqijia   add sync.Pool to ...
46
47
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
48
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
49
50
51
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
52
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
53
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
54
55
  	c.scanner = bufio.NewScanner(conn)
  	c.writer = bufio.NewWriter(conn)
58e37bfe   zhangqijia   add sync.Pool to ...
56
57
58
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
59
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
60
61
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
62
  func (c *Connection) reset() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
63
64
  	c.WBuffer = make(chan []byte, common.MaxMsgChan)
  	c.Quit = make(chan *Connection)
58e37bfe   zhangqijia   add sync.Pool to ...
65
66
  
  	if c.readFunc == nil {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
67
  		c.readFunc = make(chan func(), 10)
58e37bfe   zhangqijia   add sync.Pool to ...
68
69
70
71
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
72
73
74
  	if c.customizeFunc == nil {
  		c.customizeFunc = make(chan func(), 10)
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
75
76
77
78
79
80
81
82
83
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
84
85
86
87
88
89
90
91
92
93
94
95
96
97
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
98
99
100
101
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
102
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
103
104
105
106
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
107
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
108
109
110
111
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
112
  func (c *Connection) Stop() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
113
114
115
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
116
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
117
118
119
120
121
122
123
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
124
125
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
126
127
  }
  
8568cf44   zhangqijia   update preserve
128
129
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte, preserve uint32) error {
  	buf, err := c.splitter.Pack(cmd, data, errCode, preserve)
77f5eec7   zhangqijia   plugin 插件热更 接口
130
131
132
133
134
135
136
137
138
139
140
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
c8509ef6   zhangqijia   fix: notify equip...
141
142
143
144
  		return nil
  	}
  }
  
aa487faa   zhangqijia   null
145
  func (c *Connection) SendPB(errCode int32, cmd pb.ProtoCode, data proto.Message, preserve uint32) error {
927b2652   zhangqijia   feat: EmailNewNty...
146
147
148
149
  	pbData, err := proto.Marshal(data)
  	if err != nil {
  		return err
  	}
aa487faa   zhangqijia   null
150
  	buf, err := c.splitter.Pack(uint32(cmd), pbData, errCode, preserve)
927b2652   zhangqijia   feat: EmailNewNty...
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }
  
8568cf44   zhangqijia   update preserve
166
167
  func (c *Connection) SendSuccess(cmd uint32, data []byte, preserve uint32) error {
  	buf, err := c.splitter.Pack(cmd, data, 0, preserve)
c8509ef6   zhangqijia   fix: notify equip...
168
169
170
171
172
173
174
175
176
177
178
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
77f5eec7   zhangqijia   plugin 插件热更 接口
179
180
181
182
  		return nil
  	}
  }
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
183
184
185
186
  func (c *Connection) CustomChan() chan<- func() {
  	return c.customizeFunc
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
187
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
188
189
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
190
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
191
192
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
193
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
194
195
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
196
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
197
198
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
199
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
200
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
201
  		//logger.Debug("write close")
58e37bfe   zhangqijia   add sync.Pool to ...
202
203
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
204
205
206
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
207
208
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
209
210
211
212
213
214
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
f74e34e3   zhangqijia   fix: 批量删除装备
215
  		logger.Debug("write n: %d", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
216
217
218
219
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
220
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
221
  		//logger.Debug("read close")
58e37bfe   zhangqijia   add sync.Pool to ...
222
223
224
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
225
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
226
227
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
228
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
229
230
231
232
  		if err != nil {
  			return
  		}
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
233
  		req.SetSID(c.GetID())
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
234
235
236
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
237
238
239
240
241
242
243
244
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
245
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
246
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
247
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
248
  		//logger.Debug("listen close")
58e37bfe   zhangqijia   add sync.Pool to ...
249
250
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
251
  
cd2f96ab   zhangqijia   fix: 优化连接管理
252
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
253
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
254
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
255
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
256
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
257
  			readFunc()
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
258
259
  		case customizeFunc := <-c.customizeFunc:
  			customizeFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
260
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
261
262
263
264
265
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
266
  func (c *Connection) handleTimeOut() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
267
268
269
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
270
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
271
272
273
274
275
276
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
277
  func (c *Connection) quitting() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
278
279
280
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
765431a4   zhangqijia   增加schema接口, 抽象 mo...
281
282
  	atomic.StoreUint32(&c.Status, 0)
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
283
284
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
285
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
286
287
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
288
289
290
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
291
  }