Blame view

common/components/conn.go 3.81 KB
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
1
2
3
4
  package components
  
  import (
  	"bufio"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
5
  	"fmt"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
6
7
  	"net"
  	"pro2d/common"
765431a4   zhangqijia   增加schema接口, 抽象 mo...
8
  	"pro2d/common/logger"
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
9
  	"sync/atomic"
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
10
11
12
  	"time"
  )
  
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
13
14
15
16
17
18
  type Connection struct {
  	IConnection
  	net.Conn
  	Server IServer
  	Id     int
  
cd2f96ab   zhangqijia   fix: 优化连接管理
19
20
21
22
  	scanner   *bufio.Scanner
  	writer    *bufio.Writer
  	WBuffer   chan []byte
  	Quit      chan *Connection
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
23
24
  	readFunc  chan func()
  	timerFunc chan func()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
25
26
27
28
29
  
  	messageCallback    MessageCallback
  	connectionCallback ConnectionCallback
  	closeCallback      CloseCallback
  	timerCallback      TimerCallback
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
30
31
  
  	Status uint32
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
32
33
34
  }
  
  func NewConn(id int, conn net.Conn, s IServer) *Connection {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
35
  	c := &Connection{
cd2f96ab   zhangqijia   fix: 优化连接管理
36
37
38
39
40
41
42
43
  		Id:     id,
  		Conn:   conn,
  		Server: s,
  
  		scanner:   bufio.NewScanner(conn),
  		writer:    bufio.NewWriter(conn),
  		WBuffer:   make(chan []byte, common.MaxMsgChan),
  		Quit:      make(chan *Connection),
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
44
45
46
  		readFunc:  make(chan func(), 10),
  		timerFunc: make(chan func(), 10),
  
cd2f96ab   zhangqijia   fix: 优化连接管理
47
  		Status: 0,
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
48
  	}
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
49
50
51
52
53
  	c.connectionCallback = c.defaultConnectionCallback
  	c.messageCallback = c.defaultMessageCallback
  	c.closeCallback = c.defaultCloseCallback
  	c.timerCallback = c.defaultTimerCallback
  	return c
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  }
  
  func (c *Connection) GetID() int {
  	return c.Id
  }
  
  func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  	c.connectionCallback = cb
  }
  
  func (c *Connection) SetMessageCallback(cb MessageCallback) {
  	c.messageCallback = cb
  }
  
  func (c *Connection) SetCloseCallback(cb CloseCallback) {
  	c.closeCallback = cb
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
72
73
74
75
  func (c *Connection) SetTimerCallback(cb TimerCallback) {
  	c.timerCallback = cb
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
76
  func (c *Connection) Start() {
77f5eec7   zhangqijia   plugin 插件热更 接口
77
78
79
80
81
82
83
84
85
  	go c.write()
  	go c.read()
  	go c.listen()
  
  	c.Status = 1
  	c.connectionCallback(c)
  	c.handleTimeOut()
  }
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
86
87
88
89
90
91
92
93
  func (c *Connection) Stop() {
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return
  	case c.Quit <- c:
77f5eec7   zhangqijia   plugin 插件热更 接口
94
95
  		return
  	}
77f5eec7   zhangqijia   plugin 插件热更 接口
96
97
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
98
  func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
77f5eec7   zhangqijia   plugin 插件热更 接口
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
  	buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0)
  	if err != nil {
  		return err
  	}
  
  	sendTimeout := time.NewTimer(5 * time.Millisecond)
  	defer sendTimeout.Stop()
  	// 发送超时
  	select {
  	case <-sendTimeout.C:
  		return fmt.Errorf("send buff msg timeout")
  	case c.WBuffer <- buf:
  		return nil
  	}
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
115
  func (c *Connection) defaultConnectionCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
116
117
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
118
  func (c *Connection) defaultMessageCallback(msg IMessage) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
119
120
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
121
  func (c *Connection) defaultCloseCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
122
123
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
124
  func (c *Connection) defaultTimerCallback(conn IConnection) {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
125
126
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
127
  func (c *Connection) write() {
765431a4   zhangqijia   增加schema接口, 抽象 mo...
128
  	defer c.quitting()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
129
130
131
  
  	for msg := range c.WBuffer {
  		n, err := c.writer.Write(msg)
cd2f96ab   zhangqijia   fix: 优化连接管理
132
133
  		if err != nil {
  			logger.Error("write fail err: "+err.Error(), "n: ", n)
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
134
135
136
137
138
139
140
141
142
143
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			logger.Error("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
765431a4   zhangqijia   增加schema接口, 抽象 mo...
144
  	defer c.quitting()
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
145
146
147
148
149
150
151
152
  	c.scanner.Split(c.Server.GetSplitter().ParseMsg)
  
  	for c.scanner.Scan() {
  		req, err := c.Server.GetSplitter().UnPack(c.scanner.Bytes())
  		if err != nil {
  			return
  		}
  
765431a4   zhangqijia   增加schema接口, 抽象 mo...
153
  		req.SetSession(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
154
155
156
  		c.readFunc <- func() {
  			c.messageCallback(req)
  		}
0e5d52de   zhangqijia   reactor: 重构底层框架1.0
157
158
159
160
161
162
163
164
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		logger.Error("scanner.err: %s", err.Error())
  		return
  	}
  }
  
77f5eec7   zhangqijia   plugin 插件热更 接口
165
  //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
cd2f96ab   zhangqijia   fix: 优化连接管理
166
  func (c *Connection) listen() {
765431a4   zhangqijia   增加schema接口, 抽象 mo...
167
  	defer c.quitting()
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
168
  
cd2f96ab   zhangqijia   fix: 优化连接管理
169
  	for {
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
170
  		select {
cd2f96ab   zhangqijia   fix: 优化连接管理
171
  		case timerFunc := <-c.timerFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
172
  			timerFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
173
  		case readFunc := <-c.readFunc:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
174
  			readFunc()
cd2f96ab   zhangqijia   fix: 优化连接管理
175
  		case <-c.Quit:
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
176
177
178
179
180
  			return
  		}
  	}
  }
  
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
181
182
183
184
185
186
187
  func (c *Connection) handleTimeOut() {
  	c.timerFunc <- func() {
  		c.timerCallback(c)
  	}
  	TimeOut(1*time.Second, c.handleTimeOut)
  }
  
cd2f96ab   zhangqijia   fix: 优化连接管理
188
  func (c *Connection) quitting() {
765431a4   zhangqijia   增加schema接口, 抽象 mo...
189
190
191
192
193
194
195
196
197
  	closed := atomic.LoadUint32(&c.Status)
  	if closed == 0 {
  		return
  	}
  	atomic.StoreUint32(&c.Status, 0)
  
  	logger.Debug("ID: %d close", c.Id)
  	close(c.WBuffer)
  	close(c.Quit)
cd2f96ab   zhangqijia   fix: 优化连接管理
198
  	close(c.readFunc)
765431a4   zhangqijia   增加schema接口, 抽象 mo...
199
200
  	c.Conn.Close()
  	c.closeCallback(c)
9a9d092e   zhangqijia   每条连接增加一个定时器,每条连接增...
201
  }