Blame view

src/components/net/conn.go 3.7 KB
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
1
2
3
4
5
  package net
  
  import (
  	"bufio"
  	"fmt"
0cc58315   zhangqijia   添加定时器, 检查心跳
6
  	"math"
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
7
  	"net"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
8
9
  	"pro2d/src/common"
  	"pro2d/src/components/logger"
38dd96b4   zhangqijia   定时器+网络数据 peer 在一条...
10
  	"pro2d/src/models"
0cc58315   zhangqijia   添加定时器, 检查心跳
11
12
  	"pro2d/src/utils"
  	"sync/atomic"
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
13
14
15
16
17
18
19
20
21
22
23
24
  )
  
  type Head struct {
  	Length 		int32
  	Cmd 		int32
  	ErrCode 	int32
  	PreField    int32
  }
  
  
  type Connection struct {
  	net.Conn
0cc58315   zhangqijia   添加定时器, 检查心跳
25
26
  	Id 				int
  	Server 			*Server
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
27
  
0cc58315   zhangqijia   添加定时器, 检查心跳
28
29
  	scanner 		*bufio.Scanner
  	writer 			*bufio.Writer
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
30
  
0cc58315   zhangqijia   添加定时器, 检查心跳
31
  	WBuffer 		chan []byte
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
32
  
2e0aa298   zhangqijia   update 每条连接新增一条协程...
33
34
35
  	updateFunc 		chan func()
  	readFunc   		chan func()
  
0cc58315   zhangqijia   添加定时器, 检查心跳
36
37
  	Quit 			chan *Connection
  
38dd96b4   zhangqijia   定时器+网络数据 peer 在一条...
38
39
  	Role			*models.RoleModel
  
0cc58315   zhangqijia   添加定时器, 检查心跳
40
41
42
  	nextCheckTime   int64 //下一次检查的时间
  	lastHeartCheckTime int64 //最后收消息时间
  	heartTimeoutCount  int   //超时次数
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
43
44
45
  }
  
  type MsgPkg struct {
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
46
  	Head *Head
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
47
48
49
50
51
52
53
54
55
56
  	Body []byte
  	Conn *Connection
  }
  
  func NewConn(id int, conn net.Conn, s *Server) *Connection {
  	return &Connection{
  		Id: id,
  		Conn: conn,
  		Server: s,
  
0cc58315   zhangqijia   添加定时器, 检查心跳
57
58
59
  		scanner:            bufio.NewScanner(conn),
  		writer:             bufio.NewWriter(conn),
  		WBuffer:            make(chan []byte),
2e0aa298   zhangqijia   update 每条连接新增一条协程...
60
61
  		updateFunc:         make(chan func()),
  		readFunc:           make(chan func()),
0cc58315   zhangqijia   添加定时器, 检查心跳
62
63
  		Quit:               make(chan *Connection),
  		lastHeartCheckTime: utils.Timex(),
2e0aa298   zhangqijia   update 每条连接新增一条协程...
64
  		heartTimeoutCount:  0,
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
65
66
67
68
69
70
71
  	}
  }
  
  func (c *Connection) write()  {
  	defer c.Quiting()
  
  	for msg := range c.WBuffer {
98b0736d   zhangqijia   添加定时器, 检查心跳
72
73
74
  		n, err := c.writer.Write(msg)
  		if err != nil{
  			logger.Error("write fail err: " + err.Error(), "n: ", n)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
75
76
77
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
98b0736d   zhangqijia   添加定时器, 检查心跳
78
  			logger.Error("write Flush fail err: " + err.Error())
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
79
80
  			return
  		}
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
81
82
83
84
85
  	}
  }
  
  func (c *Connection) read() {
  	defer c.Quiting()
5d9cf01c   zhangqijia   plugin 热更
86
  	c.scanner.Split(ParseMsg)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
87
  
5d9cf01c   zhangqijia   plugin 热更
88
89
90
  	for c.scanner.Scan() {
  		req, err := DecodeMsg(c.scanner.Bytes())
  		if err != nil {
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
91
92
  			return
  		}
5d9cf01c   zhangqijia   plugin 热更
93
94
  
  		req.Conn = c
2e0aa298   zhangqijia   update 每条连接新增一条协程...
95
  		c.readFunc <- func() {
38dd96b4   zhangqijia   定时器+网络数据 peer 在一条...
96
97
  			c.Server.DoMsgHandler(req)
  		}
5d9cf01c   zhangqijia   plugin 热更
98
  
0cc58315   zhangqijia   添加定时器, 检查心跳
99
100
  		atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())
  
5d9cf01c   zhangqijia   plugin 热更
101
102
103
104
105
106
  		//备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。
  		//c.Server.OnRecv(req)
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		fmt.Printf("scanner.err: %s\n", err.Error())
5d9cf01c   zhangqijia   plugin 热更
107
  		return
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
108
109
110
  	}
  }
  
