0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
1
2
3
4
|
package components
import (
"bufio"
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
5
|
"fmt"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
6
7
|
"net"
"pro2d/common"
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
8
|
"pro2d/common/logger"
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
9
|
"sync/atomic"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
10
11
12
|
"time"
)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
13
14
15
16
17
18
|
type Connection struct {
IConnection
net.Conn
Server IServer
Id int
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
19
20
21
22
|
scanner *bufio.Scanner
writer *bufio.Writer
WBuffer chan []byte
Quit chan *Connection
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
23
24
|
readFunc chan func()
timerFunc chan func()
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
25
26
27
28
29
|
messageCallback MessageCallback
connectionCallback ConnectionCallback
closeCallback CloseCallback
timerCallback TimerCallback
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
30
31
|
Status uint32
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
32
33
34
|
}
func NewConn(id int, conn net.Conn, s IServer) *Connection {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
35
|
c := &Connection{
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
36
37
38
39
40
41
42
43
|
Id: id,
Conn: conn,
Server: s,
scanner: bufio.NewScanner(conn),
writer: bufio.NewWriter(conn),
WBuffer: make(chan []byte, common.MaxMsgChan),
Quit: make(chan *Connection),
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
44
45
46
|
readFunc: make(chan func(), 10),
timerFunc: make(chan func(), 10),
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
47
|
Status: 0,
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
48
|
}
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
49
50
51
52
53
|
c.connectionCallback = c.defaultConnectionCallback
c.messageCallback = c.defaultMessageCallback
c.closeCallback = c.defaultCloseCallback
c.timerCallback = c.defaultTimerCallback
return c
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
}
func (c *Connection) GetID() int {
return c.Id
}
func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
c.connectionCallback = cb
}
func (c *Connection) SetMessageCallback(cb MessageCallback) {
c.messageCallback = cb
}
func (c *Connection) SetCloseCallback(cb CloseCallback) {
c.closeCallback = cb
}
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
72
73
74
75
|
func (c *Connection) SetTimerCallback(cb TimerCallback) {
c.timerCallback = cb
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
76
|
func (c *Connection) Start() {
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
77
78
79
80
81
82
83
84
85
|
go c.write()
go c.read()
go c.listen()
c.Status = 1
c.connectionCallback(c)
c.handleTimeOut()
}
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
86
87
88
89
90
91
92
93
|
func (c *Connection) Stop() {
sendTimeout := time.NewTimer(5 * time.Millisecond)
defer sendTimeout.Stop()
// 发送超时
select {
case <-sendTimeout.C:
return
case c.Quit <- c:
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
94
95
|
return
}
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
96
97
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
98
|
func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0)
if err != nil {
return err
}
sendTimeout := time.NewTimer(5 * time.Millisecond)
defer sendTimeout.Stop()
// 发送超时
select {
case <-sendTimeout.C:
return fmt.Errorf("send buff msg timeout")
case c.WBuffer <- buf:
return nil
}
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
115
|
func (c *Connection) defaultConnectionCallback(conn IConnection) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
116
117
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
118
|
func (c *Connection) defaultMessageCallback(msg IMessage) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
119
120
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
121
|
func (c *Connection) defaultCloseCallback(conn IConnection) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
122
123
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
124
|
func (c *Connection) defaultTimerCallback(conn IConnection) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
125
126
|
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
127
|
func (c *Connection) write() {
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
128
|
defer c.quitting()
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
129
130
131
|
for msg := range c.WBuffer {
n, err := c.writer.Write(msg)
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
132
133
|
if err != nil {
logger.Error("write fail err: "+err.Error(), "n: ", n)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
134
135
136
137
138
139
140
141
142
143
|
return
}
if err := c.writer.Flush(); err != nil {
logger.Error("write Flush fail err: " + err.Error())
return
}
}
}
func (c *Connection) read() {
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
144
|
defer c.quitting()
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
145
146
147
148
149
150
151
152
|
c.scanner.Split(c.Server.GetSplitter().ParseMsg)
for c.scanner.Scan() {
req, err := c.Server.GetSplitter().UnPack(c.scanner.Bytes())
if err != nil {
return
}
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
153
|
req.SetSession(c)
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
154
155
156
|
c.readFunc <- func() {
c.messageCallback(req)
}
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
157
158
159
160
161
162
163
164
|
}
if err := c.scanner.Err(); err != nil {
logger.Error("scanner.err: %s", err.Error())
return
}
}
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
165
|
//此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
166
|
func (c *Connection) listen() {
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
167
|
defer c.quitting()
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
168
|
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
169
|
for {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
170
|
select {
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
171
|
case timerFunc := <-c.timerFunc:
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
172
|
timerFunc()
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
173
|
case readFunc := <-c.readFunc:
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
174
|
readFunc()
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
175
|
case <-c.Quit:
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
176
177
178
179
180
|
return
}
}
}
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
181
182
183
184
185
186
187
|
func (c *Connection) handleTimeOut() {
c.timerFunc <- func() {
c.timerCallback(c)
}
TimeOut(1*time.Second, c.handleTimeOut)
}
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
188
|
func (c *Connection) quitting() {
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
189
190
191
192
193
194
195
196
197
|
closed := atomic.LoadUint32(&c.Status)
if closed == 0 {
return
}
atomic.StoreUint32(&c.Status, 0)
logger.Debug("ID: %d close", c.Id)
close(c.WBuffer)
close(c.Quit)
|
cd2f96ab
zhangqijia
fix: 优化连接管理
|
198
|
close(c.readFunc)
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
199
200
|
c.Conn.Close()
c.closeCallback(c)
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
201
|
}
|