Blame view

common/components/conn.go 4.42 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
6
7
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
8
  	"pro2d/common/logger"
58e37bfe   zhangqijia   add sync.Pool to ...
9
  	"sync"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
10
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
11
12
13
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
14
15
16
  type Connection struct {
  	IConnection
  	net.Conn
54b3f133   zhangqijia   add connector int...
17
  	splitter ISplitter
58e37bfe   zhangqijia   add sync.Pool to ...
18
  	Id     uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
19
  
cd2f96ab   zhangqijia   fix: 优化连接管理
20
21
22
23
  	scanner   *bufio.Scanner
  	writer    *bufio.Writer
  	WBuffer   chan []byte
  	Quit      chan *Connection
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
24
25
  	readFunc  chan func()
  	timerFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
26
27
28
29
30
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
31
32
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
33
34
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
35
36
37
38
  var connectionPool = &sync.Pool{
  	New: func() interface{} { return new(Connection)},
  }
  
54b3f133   zhangqijia   add connector int...
39
  func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
58e37bfe   zhangqijia   add sync.Pool to ...
40
41
42
43
44
  	c := connectionPool.Get().(*Connection)
  	closed := atomic.LoadUint32(&c.Status)
  	if closed != 0 {
  		connectionPool.Put(c)
  		c = new(Connection)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
45
  	}
58e37bfe   zhangqijia   add sync.Pool to ...
46
47
48
  
  	atomic.StoreUint32(&c.Id, uint32(id))
  	c.Conn = conn
54b3f133   zhangqijia   add connector int...
49
  	c.splitter = splitter
58e37bfe   zhangqijia   add sync.Pool to ...
50
51
52
53
54
55
  
  	c.scanner =   bufio.NewScanner(conn)
  	c.writer =  bufio.NewWriter(conn)
  
  	c.reset()
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
56
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
57
58
  }
  
58e37bfe   zhangqijia   add sync.Pool to ...
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
  func (c *Connection) reset() {
  	c.WBuffer 	= make(chan []byte, common.MaxMsgChan)
  	c.Quit 		= make(chan *Connection)
  
  	if c.readFunc == nil {
  		c.readFunc 	= make(chan func(), 10)
  	}
  	if c.timerFunc == nil {
  		c.timerFunc = make(chan func(), 10)
  	}
  
  	//c.connectionCallback	= c.defaultConnectionCallback
  	//c.messageCallback 		= c.defaultMessageCallback
  	//c.closeCallback 		= c.defaultCloseCallback
  	//c.timerCallback 		= c.defaultTimerCallback
  }
  
  func (c *Connection) GetID() uint32 {
  	return atomic.LoadUint32(&c.Id)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
78
79
80
81
82
83
84
85
86
87
88
89
90
91
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
92
93
94
95
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
96
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
97
98
99
100
  	go c.write()
  	go c.read()
  	go c.listen()
  
58e37bfe   zhangqijia   add sync.Pool to ...
101
  	atomic.StoreUint32(&c.Status, 1)
77f5eec7   zhangqijia   plugin 插件热更 接口
102
103
104
105
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
106
  func (c *Connection) Stop() {
58e37bfe   zhangqijia   add sync.Pool to ...
107
108
  	if atomic.LoadUint32(&c.Status) == 0 { return }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
109
110
111
112
113
114
115
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
116
117
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
118
119
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
120
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
54b3f133   zhangqijia   add connector int...
121
  	buf, err := c.splitter.Pack(cmd, data, errCode, 0)
77f5eec7   zhangqijia   plugin 插件热更 接口
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
137
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
138
139
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
140
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
141
142
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
143
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
144
145
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
146
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
147
148
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
149
  func (c *Connection) write() {
58e37bfe   zhangqijia   add sync.Pool to ...
150
151
152
153
  	defer func() {
  		logger.Debug("write close")
  		c.Stop()
  	}()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
154
155
156
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
157
158
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
159
160
161
162
163
164
165
166
167
168
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
58e37bfe   zhangqijia   add sync.Pool to ...
169
170
171
172
173
  	defer func() {
  		logger.Debug("read close")
  		c.Stop()
  	}()
  
54b3f133   zhangqijia   add connector int...
174
  	c.scanner.Split(c.splitter.ParseMsg)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
175
176
  
  	for c.scanner.Scan() {
54b3f133   zhangqijia   add connector int...
177
  		req, err := c.splitter.UnPack(c.scanner.Bytes())
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
178
179
180
181
  		if err != nil {
  			return
  		}
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
182
  		req.SetSession(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
183
184
185
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
186
187
188
189
190
191
192
193
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
194
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
195
  func (c *Connection) listen() {
58e37bfe   zhangqijia   add sync.Pool to ...
196
197
198
199
  	defer func() {
  		logger.Debug("listen close")
  		c.quitting()
  	}()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
200
  
cd2f96ab   zhangqijia   fix: 优化连接管理
201
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
202
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
203
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
204
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
205
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
206
  			readFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
207
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
208
209
210
211
212
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
213
  func (c *Connection) handleTimeOut() {
58e37bfe   zhangqijia   add sync.Pool to ...
214
215
  	if atomic.LoadUint32(&c.Status) == 0 { return }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
216
217
218
219
220
221
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
222
  func (c *Connection) quitting() {
58e37bfe   zhangqijia   add sync.Pool to ...
223
  	if atomic.LoadUint32(&c.Status) == 0 { return }
765431a4   zhangqijia   增加schema接口, 抽象 mo...
224
225
226
227
228
  	atomic.StoreUint32(&c.Status, 0)
  
  	logger.Debug("ID: %d close", c.Id)
  	close(c.WBuffer)
  	close(c.Quit)
58e37bfe   zhangqijia   add sync.Pool to ...
229
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
230
231
  	c.Conn.Close()
  	c.closeCallback(c)
58e37bfe   zhangqijia   add sync.Pool to ...
232
233
234
  
  	//放回到对象池
  	connectionPool.Put(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
235
  }