Blame view

common/components/conn.go 4.44 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
6
7
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
8
  	"pro2d/common/logger"
58e37bfe   zhangqijia   add sync.Pool to ...
9
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
10
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
11
12
13
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
14
15
16
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
17
  	splitter ISplitter
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
18
  	Id       uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
19
  
cd2f96ab   zhangqijia   fix: 优化连接管理
20
21
22
23
  	scanner   *bufio.Scanner
  	writer    *bufio.Writer
  	WBuffer   chan []byte
  	Quit      chan *Connection
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
24
25
  	readFunc  chan func()
  	timerFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
26
27
28
29
30
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
31
32
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
33
34
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
35
  var connectionPool = &sync.Pool{
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
36
  	New: func() interface{} { return new(Connection) },
58e37bfe   zhangqijia   add sync.Pool to ...
37
38
  }
  
54b3f133   zhangqijia   add connector int...
39
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
40
41
42
43
44
  	c := connectionPool.Get().(*Connection)
  	closed := atomic.LoadUint32(&c.Status)
  	if closed != 0 {
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
45
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
46
47
48
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
49
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
50
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
51
52
  	c.scanner = bufio.NewScanner(conn)
  	c.writer = bufio.NewWriter(conn)
58e37bfe   zhangqijia   add sync.Pool to ...
53
54
55
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
56
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
57
58
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
59
  func (c *Connection) reset() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
60
61
  	c.WBuffer = make(chan []byte, common.MaxMsgChan)
  	c.Quit = make(chan *Connection)
58e37bfe   zhangqijia   add sync.Pool to ...
62
63
  
  	if c.readFunc == nil {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
64
  		c.readFunc = make(chan func(), 10)
58e37bfe   zhangqijia   add sync.Pool to ...
65
66
67
68
69
70
71
72
73
74
75
76
77
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
78
79
80
81
82
83
84
85
86
87
88
89
90
91
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
92
93
94
95
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
96
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
97
98
99
100
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
101
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
102
103
104
105
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
106
  func (c *Connection) Stop() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
107
108
109
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
110
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
111
112
113
114
115
116
117
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
118
119
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
120
121
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
122
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
54b3f133   zhangqijia   add connector int...
123
  	buf, err := c.splitter.Pack(cmd, data, errCode, 0)
77f5eec7   zhangqijia   plugin 插件热更 接口
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
139
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
140
141
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
142
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
143
144
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
145
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
146
147
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
148
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
149
150
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
151
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
152
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
153
  		//logger.Debug("write close")
58e37bfe   zhangqijia   add sync.Pool to ...
154
155
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
156
157
158
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
159
160
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
161
162
163
164
165
166
167
168
169
170
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
171
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
172
  		//logger.Debug("read close")
58e37bfe   zhangqijia   add sync.Pool to ...
173
174
175
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
176
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
177
178
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
179
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
180
181
182
183
  		if err != nil {
  			return
  		}
  
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
184
  		req.SetSID(c.GetID())
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
185
186
187
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
188
189
190
191
192
193
194
195
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
196
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
197
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
198
  	defer func() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
199
  		//logger.Debug("listen close")
58e37bfe   zhangqijia   add sync.Pool to ...
200
201
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
202
  
cd2f96ab   zhangqijia   fix: 优化连接管理
203
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
204
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
205
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
206
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
207
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
208
  			readFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
209
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
210
211
212
213
214
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
215
  func (c *Connection) handleTimeOut() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
216
217
218
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
219
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
220
221
222
223
224
225
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
226
  func (c *Connection) quitting() {
eadc9aff   zhangqijia   feat: 增加上阵下阵协议,增加...
227
228
229
  	if atomic.LoadUint32(&c.Status) == 0 {
  		return
  	}
765431a4   zhangqijia   增加schema接口, 抽象 mo...
230
231
232
233
234
  	atomic.StoreUint32(&c.Status, 0)
  
  	logger.Debug("ID: %d close", c.Id)
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
235
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
236
237
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
238
239
240
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
241
  }