-- Agent service: one instance serves one connected client. It decodes client
-- packets, dispatches them to action handlers, answers "role" RPCs from other
-- services and handles "lua" control commands (the CMD table below).
--
-- NOTE: several names below are deliberately global (skynet, redisproxy,
-- datacenter, SendPacket, rpcRole, ...); they are presumably consumed by the
-- action/model modules loaded into this service -- do not localize them
-- without checking those modules.
require "ProtocolCode"
require "shared.init"
require "utils.init"
require "GlobalVar"
require "RedisKeys"
require "skynet.manager"

local harbor = require "skynet.harbor"
local queue = require "skynet.queue"
local netpack = require "skynet.netpack"
local socket = require "skynet.socket"
local sharedata = require "skynet.sharedata"
local xxtea = require "xxtea"

skynet = require "skynet"
redisproxy = require "shared.redisproxy"
datacenter = require "skynet.datacenter"
mcast_util = require "services/mcast_util"
globalCsv = require "csvdata/GlobalDefine"

local CMD = {}
local agentInfo = {}  -- { client_fd, role, gate_serv, open_timer}
local agent_util, cs  -- both are assigned inside skynet.start at the bottom of the file

-- Hot-fixed action handlers keyed by action name; an existing table is reused.
_hotfixActions = _hotfixActions or {}

--- {{{ timer related

-- Periodic tick. Stops rescheduling once the timer has been cancelled; while
-- no role is attached yet it only reschedules itself without updating.
-- NOTE(review): calling start_agent_timer() while a previous chain is still
-- pending looks like it would leave two chains running -- confirm with callers.
local function handle_timeout()
	if not agentInfo.open_timer then
		return
	end
	if agentInfo.role then
		agent_util:update(agentInfo)
	end
	skynet.timeout(100, handle_timeout)
end

-- Enables the periodic tick; the first tick is scheduled 150 skynet ticks ahead.
function start_agent_timer()
	agentInfo.open_timer = true
	skynet.timeout(150, handle_timeout)
end

-- Disables the periodic tick; the pending callback returns without rescheduling.
function cancel_agent_timer()
	agentInfo.open_timer = false
end

---- timer related }}}

-- Stack of active pipelining batches. Several batches may be open at once;
-- the stack keeps nested SendPipelining calls from mixing their packets.
local _pipelinings = {}

