local skynet = require "skynet"
local socket = require "skynet.socket"
local redisproxy = require "shared.redisproxy"
local netpack = require "skynet.netpack"
local xxtea = require "xxtea"
local deque = require "deque"
local datacenter = require "skynet.datacenter"
local agent_queued = require "services.agent_queued"
local logproxy = require "shared.logproxy"

local pcall = pcall
local string_format = string.format

-- Address of the agent pool service; assigned in _M:init.
local poold

-- Grace period added to skynet.timex() when a socket closes; the agent is told
-- to exit once it elapses (original note: 5 minutes, i.e. the unit is
-- presumably seconds -- TODO confirm against skynet.timex).
local AGENT_EXPIRE_TIME = 300

--- Bookkeeping tables that tie client sockets, user ids and agent services
-- together.
local _M = {
	-- fd -> uid
	f2u = {},
	-- uid -> (fd << 32) | agent
	u2f = {},
	-- fd -> ip (the address passed to socket_open)
	f2i = {},
	-- fd -> expire time of a disconnected session
	f2e = {},
	online = 0,
}

--- Extract the fd from a packed (fd << 32) | agent value.
local function get_f(pack)
	return (pack >> 32) & 0x7fffffff
end

--- Extract the agent handle (low 32 bits) from a packed value.
local function get_a(pack)
	return pack & 0xffffffff
end

--- Combine an fd and an agent handle into one integer.
local function set_pack(fd, agent)
	return (fd << 32) | agent
end

--- Serialize `body`, encrypt it when non-empty and write `head .. body` to a
-- socket as one netpack frame.
-- @param fd socket to write to
-- @param head already packed message header
-- @param body table handed to MsgPack.pack
local function write_encrypted(fd, head, body)
	local bin = MsgPack.pack(body)
	if #bin > 0 then
		bin = xxtea.encrypt(bin, XXTEA_KEY)
	end
	socket.write(fd, netpack.pack(head .. bin))
end

--- Store the agent factory and the pool service address.
-- @param obj deque of spare agents; a clone of it becomes self.factory
-- @param serice address of the pool service that receives "feed" requests
function _M:init(obj, serice)
	self.factory = deque.clone(obj)
	poold = serice
end

--- Make the agent bound to `fd` exit and drop every index entry for it.
-- Afterwards either the next queued login is served or the pool service is
-- asked to "feed".
-- @param fd socket whose agent should exit
function _M:exit_agent(fd)
	self.f2e[fd] = nil
	local uid = self.f2u[fd]
	if not uid then
		return
	end

	local pack = self.u2f[uid]
	if not pack then
		self.f2u[fd] = nil
		return
	end

	-- Best effort: a failing send must not prevent the cleanup below.
	pcall(skynet.send, get_a(pack), "lua", "exit")

	self.f2u[fd] = nil
	self.u2f[uid] = nil

	local nuid, nfd = agent_queued.pop()
	if nuid then
		self:query_agent(nfd, nuid, true)
	else
		pcall(skynet.send, poold, "lua", "feed")
	end
end

--- A client connected: remember its address.
-- @param fd new socket
-- @param addr remote address
function _M:socket_open(fd, addr)
	self.f2i[fd] = addr
end

--- The socket was closed. The agent is told to "close" and kept until the
-- expire time recorded in f2e passes; if the agent cannot be reached it is
-- torn down immediately.
-- @param fd closed socket
function _M:socket_close(fd)
	self.f2i[fd] = nil
	local uid = self.f2u[fd]
	if not uid then
		-- Possibly still waiting in the login queue.
		agent_queued.socket_close(fd)
		return
	end

	local pack = self.u2f[uid]
	if not pack then
		self.f2u[fd] = nil
		return
	end

	-- Arm the deadline only when an agent is really bound; the original set it
	-- before the check above and left a stale f2e entry on that early return.
	self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME

	local ok = pcall(skynet.call, get_a(pack), "lua", "close")
	if not ok then
		self:exit_agent(fd)
	end
end

--- The socket failed. Unlike socket_close there is no grace period: the agent
-- is closed and then exits right away.
-- @param fd failed socket
function _M:socket_error(fd)
	self.f2i[fd] = nil
	local uid = self.f2u[fd]
	if not uid then
		-- Possibly still waiting in the login queue.
		agent_queued.socket_close(fd)
		return
	end

	local pack = self.u2f[uid]
	if not pack then
		self.f2u[fd] = nil
		return
	end

	-- Whether or not "close" succeeds, carry on so that the agent state and
	-- the index tables stay consistent.
	pcall(skynet.call, get_a(pack), "lua", "close")
	self:exit_agent(fd)
end

--- Count the entries of a table (hash part included).
local function table_nums(t)
	local count = 0
	for _ in pairs(t) do
		count = count + 1
	end
	return count
