agent_ctrl.lua
5.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
local skynet = require "skynet"
local socket = require "skynet.socket"
local redisproxy = require "shared.redisproxy"
local netpack = require "skynet.netpack"
local xxtea = require "xxtea"
local deque = require "deque"
local datacenter = require "skynet.datacenter"
local pcall = pcall
local string_format = string.format
local poold
-- agent过期时间 10分钟
local AGENT_EXPIRE_TIME				= 300
local _M = {
	-- fd -> uid
	f2u = {},
	-- uid -> 32 << fd | agent
	u2f = {},
	-- fd -> ip
	f2i = {},
	-- fd -> expire
	f2e = {},
	online = 0,
}
local function get_f(pack)
	return (pack >> 32) & 0x7fffffff
end
local function get_a(pack)
	return pack & 0xffffffff
end
local function set_pack(fd, agent)
	return (fd << 32) | agent
end
function _M:init(obj, serice)
	self.factory = deque.clone(obj)
	poold = serice
end
-- @desc: agent退出
function _M:exit_agent(fd)
	self.f2e[fd] = nil
	local uid = self.f2u[fd]
	if not uid then return end
	local pack = self.u2f[uid]
	if not pack then
		self.f2u[fd] = nil
		return
	end
	local agent = get_a(pack)
	pcall(skynet.send, agent, "lua", "exit")
	pcall(skynet.send, poold, "lua", "feed")
	self.f2u[fd] = nil
	self.u2f[uid] = nil
end
-- @desc: 客户端连入
function _M:socket_open(fd, addr)
	self.f2i[fd] = addr
end
-- @desc: 网络关闭
function _M:socket_close(fd)
	self.f2i[fd] = nil
	local uid = self.f2u[fd]
	if not uid then return end
	self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME
	if not self.u2f[uid] then
		self.f2u[fd] = nil
		return 
	end
	local agent = get_a(self.u2f[uid])
	local ok, _ = pcall(skynet.call, agent, "lua", "close")
	if not ok then self:exit_agent(fd) end
end
-- @desc: 网络出错
function _M:socket_error(fd)
	self.f2i[fd] = nil
	local uid = self.f2u[fd]
	if not uid then return end
	if not self.u2f[uid] then
		self.f2u[fd] = nil
		return 
	end
	local agent = get_a(self.u2f[uid])
	-- 无论失败否,应该让逻辑走下去,保证agent状态以及索引正确
	pcall(skynet.call, agent, "lua", "close")
	self:exit_agent(fd)
end
local function table_nums(t)
    local count = 0
    for k, v in pairs(t) do
        count = count + 1
    end
    return count
end
local next_check_time = 0
local next_log_time = 0
local CHECK_AGENT_STATUS_INTERVAL 	= 100 -- 检查agent状态的定时间隔
-- @desc: 检查agent状态,若过期,则让agent退出;并定时打日志统计在线人数
function _M:check_agent_status()
	local now = skynet.timex()
	if now >= next_check_time then
		next_check_time = now + CHECK_AGENT_STATUS_INTERVAL
		for fd, expire in pairs(self.f2e) do
			if expire < now then
				self:exit_agent(fd)
			end
		end
	end
	if now >= next_log_time and now % 60 == 0 and logd then
		next_log_time = now + 60
		local count = table_nums(self.u2f)
		datacenter.set("onlineCount", count)
		pcall(skynet.send, logd, "lua", "log", "online", {count = count}, "online")
	end
end
local hotfixList = {}
function _M:hotfix(code)
	table.insert(hotfixList, code)
	for uid, pack in pairs(self.u2f) do
		local agent = get_a(pack)
		pcall(skynet.send, agent, "lua", "hotfix", code)
	end
end
local function query_agent_response(fd, response)
	local head = string.pack("H", actionCodes.Role_queryLoginRpc + rpcResponseBegin)
	local bin = MsgPack.pack(response)
	if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
	socket.write(fd, netpack.pack(head .. bin))
end
-- @desc: 玩家登陆第一个包,queryLogin,watchdog为客户端分配一个agent,并告诉gate分配成功,之后的消息直接走agent
function _M:query_agent(fd, uid)
	local pack = self.u2f[uid]
	if pack then
		local f = get_f(pack)
		if fd == f then
			skynet.error(string.format("%s same fd %d", uid, fd))
			return 
		end
		-- self.f2u[f] 肯定存在;self.f2e[f]不存在,则说明在线,则需要踢下线
		if not self.f2e[f] then
			local head = string.pack("H", actionCodes.Sys_maintainNotice)
			local bin = MsgPack.pack({body = "server_accountOccupied", iskey = true})
			if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
			socket.write(f, netpack.pack(head .. bin))
			skynet.timeout(10, function ()
				skynet.call(gate_serv, "lua", "kick", f)
			end)
		end
		local agent = get_a(pack)
		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd])
		if not ok then
			query_agent_response(fd, {ret = "INNER_ERROR"})
			return
		end
		self.f2e[f] = nil
		self.f2u[f] = nil
		self.u2f[uid] = set_pack(fd, agent)
	else
		-- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent
		local agent = self.factory:pop()
		if not agent then
			-- 服务器满
			query_agent_response(fd, {ret = "RET_SERVER_FULL"})
			return 
		end
		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList)
		if not ok then
			self.factory:push(agent)
			query_agent_response(fd, {ret = "INNER_ERROR"})
			return
		end
		self.u2f[uid] = set_pack(fd, agent)
	end
	self.f2u[fd] = uid
	local response = {}
	local user = redisproxy:get(string_format("uid:%s", uid))
	if user then
		response.ret = "RET_HAS_EXISTED"
		response.name = user
	else
		response.ret = "RET_NOT_EXIST"
	end
	query_agent_response(fd, response)
end
return _M