--- Sends one packet to a client, or queues it when a pipelining batch is open.
-- @param actionCode protocol code; shifted by rpcResponseBegin when its handler
--                   name ends with "Rpc"
-- @param bin        serialized payload (string); encrypted with XXTEA when non-empty
-- @param client_fd  optional target socket, defaults to this agent's client
-- @return the result of socket.write when sent immediately, nothing when queued
function SendPacket(actionCode, bin, client_fd)
	client_fd = client_fd or agentInfo.client_fd

	-- Codes without a registered handler name are sent unchanged instead of
	-- raising inside string.sub.
	local handlerName = actionHandlers[actionCode]
	if type(handlerName) == "string" and string.sub(handlerName, -3, -1) == "Rpc" then
		actionCode = actionCode + rpcResponseBegin
	end

	-- Check whether we are inside a pipelining operation.
	if #_pipelinings > 0 then
		local _curPipelining = _pipelinings[#_pipelinings]
		-- Keep a separate queue per client.
		_curPipelining[client_fd] = _curPipelining[client_fd] or {}
		table.insert(_curPipelining[client_fd], {actionCode, bin})
	else
		if #bin > 0 then
			bin = xxtea.encrypt(bin, XXTEA_KEY)
		end
		local head = string.pack("H", actionCode)
		return socket.write(client_fd, netpack.pack(head .. bin))
	end
end

--- Runs callback while collecting every SendPacket call it makes, then sends
-- the collected packets as one Role_pipelining packet per client.
-- Packets queued before a failure are still sent; the callback's error is
-- re-raised afterwards. Non-function arguments are ignored.
-- @param callback function executed inside the batch
function SendPipelining(callback)
	if type(callback) ~= "function" then
		return
	end

	-- Push a fresh queue for this batch.
	table.insert(_pipelinings, {})
	-- Run the code block, capturing any error so the queue is always popped.
	local ok, err = pcall(callback)
	-- Pop the queue of this batch.
	local curPipelining = table.remove(_pipelinings)

	-- Flush whatever was queued.
	if next(curPipelining) then
		for client_fd, msg in pairs(curPipelining) do
			SendPacket(actionCodes.Role_pipelining, MsgPack.pack(msg), client_fd)
		end
	end

	if not ok then
		error(err)
	end
end

--- Calls a "lua" command on the agent registered for roleId in the datacenter.
-- @return the call's results, or nothing when no agent is registered
function rpcAgent(roleId, funcName, ...)
	local agent = datacenter.get("agent", roleId)
	if agent then
		return skynet.call(agent.serv, "lua", funcName, ...)
	end
end

--- Calls a role method on another service through the "role" protocol.
-- Errors raised by the call are swallowed.
-- @return the first result of the call, or nothing when serv is nil or the call failed
function rpcParter(serv, func, ...)
	if serv then
		local ok, result = pcall(skynet.call, serv, "role", func, ...)
		if ok then
			return result
		end
	end
end

local string_format = string.format
local table_unpack = table.unpack

--- Reads or writes role properties, using the role's agent when one is
-- registered and falling back to the redis hash "role:<id>" otherwise.
-- @param roleId   numeric role id
-- @param funcName "setProperty", "getProperty", "getProperties" or "setProperties"
--                 (any name is forwarded when the agent is available)
-- @param ...      arguments of the method; for the *Properties variants the
--                 first argument is a table (key list / key-value map)
-- @return online flag (true = served by the agent, false = served from redis)
--         followed by the result; nothing for an unknown funcName in the redis path
function rpcRole(roleId, funcName, ...)
	local fields = ...
	local agent = datacenter.get("agent", roleId)
	if agent and agent.serv then
		if funcName == "getProperties" then
			return true, skynet.call(agent.serv, "role", funcName, fields)
		else
			return true, skynet.call(agent.serv, "role", funcName, ...)
		end
	else
		local rediskey = string_format("role:%d", roleId)
		if funcName == "setProperty" then
			return false, redisproxy:hset(rediskey, ...)
		elseif funcName == "getProperty" then
			return false, redisproxy:hget(rediskey, ...)
		elseif funcName == "getProperties" then
			local sRole = require("models.Role")
			-- Unpack the key list itself: forwarding `...` would let any extra
			-- argument be interpreted as table.unpack's start index.
			local returnValue = redisproxy:hmget(rediskey, table_unpack(fields))
			local ret = {}
			for index, key in ipairs(fields) do
				local typ = sRole.schema[key][1]
				local def = sRole.schema[key][2]
				if typ == "number" then
					ret[key] = tonumber(returnValue[index] or def)
				else
					ret[key] = returnValue[index]
				end
			end
			return false, ret
		elseif funcName == "setProperties" then
			-- Flatten the key/value map into k1, v1, k2, v2, ... for hmset.
			local result = {}
			for k, v in pairs(fields) do
				result[#result + 1] = k
				result[#result + 1] = v
			end
			return false, redisproxy:hmset(rediskey, table_unpack(result))
		end
	end
end

--- Calls a "lua" command on this role's union service.
-- Uses the address cached by CMD.subUnion when present, otherwise looks the
-- union up in the datacenter by the role's consortiaId.
-- @return the call's results; `true, 1000` when consortiaId is 0; nothing
--         (after logging) when the union service address is unknown
function rpcUnion(funcName, ...)
	local serv = agentInfo.userv
	if not serv then
		local consortiaId = agentInfo.role:getProperty("consortiaId")
		if consortiaId == 0 then
			return true, 1000
		end
		local union = datacenter.get("union", consortiaId)
		if not union or not union.serv then
			skynet.error("rpcUnion error: union serv addr not exist", funcName, consortiaId)
			return
		end
		serv = union.serv
	end
	return skynet.call(serv, "lua", funcName, ...)
end

--- Calls a "lua" command on the union service registered under id.
-- @return the call's results, or nothing when that union has no service address
function rpcOtherUnion(id, funcName, ...)
	local union = datacenter.get("union", id)
	if union and union.serv then
		return skynet.call(union.serv, "lua", funcName, ...)
	end
end

-- Client packets: a 2-byte action code ("H") followed by the payload, which
-- is XXTEA-decrypted when non-empty. Requests run through the `cs` queue.
skynet.register_protocol {
	name = "client",
	id = skynet.PTYPE_CLIENT,
	unpack = function (msg, sz)
		local data = skynet.tostring(msg, sz)
		local cmd = string.unpack("H", string.sub(data, 1, 2))
		return cmd, string.sub(data, 3)
	end,
	dispatch = function(session, address, cmd, data)
		skynet.ignoreret()
		-- skynet.trace() -- execution-order trace statistics
		cs(function()
			if cmd == actionCodes.Sys_heartBeat then
				agent_util:heart_beat(agentInfo)
				return
			end

			local actionName = actionHandlers[cmd]
			if not actionName then
				-- actionName is nil here, so log the offending code instead.
				print("actionName not exist", cmd)
				return
			end

			local method
			if _hotfixActions[actionName] then
				method = _hotfixActions[actionName]
			else
				local modName, funcName = actionName:match("(%w+)%.(%w+)")
				if not modName then
					-- Name is not of the form "Module.function".
					print("ERROR_SERVER_INVALID_ACTION", actionName)
					return
				end
				local ok, action = pcall(require, "actions." .. modName .. "Action")
				if not ok then
					print("require module name error", action, modName)
					return
				end
				method = action[funcName]
			end

			if type(method) ~= "function" then
				-- modName/funcName are out of scope here; actionName carries both.
				print("ERROR_SERVER_INVALID_ACTION", actionName)
				return
			end

			if #data > 0 then
				data = xxtea.decrypt(data, XXTEA_KEY)
			end
			local result = method(agentInfo, data)
			-- A falsy or numeric result is reported to the client as an error id
			-- built from the action code and the returned number.
			if not result or type(result) == "number" then
				SendPacket(actionCodes.Sys_innerErrorMsg, MsgPack.pack({id = cmd * 100 + (result or 0)}))
			end
		end)
	end
}

-- "role" protocol: other services invoke a method of this agent's role
-- object by name. Replies "__OFFLINE__" when no role is attached.
skynet.register_protocol {
	name = "role",
	id = 13,
	pack = skynet.pack,
	unpack = skynet.unpack,
	dispatch = function(session, address, submethod, ...)
		-- skynet.trace() -- execution-order trace statistics
		local result
		if not agentInfo.role then
			result = "__OFFLINE__"
		else
			result = agentInfo.role[submethod](agentInfo.role, ...)
		end
		skynet.ret(skynet.pack(result))
	end,
}

-- function CMD.start(gate, fd, ip)

--- Binds this agent to a client connection and asks the gate to forward the
-- client's traffic here.
-- @param session, source  session and sender of the original "start" request
-- @param gate             gate service address
-- @param fd               client socket id
-- @param ip               client address, stored on agentInfo
-- @param hotfixs          optional list of hotfix code chunks applied first
function CMD.start(session, source, gate, fd, ip, hotfixs)
	ignoreHeartbeat = false

	agentInfo.client_fd = fd
	agentInfo.gate_serv = gate
	agentInfo.ip = ip

	agent_util:reset()
	math.randomInit()

	if hotfixs then
		for _, hotfix in ipairs(hotfixs) do
			CMD.hotfix(hotfix)
		end
	end

	-- Disguise the message as one sent by the watchdog, so the round trip
	-- A->B->C->B->A becomes A->B->C->A.
	skynet.redirect(gate, source, "lua", session, skynet.pack("forward", fd, 0, skynet.self()))
end

--- Handles client disconnect: stops the timer, drops the multicast
-- subscriptions and, when a role is attached, logs the logout and fires its
-- offline event.
function CMD.close()
	cancel_agent_timer()
	mcast_util.usub_world()
	mcast_util.usub_union()

	local role = agentInfo.role
	if not role then
		return
	end
	role:log("logout", {online = skynet.timex() - role:getProperty("ltime")})
	role:onOfflineEvent()
end

--- Removes this agent's datacenter registration (when a role is attached)
-- and terminates the service.
function CMD.exit()
	if agentInfo.role then
		-- role:log("logout", {online = skynet.timex()-role:getProperty("ltime")})
		datacenter.set("agent", agentInfo.role:getProperty("id"), nil)
	end
	skynet.exit()
end

--- Subscribes to a union's multicast channel and caches its service address
-- for rpcUnion.
function CMD.subUnion(consortiaId, union)
	mcast_util.sub_union(consortiaId, union.chan)
	agentInfo.userv = union.serv
end

--- Drops the union subscription and the cached union service address.
-- Declared with `.` like every other handler: the dispatcher calls handlers
-- as plain functions, and no `self` is used here.
function CMD.usubUnion()
	mcast_util.usub_union()
	agentInfo.userv = nil
end

--- Compiles and runs a chunk of hotfix code inside this service.
-- Failures are logged together with their cause and never propagate.
-- @param code Lua source of the hotfix
function CMD.hotfix(code)
	-- load() reports compile errors by returning nil plus a message rather
	-- than raising, so all three outcomes of the pcall must be told apart.
	local ok, func, err = pcall(load, code)
	if not ok then
		-- load itself raised (e.g. bad argument type); the message is in `func`.
		err = func
	elseif not func then
		-- Compile error: `err` already holds load's message.
		ok = false
	else
		ok, err = pcall(func)
	end

	if not ok then
		skynet.error("hotfix error by code " .. tostring(code), tostring(err))
	end
end

-- Routes a "lua" command that has no CMD handler to the GM action module.
-- Returns the failure string when params is not a table, no role is attached
-- or the GM module has no function for cmd.
local function routeGM(cmd, params)
	if type(params) ~= "table" or not agentInfo.role then
		return "指令失败"
	end
	local _M = require "actions.GmAction"
	local handler = _M[cmd]
	if type(handler) ~= "function" then
		-- Unknown command: answer with the failure string instead of raising
		-- "attempt to call a nil value" inside the dispatcher.
		return "指令失败"
	end
	return handler(agentInfo.role, params)
end

skynet.start(function()
	skynet.dispatch("lua", function(session, source, command, ...)
		-- skynet.trace() -- execution-order trace statistics
		local f = CMD[command]
		if f then
			if command == "exit" then
				-- The service terminates; no reply is sent.
				skynet.ignoreret()
				f(...)
			elseif command == "start" then
				-- CMD.start replies indirectly by redirecting through the gate.
				skynet.ignoreret()
				f(session, source, ...)
			else
				skynet.ret(skynet.pack(f(...)))
			end
		else
			skynet.ret(skynet.pack(routeGM(command, ...)))
		end
	end)

	redisd = harbor.queryname("REDIS")
	if tonumber(skynet.getenv "logd") == 1 then
		logd = harbor.queryname("LOGD")
	end

	cs = queue()

	csvdb = sharedata.query("csvdata")
	-- Special handling of error codes
	-- todo
	-- for key, value in pairs(csvdb["sys_codesCsv"]) do
	-- 	_G[string.upper(value.varname)] = key
	-- end

	agent_util = require "services/agent_util"
end)