Blame view

common/components/conn.go 4.69 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
6
7
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
8
  	"pro2d/common/logger"
58e37bfe   zhangqijia   add sync.Pool to ...
9
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
10
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
11
12
13
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
14
15
16
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
17
  	splitter ISplitter
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
18
  	Id       uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
19
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
20
21
22
23
24
25
26
  	scanner       *bufio.Scanner
  	writer        *bufio.Writer
  	WBuffer       chan []byte
  	Quit          chan *Connection
  	readFunc      chan func()
  	timerFunc     chan func()
  	customizeFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
27
28
29
30
31
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
32
33
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
34
35
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
36
  var connectionPool = &sync.Pool{
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
37
  	New: func() interface{} { return new(Connection) },
58e37bfe   zhangqijia   add sync.Pool to ...
38
39
  }
  
54b3f133   zhangqijia   add connector int...
40
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
41
42
43
44
45
  	c := connectionPool.Get().(*Connection)
  	closed := atomic.LoadUint32(&c.Status)
  	if closed != 0 {
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
46
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
47
48
49
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
50
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
51
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
52
53
  	c.scanner = bufio.NewScanner(conn)
  	c.writer = bufio.NewWriter(conn)
58e37bfe   zhangqijia   add sync.Pool to ...
54
55
56
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
57
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
58
59
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
60
  func (c *Connection) reset() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
61
62
  	c.WBuffer = make(chan []byte, common.MaxMsgChan)
  	c.Quit = make(chan *Connection)
58e37bfe   zhangqijia   add sync.Pool to ...
63
64
  
  	if c.readFunc == nil {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
65
  		c.readFunc = make(chan func(), 10)
58e37bfe   zhangqijia   add sync.Pool to ...
66
67
68
69
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
70
71
72
  	if c.customizeFunc == nil {
  		c.customizeFunc = make(chan func(), 10)
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
73
74
75
76
77
78
79
80
81
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
96
97
98
99
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
100
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
101
102
103
104
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
105
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
106
107
108
109
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
110
  func (c *Connection) Stop() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
111
112
113
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
114
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
115
116
117
118
119
120
121
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
122
123
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
124
125
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
126
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
54b3f133   zhangqijia   add connector int...
127
  	buf, err := c.splitter.Pack(cmd, data, errCode, 0)
77f5eec7   zhangqijia   plugin 插件热更 接口
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }
  
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
143
144
145
146
  func (c *Connection) CustomChan() chan<- func() {
  	return c.customizeFunc
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
147
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
148
149
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
150
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
151
152
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
153
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
154
155
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
156
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
157
158
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
159
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
160
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
161
  		//logger.Debug("write close")
58e37bfe   zhangqijia   add sync.Pool to ...
162
163
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
164
165
166
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
167
168
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
169
170
171
172
173
174
175
176
177
178
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
179
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
180
  		//logger.Debug("read close")
58e37bfe   zhangqijia   add sync.Pool to ...
181
182
183
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
184
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
185
186
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
187
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
188
189
190
191
  		if err != nil {
  			return
  		}
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
192
  		req.SetSID(c.GetID())
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
193
194
195
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
196
197
198
199
200
201
202
203
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
204
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
205
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
206
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
207
  		//logger.Debug("listen close")
58e37bfe   zhangqijia   add sync.Pool to ...
208
209
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
210
  
cd2f96ab   zhangqijia   fix: 优化连接管理
211
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
212
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
213
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
214
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
215
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
216
  			readFunc()
8aaf28dd   zhangqijia   fix: 修复gm系统修改role...
217
218
  		case customizeFunc := <-c.customizeFunc:
  			customizeFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
219
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
220
221
222
223
224
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
225
  func (c *Connection) handleTimeOut() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
226
227
228
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
229
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
230
231
232
233
234
235
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
236
  func (c *Connection) quitting() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
237
238
239
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
765431a4   zhangqijia   增加schema接口, 抽象 mo...
240
241
242
243
244
  	atomic.StoreUint32(&c.Status, 0)
  
  	logger.Debug("ID: %d close", c.Id)
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
245
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
246
247
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
248
249
250
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
251
  }