0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
1
2
3
|
package main
import (
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
4
5
6
7
|
"github.com/golang/protobuf/proto"
"math"
"pro2d/common"
"pro2d/common/components"
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
8
|
"pro2d/common/logger"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
9
|
"pro2d/models"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
10
|
"pro2d/utils"
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
11
12
13
14
15
16
17
18
19
|
"sync/atomic"
)
type Agent struct {
components.IConnection
Server components.IServer
Role *models.RoleModel
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
20
|
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
21
22
23
24
25
26
27
28
29
30
|
Quit chan *Agent
nextCheckTime int64 //下一次检查的时间
lastHeartCheckTime int64
heartTimeoutCount int //超时次数
}
func NewAgent(s components.IServer) *Agent {
return &Agent{
Server: s,
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
31
|
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
32
33
34
35
36
37
38
39
|
Quit: make(chan *Agent),
nextCheckTime: 0,
lastHeartCheckTime: utils.Timex(),
heartTimeoutCount: 0,
}
}
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
40
41
|
func (c *Agent) OnConnection(conn components.IConnection) {
c.IConnection = conn
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
42
43
44
|
}
func (c *Agent) OnMessage(msg components.IMessage) {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
45
|
atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
46
|
md := c.Server.GetAction(msg.GetHeader().GetMsgID())
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
47
48
49
50
51
52
53
54
|
if md == nil {
logger.Debug("cmd: %d, md is nil", msg.GetHeader().GetMsgID())
return
}
logger.Debug("protocode handler: %d", msg.GetHeader().GetMsgID())
//fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
55
56
|
f := md.(func (msg components.IMessage) (int32, interface{}))
errCode, protomsg := f(msg)
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
57
58
|
rsp, err := proto.Marshal(protomsg.(proto.Message))
if err != nil {
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
59
|
conn := msg.GetSession()
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
60
|
if conn != nil {
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
61
|
conn.Send(-100, msg.GetHeader().GetMsgID(), nil)
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
62
63
64
|
}
return
}
|
765431a4
zhangqijia
增加schema接口, 抽象 mo...
|
65
|
conn := msg.GetSession()
|
77f5eec7
zhangqijia
plugin 插件热更 接口
|
66
67
68
69
|
if conn != nil {
conn.Send(errCode, msg.GetHeader().GetMsgID(), rsp)
}
return
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
logger.Error("protocode not handler: %d", msg.GetHeader().GetMsgID())
}
func (c *Agent) OnTimer() {
nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
now := utils.Timex()
if now >= nextCheckTime {
//检查心跳
c.checkHeartBeat(now)
nextCheckTime = now + common.HeartTimerInterval
atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
}
if c.Role != nil {
//role 恢复数据
c.Role.OnRecoverTimer(now)
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
86
|
}
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
87
88
89
|
}
func (c *Agent) OnClose() {
|
9a9d092e
zhangqijia
每条连接增加一个定时器,每条连接增...
|
90
|
c.Close()
|
0e5d52de
zhangqijia
reactor: 重构底层框架1.0
|
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
}
func (c *Agent) Close() {
if c.Role == nil {
return
}
c.Role.OnOfflineEvent()
}
func (c *Agent) checkHeartBeat(now int64) {
lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), lastHeartCheckTime, now)
if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval {
c.heartTimeoutCount++
if c.heartTimeoutCount >= common.HeartTimeoutCountMax {
c.Stop()
return
}
logger.Debug("timeout count: %d", c.heartTimeoutCount)
}else {
c.heartTimeoutCount = 0
}
}
|