local skynet = require "skynet" local socket = require "skynet.socket" local redisproxy = require "shared.redisproxy" local netpack = require "skynet.netpack" local xxtea = require "xxtea" local deque = require "deque" local datacenter = require "skynet.datacenter" local pcall = pcall local string_format = string.format local poold -- agent过期时间 5分钟 local AGENT_EXPIRE_TIME = 300 local _M = { -- fd -> uid f2u = {}, -- uid -> 32 << fd | agent u2f = {}, -- fd -> ip f2i = {}, -- fd -> expire f2e = {}, online = 0, } local function get_f(pack) return (pack >> 32) & 0x7fffffff end local function get_a(pack) return pack & 0xffffffff end local function set_pack(fd, agent) return (fd << 32) | agent end function _M:init(obj, serice) self.factory = deque.clone(obj) poold = serice end -- @desc: agent退出 function _M:exit_agent(fd) self.f2e[fd] = nil local uid = self.f2u[fd] if not uid then return end local pack = self.u2f[uid] if not pack then self.f2u[fd] = nil return end local agent = get_a(pack) pcall(skynet.send, agent, "lua", "exit") -- 这里检查是否有排队用户 local nuid, nfd = skynet.call(queued_serv, "lua", "pop") if not nuid then pcall(skynet.send, poold, "lua", "feed") else self:query_agent(nfd, nuid, true) end self.f2u[fd] = nil self.u2f[uid] = nil end -- @desc: 客户端连入 function _M:socket_open(fd, addr) self.f2i[fd] = addr end -- @desc: 网络关闭 function _M:socket_close(fd) self.f2i[fd] = nil local uid = self.f2u[fd] if not uid then -- 排队中? pcall(skynet.call, queued_serv, "lua", "socket_close", fd) return end self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME if not self.u2f[uid] then self.f2u[fd] = nil return end local agent = get_a(self.u2f[uid]) local ok, _ = pcall(skynet.call, agent, "lua", "close") if not ok then self:exit_agent(fd) end end -- @desc: 网络出错 function _M:socket_error(fd) self.f2i[fd] = nil local uid = self.f2u[fd] if not uid then -- 排队中? pcall(skynet.call, queued_serv, "lua", "socket_close", fd) return end if not self.u2f[uid] then self.f2u[fd] = nil return end local agent = get_a(self.u2f[uid]) -- 无论失败否,应该让逻辑走下去,保证agent状态以及索引正确 pcall(skynet.call, agent, "lua", "close") self:exit_agent(fd) end local function table_nums(t) local count = 0 for k, v in pairs(t) do count = count + 1 end return count end local next_check_time = 0 local next_log_time = 0 local CHECK_AGENT_STATUS_INTERVAL = 100 -- 检查agent状态的定时间隔 -- @desc: 检查agent状态,若过期,则让agent退出;并定时打日志统计在线人数 function _M:check_agent_status() local now = skynet.timex() if now >= next_check_time then next_check_time = now + CHECK_AGENT_STATUS_INTERVAL for fd, expire in pairs(self.f2e) do if expire < now then self:exit_agent(fd) end end end if now >= next_log_time and now % 60 == 0 and logd then next_log_time = now + 60 local count = table_nums(self.u2f) datacenter.set("onlineCount", count) pcall(skynet.send, logd, "lua", "log", {["@type"] = "online", count = count}, "log") end end local hotfixList = {} function _M:hotfix(code) table.insert(hotfixList, code) for uid, pack in pairs(self.u2f) do local agent = get_a(pack) pcall(skynet.send, agent, "lua", "hotfix", code) end end local function query_agent_response(fd, response) local head = string.pack("H", actionCodes.Role_queryLoginRpc + rpcResponseBegin) local bin = MsgPack.pack(response) if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end socket.write(fd, netpack.pack(head .. bin)) end -- @desc: 玩家登陆第一个包,queryLogin,watchdog为客户端分配一个agent,并告诉gate分配成功,之后的消息直接走agent function _M:query_agent(fd, uid, isQueue) local pack = self.u2f[uid] if pack then local f = get_f(pack) if fd == f then skynet.error(string.format("%s same fd %d", uid, fd)) return end -- self.f2u[f] 肯定存在;self.f2e[f]不存在,则说明在线,则需要踢下线 if not self.f2e[f] then local head = string.pack("H", actionCodes.Sys_maintainNotice) local bin = MsgPack.pack({body = "server_accountOccupied", iskey = true}) if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end socket.write(f, netpack.pack(head .. bin)) skynet.timeout(10, function () skynet.call(gate_serv, "lua", "kick", f) end) end local agent = get_a(pack) local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd]) if not ok then query_agent_response(fd, {ret = "INNER_ERROR"}) return end self.f2e[f] = nil self.f2u[f] = nil self.u2f[uid] = set_pack(fd, agent) else -- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent local agent if isQueue then agent = skynet.newservice("agent") else agent = self.factory:pop() if not agent then -- 服务器满 -- 开始排队 local ok, rank = pcall(skynet.call, queued_serv, "lua", "push", uid, fd) query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank}) return end end local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList) if not ok then self.factory:push(agent) query_agent_response(fd, {ret = "INNER_ERROR"}) return end self.u2f[uid] = set_pack(fd, agent) end self.f2u[fd] = uid local response = {} local user = redisproxy:get(string_format("uid:%s", uid)) if user then response.ret = "RET_HAS_EXISTED" response.name = user else response.ret = "RET_NOT_EXIST" end query_agent_response(fd, response) end return _M