--- Agent service script (skynet).
-- Registers the "client" and "role" protocols plus the "lua" command
-- dispatcher, and exposes a set of global helpers (SendPacket,
-- SendPipelining, rpcAgent, rpcRole, rpcUnion, ...) that are defined as
-- globals on purpose so that code loaded later can call them.

require "ProtocolCode"
require "shared.init"
require "utils.init"
require "GlobalVar"
require "RedisKeys"
require "skynet.manager"

local queue = require "skynet.queue"
local netpack = require "skynet.netpack"
local socket = require "skynet.socket"
local xxtea = require "xxtea"

-- Intentional globals: kept global exactly as in the original file.
skynet = require "skynet"
redisproxy = require "shared.redisproxy"
mysqlproxy = require "shared.mysqlproxy"
datacenter = require "skynet.datacenter"
mcast_util = require "services/mcast_util"
csvdb = require "shared.csvdata"
cluster = require "skynet.cluster"

local CMD = {}
local agentInfo = {} -- { client_fd, role, gate_serv, open_timer}
local agent_util, cs -- both are assigned inside skynet.start() below

_hotfixActions = _hotfixActions or {}
_hotfixClass = _hotfixClass or {}
_codeSession = {}

--- {{{ timers

-- Incremented on every start_agent_timer(). Each scheduled callback carries
-- the epoch it was created under, so a callback that is still pending from
-- before a cancel/start cycle stops instead of running next to the new chain.
local timerEpoch = 0

-- True when the timers are enabled and `epoch` belongs to the latest start.
local function timer_is_current(epoch)
    return agentInfo.open_timer and epoch == timerEpoch
end

-- Periodic agent update; re-arms itself every 100 skynet ticks.
local function handle_timeout(epoch)
    if not timer_is_current(epoch) then
        return
    end
    -- Protected call: a failing update must not silently end the timer chain
    -- (open_timer would stay true and the chain could never be restarted).
    local ok, err = xpcall(agent_util.update, debug.traceback, agent_util, agentInfo)
    if not ok then
        skynet.error("agent timer update error: " .. tostring(err))
    end
    skynet.timeout(100, function() handle_timeout(epoch) end)
end

-- Periodic full garbage collection; re-arms itself every 6000 skynet ticks.
local function handle_gc(epoch)
    if not timer_is_current(epoch) then
        return
    end
    collectgarbage("collect")
    skynet.timeout(6000, function() handle_gc(epoch) end)
end

--- Start the update and GC timers. Does nothing when they are already running.
function start_agent_timer()
    if agentInfo.open_timer then
        return
    end
    agentInfo.open_timer = true
    timerEpoch = timerEpoch + 1
    local epoch = timerEpoch
    skynet.timeout(150, function() handle_timeout(epoch) end)
    skynet.timeout(6000, function() handle_gc(epoch) end)
end

--- Stop the timers; pending callbacks return without re-arming.
function cancel_agent_timer()
    agentInfo.open_timer = false
end

---- timers }}}

-- Several pipelines may be open at once; a stack keeps nesting correct.
local _pipelinings = {}