0cc58315   zhangqijia   添加定时器, 检查心跳
111
112
113
  func (c *Connection) checkHeartBeat(now int64)  {
  	lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
  	logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.Id, lastHeartCheckTime, now)
33ea26ab   zhangqijia   使用schema封装mongo
114
  	if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval {
0cc58315   zhangqijia   添加定时器, 检查心跳
115
  		c.heartTimeoutCount++
33ea26ab   zhangqijia   使用schema封装mongo
116
  		if c.heartTimeoutCount >= common.HeartTimeoutCountMax {
0cc58315   zhangqijia   添加定时器, 检查心跳
117
118
119
120
121
122
123
124
125
126
127
128
129
  			c.Quiting()
  			return
  		}
  		logger.Debug("timeout count: %d", c.heartTimeoutCount)
  	}else {
  		c.heartTimeoutCount = 0
  	}
  }
  
  func (c *Connection) update()  {
  	nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
  	now := utils.Timex()
  	if now >= nextCheckTime {
38dd96b4   zhangqijia   定时器+网络数据 peer 在一条...
130
  		//检查心跳
0cc58315   zhangqijia   添加定时器, 检查心跳
131
  		c.checkHeartBeat(now)
33ea26ab   zhangqijia   使用schema封装mongo
132
  		nextCheckTime = now + common.HeartTimerInterval
0cc58315   zhangqijia   添加定时器, 检查心跳
133
134
  		atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
  	}
38dd96b4   zhangqijia   定时器+网络数据 peer 在一条...
135
  
7f269318   zhangqijia   add pb.go; 添加关闭连接...
136
137
  	c.updateFunc <- func() {
  	 	if c.Role != nil {
2e0aa298   zhangqijia   update 每条连接新增一条协程...
138
  			//role 恢复数据
38dd96b4   zhangqijia   定时器+网络数据 peer 在一条...
139
140
141
  			c.Role.OnRecoverTimer(now)
  		}
  	}
0cc58315   zhangqijia   添加定时器, 检查心跳
142
  }
33ea26ab   zhangqijia   使用schema封装mongo
143
  
2e0aa298   zhangqijia   update 每条连接新增一条协程...
144
145
146
147
148
149
150
151
152
153
154
155
156
  func (c *Connection) flush()  {
  	defer c.Stop()
  	for  {
  		select {
  		case rf := <- c.readFunc:
  			rf()
  		case uf := <- c.updateFunc:
  			uf()
  		case <- c.Quit:
  			return
  		}
  	}
  }
0cc58315   zhangqijia   添加定时器, 检查心跳
157
  
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
158
159
160
  func (c *Connection) Start()  {
  	go c.write()
  	go c.read()
2e0aa298   zhangqijia   update 每条连接新增一条协程...
161
   	c.flush()
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
162
163
164
  }
  
  func (c *Connection) Stop()  {
2e0aa298   zhangqijia   update 每条连接新增一条协程...
165
  	logger.Debug("ID: %d close", c.Id)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
166
  	c.Conn.Close()
7f269318   zhangqijia   add pb.go; 添加关闭连接...
167
168
169
170
  	if c.Role != nil {
  		c.Role.OnOfflineEvent()
  	}
  
2e0aa298   zhangqijia   update 每条连接新增一条协程...
171
  	c.Server.OnClose(c)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
172
173
174
  }
  
  func (c *Connection) Quiting()  {
2e0aa298   zhangqijia   update 每条连接新增一条协程...
175
  	c.Quit <- c
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
176
177
  }
  
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
178
179
180
181
182
183
184
185
186
187
188
189
190
  func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){
  	h := &Head{
  		Length:   int32(common.HEADLEN + len(data)),
  		Cmd:      cmd,
  		ErrCode:  errCode,
  		PreField: 0,
  	}
  	pkg := &MsgPkg{
  		Head: h,
  		Body: data,
  	}
  	buf, err := EncodeMsg(pkg)
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
191
  		logger.Error("SendMsg error: %v", err)
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
192
193
194
  		return
  	}
  	c.WBuffer <- buf
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
195
  }