agent_ctrl.lua
5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
local skynet = require "skynet"
local socket = require "skynet.socket"
local redisproxy = require "shared.redisproxy"
local netpack = require "skynet.netpack"
local xxtea = require "xxtea"
local deque = require "deque"
local datacenter = require "skynet.datacenter"
local agent_queued = require "services.agent_queued"
local logproxy = require "shared.logproxy"
local pcall = pcall
local string_format = string.format
require "utils.MysqlUtil"
-- Service handle assigned in _M:init; "feed" messages are sent to it whenever
-- an agent has been taken from, or released back to, the pool.
local poold
-- Agent expiry time: 5 minutes (so presumably expressed in seconds, the unit of
-- skynet.timex() -- TODO confirm)
local AGENT_EXPIRE_TIME = 300
-- Module table: holds the lookup indexes between sockets, users and agents.
local _M = {
-- fd -> uid
f2u = {},
-- uid -> (fd << 32) | agent   (packed by set_pack, unpacked by get_f / get_a)
u2f = {},
-- fd -> client address as handed to socket_open
f2i = {},
-- fd -> expiry timestamp (skynet.timex() + AGENT_EXPIRE_TIME), set in socket_close
f2e = {},
-- NOTE(review): not read or written anywhere else in the visible code
online = 0,
}
-- @desc: extract the fd part of a packed u2f value
-- @param pack integer built as (fd << 32) | agent
-- @return the upper half of pack, masked to 31 bits
local function get_f(pack)
    local upper = pack >> 32
    return upper & 0x7fffffff
end
-- @desc: extract the agent part of a packed u2f value
-- @param pack integer built as (fd << 32) | agent
-- @return the low 32 bits of pack
local function get_a(pack)
    local agent = pack & 0xffffffff
    return agent
end
-- @desc: combine an fd and an agent handle into one integer
-- @param fd    value placed in the bits above bit 31
-- @param agent value OR-ed into the low bits
-- @return (fd << 32) | agent
local function set_pack(fd, agent)
    local high = fd << 32
    return high | agent
end
-- @desc: initialise the controller
-- @param obj    passed to deque.clone; the clone becomes self.factory, the
--               container agents are popped from / pushed back to
-- @param serice service handle remembered in the module-level `poold`
function _M:init(obj, serice)
    local factory = deque.clone(obj)
    self.factory = factory
    poold = serice
end
-- @desc: make the agent bound to `fd` exit and release its index entries
-- Steps: drop the expiry entry, send "exit" to the agent, clear f2u/u2f,
-- then either hand a slot to the next queued user or send "feed" to poold.
-- @param fd socket id whose agent should exit
function _M:exit_agent(fd)
    self.f2e[fd] = nil

    local f2u, u2f = self.f2u, self.u2f
    local uid = f2u[fd]
    if not uid then
        return
    end

    local pack = u2f[uid]
    if not pack then
        -- no agent recorded for this uid: only the fd -> uid entry is left to drop
        f2u[fd] = nil
        return
    end

    -- best effort: an error raised by the send is swallowed by pcall
    pcall(skynet.send, get_a(pack), "lua", "exit")
    f2u[fd] = nil
    u2f[uid] = nil

    local queued_uid, queued_fd = agent_queued.pop()
    if queued_uid then
        -- somebody is waiting in the queue: log them in through the queue path
        self:query_agent(queued_fd, queued_uid, true)
    else
        pcall(skynet.send, poold, "lua", "feed")
    end
end
-- @desc: a client connected; remember its address
-- @param fd   socket id of the new connection
-- @param addr address stored under fd in self.f2i
function _M:socket_open(fd, addr)
    local addr_by_fd = self.f2i
    addr_by_fd[fd] = addr
end
-- @desc: the connection was closed
-- The agent is not exited right away: an expiry time is recorded in f2e and
-- the agent is asked to "close". Only if that call fails is the agent exited.
-- @param fd socket id that was closed
function _M:socket_close(fd)
    self.f2i[fd] = nil

    local uid = self.f2u[fd]
    if not uid then
        -- no uid bound to this fd: it may still be waiting in the login queue
        agent_queued.socket_close(fd)
        return
    end

    self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME

    local pack = self.u2f[uid]
    if not pack then
        self.f2u[fd] = nil
        return
    end

    local closed = pcall(skynet.call, get_a(pack), "lua", "close")
    if closed then
        return
    end
    self:exit_agent(fd)