--- Send one packet to a client, or queue it when a pipeline is open.
-- @param actionCode numeric action code (key of actionHandlers)
-- @param bin        serialized payload string
-- @param client_fd  optional socket fd; defaults to agentInfo.client_fd
-- @return the result of socket.write when sent immediately, otherwise nothing
function SendPacket(actionCode, bin, client_fd)
    --print(actionHandlers[actionCode])
    --dump(MsgPack.unpack(bin))

    -- Internal message: handled locally, never forwarded to the client.
    if actionCode == actionCodes.Sys_endlessSeason then
        if agentInfo.role then
            agentInfo.role:advEndlessSeasonCheck()
        end
        return
    end

    client_fd = client_fd or agentInfo.client_fd

    local handlerName = actionHandlers[actionCode]
    local isRpc = string.sub(handlerName, -3, -1) == "Rpc"
    local session = _codeSession[actionCode] or 0
    if isRpc then
        actionCode = actionCode + rpcResponseBegin
    end

    -- Check whether we are inside a pipeline.
    if #_pipelinings > 0 then
        local _curPipelining = _pipelinings[#_pipelinings]
        -- One queue per client fd.
        _curPipelining[client_fd] = _curPipelining[client_fd] or {}
        if not isRpc then
            session = nil
        end
        table.insert(_curPipelining[client_fd], {actionCode, bin, session})
    else
        if #bin > 0 then
            bin = xxtea.encrypt(bin, XXTEA_KEY)
        end
        local head = string.pack("H", actionCode)
        if isRpc then
            -- This is a reply, so it carries the session number.
            head = head .. string.pack("H", session)
        end
        return socket.write(client_fd, netpack.pack(head .. bin))
    end
end

--- Run `callback` with packet batching enabled.
-- Packets sent through SendPacket while the callback runs are collected per
-- client fd and flushed afterwards as one Role_pipelining packet per client.
-- If the callback raised, the error is re-raised after the flush.
-- @param callback function to execute; any other type is ignored
function SendPipelining(callback)
    if type(callback) ~= "function" then
        return
    end

    -- Push a fresh queue for this pipeline.
    table.insert(_pipelinings, {})
    -- Run the code block, capturing any error.
    local ok, err = pcall(callback)
    -- Pop the queue that belongs to this pipeline.
    local curPipelining = table.remove(_pipelinings)

    -- Flush whatever was queued.
    if next(curPipelining) then
        for client_fd, msg in pairs(curPipelining) do
            SendPacket(actionCodes.Role_pipelining, MsgPack.pack(msg), client_fd)
        end
    end

    if not ok then
        error(err)
    end
end

--- Call `funcName` over the "lua" protocol on the agent registered in the
-- datacenter for `roleId`. Returns nothing when no agent is registered.
function rpcAgent(roleId, funcName, ...)
    local agent = datacenter.get("agent", roleId)
    if agent then
        return skynet.call(agent.serv, "lua", funcName, ...)
    end
end

--- Protected "role" protocol call to service `serv`.
-- Returns the first result on success; returns nothing when `serv` is falsy
-- or the call raised.
function rpcParter(serv, func, ...)
    if serv then
        local ok, result = pcall(skynet.call, serv, "role", func, ...)
        if ok then
            return result
        end
    end
end

-- Not referenced in the visible part of this file; kept as-is.
local string_format = string.format
local table_unpack = table.unpack

--- Invoke a role method for `roleId`.
-- When an agent with a service address is registered, the call goes to it via
-- the "role" protocol and the first return value is true. Otherwise the call
-- is handled by models.RoleCross and the first return value is false.
-- For "getProperties" only the first extra argument is forwarded.
function rpcRole(roleId, funcName, ...)
    local fields = ...
    local agent = datacenter.get("agent", roleId)
    if agent and agent.serv then
        if funcName == "getProperties" then
            return true, skynet.call(agent.serv, "role", funcName, fields)
        else
            return true, skynet.call(agent.serv, "role", funcName, ...)
        end
    else
        local roleCross = require("models.RoleCross")
        if funcName == "getProperties" then
            return false, roleCross.handle(funcName, roleId, fields)
        else
            return false, roleCross.handle(funcName, roleId, ...)
        end
    end
end

--- Call `funcName` on the union service of the current role.
-- Uses the cached agentInfo.userv when set; otherwise looks the union up in
-- the datacenter by the role's "consortiaId" property.
-- Returns `true, 1000` when consortiaId is 0, and nothing (after logging) when
-- the union service address cannot be found.
function rpcUnion(funcName, ...)
    local serv = agentInfo.userv
    if not serv then
        local consortiaId = agentInfo.role:getProperty("consortiaId")
        if consortiaId == 0 then
            return true,1000
        end
        local union = datacenter.get("union", consortiaId)
        if not union or not union.serv then
            skynet.error("rpcUnion error: union serv addr not exist", funcName, consortiaId)
            return
        end
        serv = union.serv
    end
    return skynet.call(serv, "lua", funcName, ...)
end

--- Call `funcName` on the union service registered under `id`.
-- Returns nothing when that union has no service address.
function rpcOtherUnion(id, funcName, ...)
    local union = datacenter.get("union", id)
    if union and union.serv then
        return skynet.call(union.serv, "lua", funcName, ...)
    end
end

-- Client packets: 2-byte action code ("H"), then for *Rpc actions a 2-byte
-- session number, then the payload (xxtea-encrypted when non-empty).
skynet.register_protocol {
    name = "client",
    id = skynet.PTYPE_CLIENT,
    unpack = function (msg, sz)
        local data = skynet.tostring(msg, sz)
        -- Client input is untrusted: a packet too short to hold an action
        -- code yields a nil cmd instead of raising inside string.unpack.
        if #data < 2 then
            return nil, ""
        end
        local cmd = string.unpack("H", data)
        return cmd, string.sub(data, 3)
    end,
    dispatch = function(session, address, cmd, data)
        skynet.ignoreret()
        -- skynet.trace() -- execution tracing / statistics
        -- Requests are run one at a time through the skynet.queue instance.
        cs(function()
            if cmd == nil then
                print("actionName not exist", cmd)
                return
            end

            if cmd == actionCodes.Sys_heartBeat then
                agent_util:heart_beat(agentInfo)
                return
            end

            local actionName = actionHandlers[cmd]
            if not actionName then
                -- Log the offending code; actionName itself is always nil here.
                print("actionName not exist", cmd)
                return
            end

            local codeSession
            if string.sub(actionName, -3, -1) == "Rpc" then
                -- This is a request, so it carries a session number.
                if #data < 2 then
                    print("ERROR_SERVER_INVALID_ACTION", actionName, "missing rpc session")
                    return
                end
                codeSession = string.unpack("H", data)
                data = string.sub(data, 3)
            end

            -- Declared here so the diagnostics below can see them.
            local method, modName, funcName
            if _hotfixActions[actionName] then
                method = _hotfixActions[actionName]
            else
                modName, funcName = actionName:match("(%w+)%.(%w+)")
                if not modName then
                    print("ERROR_SERVER_INVALID_ACTION", actionName, modName, funcName)
                    return
                end
                local ok, action = pcall(require, "actions." .. modName .. "Action")
                if not ok then
                    print("require module name error", action, modName)
                    return
                end
                method = action[funcName]
            end

            if type(method) ~= "function" then
                print("ERROR_SERVER_INVALID_ACTION", actionName, modName, funcName)
                return
            end

            -- Publish the session only once the request will really be
            -- handled, so rejected requests do not leave a stale entry.
            if codeSession then
                _codeSession[cmd] = codeSession
            end

            if #data > 0 then
                data = xxtea.decrypt(data, XXTEA_KEY)
            end

            -- One request is one correlated operation: records made while it
            -- runs share the same ucode.
            -- NOTE(review): if `method` raises, endActionUcode() and the
            -- session cleanup below are skipped (same as before this edit).
            if agentInfo.role then
                agentInfo.role:startActionUcode()
            end
            local result = method(agentInfo, data)
            if agentInfo.role then
                agentInfo.role:endActionUcode()
            end

            -- Clear the request session.
            _codeSession[cmd] = nil

            -- A falsy or numeric result is reported to the client as an error.
            if not result or type(result) == "number" then
                SendPacket(actionCodes.Sys_innerErrorMsg, MsgPack.pack({id = cmd * 100 + (result or 0)}))
            end
        end)
    end
}

-- "role" protocol: other services invoke a method on this agent's role object.
-- Replies "__OFFLINE__" when no role is attached; only the first return value
-- of the method is sent back.
skynet.register_protocol {
    name = "role",
    id = 101,
    pack = skynet.pack,
    unpack = skynet.unpack,
    dispatch = function(session, address, submethod, ...)
        -- skynet.trace() -- execution tracing / statistics
        local result
        if not agentInfo.role then
            result = "__OFFLINE__"
        else
            result = agentInfo.role[submethod](agentInfo.role, ...)
        end

        skynet.ret(skynet.pack(result))
    end,
}

--- Bind this agent to a client connection and start its timers.
-- `session` and `source` are supplied by the lua dispatcher below.
-- @param hotfixs optional array of hotfix code chunks, applied in order
-- function CMD.start(gate, fd, ip)
function CMD.start(session, source, gate, capsuled, fd, ip, hotfixs)
    ignoreHeartbeat = false

    agentInfo.client_fd = fd
    agentInfo.gate_serv = gate
    agentInfo.capsule_serv = capsuled
    agentInfo.ip = ip

    agent_util:reset()
    math.randomInit()

    if hotfixs then
        for _, hotfix in ipairs(hotfixs) do
            CMD.hotfix(hotfix)
        end
    end

    start_agent_timer()

    -- Disguise the message as if it came from the watchdog, turning
    -- A->B->C->B->A into A->B->C->A.
    skynet.redirect(gate, source, "lua", session, skynet.pack("forward", fd, 0, skynet.self()))
end

--- Stop timers, drop multicast subscriptions and, when a role is attached,
-- notify the capsule service, write the logout logs and fire the offline event.
function CMD.close()
    cancel_agent_timer()
    mcast_util.usub_world()
    mcast_util.usub_union()

    local role = agentInfo.role
    if not role then
        return
    end

    skynet.call(agentInfo.capsule_serv, "lua", "exit", role:getProperty("id"))
    role:log("onLogout", {logtime = skynet.timex()-role:getProperty("ltime")})
    role:mylog("logout", {int1 = skynet.timex()-role:getProperty("ltime")})
    role:onOfflineEvent()
end

--- Remove this agent from the datacenter (when a role is attached) and exit
-- the service.
function CMD.exit()
    if agentInfo.role then
        datacenter.set("agent", agentInfo.role:getProperty("id"), nil)
    end
    skynet.exit()
end

--- Subscribe to a union's multicast channel and cache its service address.
function CMD.subUnion(consortiaId, union)
    mcast_util.sub_union(consortiaId, union.chan)
    agentInfo.userv = union.serv
end

--- Unsubscribe from the union channel and forget the cached service address.
-- Defined with "." like every other CMD entry; the dispatcher calls f(...)
-- and this function takes no arguments, so callers are unaffected.
function CMD.usubUnion()
    mcast_util.usub_union()
    agentInfo.userv = nil
end

--- Compile and run a hotfix chunk, passing the current role as its argument.
-- NOTE(review): this executes arbitrary code with the service's privileges;
-- `code` must only ever come from a trusted source.
function CMD.hotfix(code)
    -- load() reports compile errors by returning nil plus a message and only
    -- raises on a bad argument type; both cases are folded into ok/err here.
    local ok, func, err = pcall(load, code)
    if not ok then
        err = func
    elseif not func then
        ok = false
    end

    if ok then
        ok, err = xpcall(func, debug.traceback, agentInfo.role)
    end

    if not ok then
        -- tostring() keeps this log line from raising when code is not a string.
        skynet.error("hotfix_code error by code " .. tostring(code) .. "\n" .. tostring(err))
    end
end

-- Route a lua command that has no CMD entry to actions.GmAction.
-- Returns the handler's status, or the failure string when the parameters are
-- not a table, no role is attached, or GmAction has no such command.
local function routeGM(cmd, params)
    if type(params) ~= "table" or not agentInfo.role then
        return "指令失败"
    end

    local _M = require "actions.GmAction"
    local handler = _M[cmd]
    if handler == nil then
        -- Every unknown lua command ends up here; fail cleanly instead of
        -- raising "attempt to call a nil value".
        return "指令失败"
    end

    agentInfo.role:startActionUcode()
    local status = handler(agentInfo.role, params)
    agentInfo.role:endActionUcode()
    return status
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, command, ...)
        -- skynet.trace() -- execution tracing / statistics
        local f = CMD[command]
        if f then
            if command == "exit" then
                skynet.ignoreret()
                f(...)
            elseif command == "start" then
                -- start answers by itself via skynet.redirect.
                skynet.ignoreret()
                f(session, source, ...)
            elseif command == "hotfix" then
                skynet.ignoreret()
                f(...)
            else
                skynet.ret(skynet.pack(f(...)))
            end
        else
            skynet.ret(skynet.pack(routeGM(command, ...)))
        end
    end)

    skynet.info_func(function()
        local info = {}
        info.ip = agentInfo.ip
        if agentInfo.role then
            info.roldId = agentInfo.role:getProperty("id")
        end
        return info
    end)

    cs = queue()

    pvpd = skynet.localname(".pvpcross")

    -- Special handling of error codes -- todo
    -- for key, value in pairs(csvdb["sys_codesCsv"]) do
    -- 	_G[string.upper(value.varname)] = key
    -- end
    globalCsv = csvdb["GlobalDefineCsv"]
    agent_util = require "services/agent_util"
end)