end

local next_check_time = 0
local next_log_time = 0
-- Interval between two expiry sweeps, in the unit of skynet.timex().
local CHECK_AGENT_STATUS_INTERVAL = 100

--- Periodic tick: let the login queue handle its timeouts, make expired
-- agents exit, and publish/log the number of bound users.
function _M:check_agent_status()
	agent_queued.handle_timeout()
	local now = skynet.timex()

	if now >= next_check_time then
		next_check_time = now + CHECK_AGENT_STATUS_INTERVAL

		-- Snapshot the expired fds first. exit_agent clears f2e entries and can
		-- re-enter query_agent, which yields; other coroutines may then insert
		-- into f2e, and inserting during a pairs() traversal is undefined.
		local expired = {}
		for fd, expire in pairs(self.f2e) do
			if expire < now then
				expired[#expired + 1] = fd
			end
		end

		for i = 1, #expired do
			local fd = expired[i]
			-- Re-check: an earlier exit_agent may have yielded and the entry may
			-- have been cleared or re-armed in the meantime.
			local expire = self.f2e[fd]
			if expire and expire < now then
				self:exit_agent(fd)
			end
		end
	end

	if now >= next_log_time and now % 60 == 0 then
		next_log_time = now + 60
		local count = table_nums(self.u2f)
		datacenter.set("onlineCount", count)
		logproxy:log({["@type"] = "online", count = count}, "log")
	end
end

-- Every hotfix received so far; handed to agents started for a new binding.
local hotfixList = {}

--- Record a hotfix and forward it to every agent currently bound to a uid.
-- @param code hotfix payload, passed through unchanged
function _M:hotfix(code)
	table.insert(hotfixList, code)
	for _, pack in pairs(self.u2f) do
		pcall(skynet.send, get_a(pack), "lua", "hotfix", code)
	end
end

--- Send the Role_queryLoginRpc response to a client.
-- @param fd client socket
-- @param response table to serialize as the message body
local function query_agent_response(fd, response)
	local head = string.pack("H", actionCodes.Role_queryLoginRpc + rpcResponseBegin)
	write_encrypted(fd, head, response)
end

--- Handle the first packet of a login (queryLogin): bind an agent to the
-- client; after a successful "start" the agent is given the gate and the fd.
-- @param fd client socket
-- @param uid user id
-- @param isQueue true when called for a login popped from the waiting queue
function _M:query_agent(fd, uid, isQueue)
	local pack = self.u2f[uid]
	if pack then
		local f = get_f(pack)
		if fd == f then
			skynet.error(string.format("%s same fd %d", uid, fd))
			return
		end

		-- self.f2u[f] is expected to exist here. No f2e[f] entry means the old
		-- connection is still online, so notify it and kick it.
		if not self.f2e[f] then
			local head = string.pack("H", actionCodes.Sys_maintainNotice)
			write_encrypted(f, head, {body = "server_accountOccupied", iskey = true})
			-- Delay the kick so the notice above can be written out first.
			skynet.timeout(10, function ()
				skynet.call(gate_serv, "lua", "kick", f)
			end)
		end

		-- Reuse the existing agent for the new socket.
		local agent = get_a(pack)
		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd])
		if not ok then
			query_agent_response(fd, {ret = "INNER_ERROR"})
			return
		end

		self.f2e[f] = nil
		self.f2u[f] = nil
		self.u2f[uid] = set_pack(fd, agent)
	else
		local agent
		if isQueue then
			-- A queued login is always served, creating an agent if the pool
			-- is empty.
			agent = self.factory:pop()
			if agent then
				pcall(skynet.send, poold, "lua", "feed")
			else
				agent = skynet.newservice("agent")
			end
		else
			-- No binding stored for this uid, so take an agent from the pool.
			-- NOTE(review): the original comment said this means "no login for
			-- at least 10 minutes", but AGENT_EXPIRE_TIME is 300 -- TODO confirm.
			-- Logins already waiting go first, so only pop when nobody queues.
			if agent_queued.count() == 0 then
				agent = self.factory:pop()
			end
			if not agent then
				-- Server full: put this login into the queue.
				local rank = agent_queued.push(uid, fd)
				query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
				return
			end
		end

		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList)
		if not ok then
			self.factory:push(agent)
			query_agent_response(fd, {ret = "INNER_ERROR"})
			return
		end

		self.u2f[uid] = set_pack(fd, agent)
	end

	self.f2u[fd] = uid

	-- Tell the client whether a value is already stored under "uid:<uid>".
	local response = {}
	local user = redisproxy:get(string_format("uid:%s", uid))
	if user then
		response.ret = "RET_HAS_EXISTED"
		response.name = user
	else
		response.ret = "RET_NOT_EXIST"
	end
	query_agent_response(fd, response)
end

return _M