end
-- @desc: the connection failed with an error
-- Unlike socket_close, the agent is always exited after being asked to "close".
-- @param fd socket id that reported the error
function _M:socket_error(fd)
    self.f2i[fd] = nil

    local uid = self.f2u[fd]
    if uid then
        local pack = self.u2f[uid]
        if pack then
            -- Whether or not "close" succeeds, carry on so that the agent state
            -- and the indexes end up consistent.
            pcall(skynet.call, get_a(pack), "lua", "close")
            self:exit_agent(fd)
        else
            self.f2u[fd] = nil
        end
    else
        -- no uid bound to this fd: it may still be waiting in the login queue
        agent_queued.socket_close(fd)
    end
end
-- @desc: count the entries of a table (array part and hash part alike)
-- @param t table to count
-- @return number of key/value pairs visited by pairs(t)
local function table_nums(t)
    local total = 0
    for _ in pairs(t) do
        total = total + 1
    end
    return total
end
-- Earliest time (skynet.timex() value) at which the next expiry sweep may run.
local next_check_time = 0
-- Earliest time (skynet.timex() value) at which the next online-count log may be written.
local next_log_time = 0
-- Interval between two expiry sweeps, in the unit of skynet.timex().
local CHECK_AGENT_STATUS_INTERVAL = 100

-- @desc: periodic housekeeping
--   1. lets agent_queued handle its timeouts;
--   2. every CHECK_AGENT_STATUS_INTERVAL, exits agents whose f2e expiry has passed;
--   3. publishes and logs the online count (number of entries in u2f).
function _M:check_agent_status()
    agent_queued.handle_timeout()

    local now = skynet.timex()
    if now >= next_check_time then
        next_check_time = now + CHECK_AGENT_STATUS_INTERVAL

        -- Collect the expired fds BEFORE acting on them. exit_agent can reach
        -- query_agent, which blocks in skynet.call; while it is suspended other
        -- handlers (e.g. socket_close) may insert new keys into self.f2e.
        -- Inserting keys into a table that is being traversed with pairs/next
        -- is undefined behaviour, so the traversal must be over by then.
        local expired = {}
        for fd, expire in pairs(self.f2e) do
            if expire < now then
                expired[#expired + 1] = fd
            end
        end

        for i = 1, #expired do
            local fd = expired[i]
            -- Re-check: the entry may have been cleared while an earlier
            -- exit_agent call was suspended.
            local expire = self.f2e[fd]
            if expire and expire < now then
                self:exit_agent(fd)
            end
        end
    end

    -- Logs only when `now` is an exact multiple of 60.
    -- NOTE(review): if this function is not invoked on such a value the log
    -- for that period is skipped -- confirm this alignment is intended.
    if now >= next_log_time and now % 60 == 0 then
        next_log_time = now + 60
        local count = table_nums(self.u2f)
        datacenter.set("onlineCount", count)
        logproxy:log({["@type"] = "online", count = count}, "log")
    end
end
-- Every hotfix received so far, in arrival order. query_agent passes this
-- list to a freshly assigned agent's "start" call.
local hotfixList = {}

-- @desc: record a hotfix and forward it to every agent currently in u2f
-- @param code hotfix payload, forwarded as-is
function _M:hotfix(code)
    hotfixList[#hotfixList + 1] = code
    for _, pack in pairs(self.u2f) do
        -- best effort: an error raised by the send is swallowed by pcall
        pcall(skynet.send, get_a(pack), "lua", "hotfix", code)
    end
end
-- @desc: send the reply to the queryLogin request on `fd`
-- Wire format: 2-byte head ("H") carrying the response protocol id, followed
-- by the MsgPack-encoded body, xxtea-encrypted when it is not empty.
-- @param fd       socket id to write to
-- @param response table to encode as the body
local function query_agent_response(fd, response)
    local proto_id = actionCodes.Role_queryLoginRpc + rpcResponseBegin
    local head = string.pack("H", proto_id)

    local body = MsgPack.pack(response)
    if #body > 0 then
        body = xxtea.encrypt(body, XXTEA_KEY)
    end

    -- netpack.pack stays the trailing argument on purpose: that way every
    -- value it returns is forwarded to socket.write.
    socket.write(fd, netpack.pack(head .. body))
end
-- @desc: first packet of a player's login (queryLogin): the watchdog assigns an
-- agent to the client and tells the gate that the assignment succeeded; after
-- that, messages go straight to the agent.
-- @param fd      socket id of the connection that is logging in
-- @param uid     user id being logged in
-- @param isQueue true when called from exit_agent for a user just popped off
--                the waiting queue; such a user is never queued again
-- Replies sent through query_agent_response:
--   {ret = "INNER_ERROR"}              the agent's "start" call failed
--   {ret = "RET_SERVER_FULL", rank=..} the user was put into the waiting queue
--   {ret = "RET_HAS_EXISTED", name=..} roleUidExists(uid) reported a role
--   {ret = "RET_NOT_EXIST"}            roleUidExists(uid) reported no role
function _M:query_agent(fd, uid, isQueue)
local pack = self.u2f[uid]
if pack then
-- This uid already owns an agent: rebind that agent to the new fd.
local f = get_f(pack)
if fd == f then
-- Same connection asked twice: log it and return without replying.
skynet.error(string.format("%s same fd %d", uid, fd))
return
end
-- self.f2u[f] must exist here; if self.f2e[f] is absent the old connection is
-- still online and has to be kicked.
if not self.f2e[f] then
local head = string.pack("H", actionCodes.Sys_maintainNotice)
local bin = MsgPack.pack({body = "server_accountOccupied", iskey = true})
if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
socket.write(f, netpack.pack(head .. bin))
-- Kick the old connection after a delay of 10 skynet timer ticks, after the
-- notice above has been written.
skynet.timeout(10, function ()
skynet.call(gate_serv, "lua", "kick", f)
end)
end
local agent = get_a(pack)
local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, capsuled, fd, self.f2i[fd])
if not ok then
query_agent_response(fd, {ret = "INNER_ERROR"})
return
end
-- Drop the old fd's entries and point the uid at the new fd.
self.f2e[f] = nil
self.f2u[f] = nil
self.u2f[uid] = set_pack(fd, agent)
else
local agent
if isQueue then
-- Queued user being admitted: take a pooled agent if there is one (and send
-- "feed" to poold), otherwise start a brand-new agent service.
agent = self.factory:pop()
if agent then
pcall(skynet.send, poold, "lua", "feed")
else
agent = skynet.newservice("agent")
end
else
-- The uid is not stored, which means it has not logged in for at least 10
-- minutes; an agent is popped from the agent pool.
-- NOTE(review): "10 minutes" disagrees with AGENT_EXPIRE_TIME = 300, which is
-- documented as 5 minutes -- confirm which one is right.
if agent_queued.count() > 0 then
-- Server is full: start queueing.
local rank = agent_queued.push(uid, fd)
query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
return
end
agent = self.factory:pop()
if not agent then
-- Pool is empty: queue this user as well.
local rank = agent_queued.push(uid, fd)
query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
return
end
end
-- A newly assigned agent also receives every hotfix recorded so far.
local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, capsuled, fd, self.f2i[fd], hotfixList)
if not ok then
-- Start failed: give the agent back to the pool.
self.factory:push(agent)
query_agent_response(fd, {ret = "INNER_ERROR"})
return
end
self.u2f[uid] = set_pack(fd, agent)
end
self.f2u[fd] = uid
local response = {}
--local user = redisproxy:get(string_format("uid:%s", uid))
local res, user = roleUidExists(uid)
if res then
response.ret = "RET_HAS_EXISTED"
response.name = user
else
response.ret = "RET_NOT_EXIST"
end
query_agent_response(fd, response)
end
return _M