agent.lua 8.62 KB
require "ProtocolCode"
require "shared.init"
require "utils.init"
require "GlobalVar"
require "RedisKeys"
require "skynet.manager"

local harbor = require "skynet.harbor"
local queue = require "skynet.queue"
local netpack = require "skynet.netpack"
local socket = require "skynet.socket"
local sharedata = require "skynet.sharedata"
local xxtea = require "xxtea"

skynet = require "skynet"
redisproxy = require "shared.redisproxy"
datacenter = require "skynet.datacenter"
mcast_util = require "services/mcast_util"
globalCsv = require "csvdata/GlobalDefine"

local CMD = {}
local agentInfo = {}  -- { client_fd, role, gate_serv, open_timer}

local agent_util, cs

_hotfixActions = _hotfixActions or {}

--- {{{ 定时器相关
local function handle_timeout()
	if not agentInfo.open_timer then return end

	if not agentInfo.role then
		skynet.timeout(100, handle_timeout)
		return
	end

	agent_util:update(agentInfo)
	skynet.timeout(100, handle_timeout)
end

function start_agent_timer()
	agentInfo.open_timer = true
	skynet.timeout(150, handle_timeout)
end

function cancel_agent_timer()
	agentInfo.open_timer = false
end
---- 定时器相关 }}}

local _pipelinings = {}  --可同时多个序列  栈的形式保证嵌套不出错
function SendPacket(actionCode, bin, client_fd)
	client_fd = client_fd or agentInfo.client_fd
	
	local handlerName = actionHandlers[actionCode]
	if string.sub(handlerName, -3, -1) == "Rpc" then
		actionCode = actionCode + rpcResponseBegin
	end
	--  查看是否是在 流水线操作中
	if #_pipelinings > 0 then
		local _curPipelining = _pipelinings[#_pipelinings]
		_curPipelining[client_fd] = _curPipelining[client_fd] or {} --区分不同客户端
		table.insert(_curPipelining[client_fd], {actionCode, bin})
	else
		if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
		local head = string.pack("H", actionCode)
		return socket.write(client_fd, netpack.pack(head .. bin))
	end
end

function SendPipelining(callback)
	if type(callback) ~= "function" then return end
	--push 当前队列
	table.insert(_pipelinings, {})
	-- 执行代码块  输出错误信息
	local ok, err = pcall(callback)
	--弹出当前队列
	local curPipelining = table.remove(_pipelinings)
	-- 查看是否有消息
	if next(curPipelining) then  
		for client_fd, msg in pairs(curPipelining) do
			SendPacket(actionCodes.Role_pipelining, MsgPack.pack(msg), client_fd)
		end
	end
	if not ok then error(err) end
end

function rpcAgent(roleId, funcName, ...)
	local agent = datacenter.get("agent", roleId)
	if agent then
		return skynet.call(agent.serv, "lua", funcName, ...)
	end
end

function rpcParter(serv, func, ...)
	if serv then
		local ok, result = pcall(skynet.call, serv, "role", func, ...)
		if ok then
			return result
		end
	end
end

local string_format = string.format
local table_unpack = table.unpack
function rpcRole(roleId, funcName, ...)
	local fields = ...
	local agent = datacenter.get("agent", roleId)
	if agent and agent.serv then
		if funcName == "getProperties" then
			return true, skynet.call(agent.serv, "role", funcName, fields)
		else
			return true, skynet.call(agent.serv, "role", funcName, ...)
		end
	else
		local rediskey = string_format("role:%d", roleId)
		if funcName == "setProperty" then
			return false, redisproxy:hset(rediskey, ...)
		elseif funcName == "getProperty" then
			return false, redisproxy:hget(rediskey, ...)
		elseif funcName == "getProperties" then
			local sRole = require("models.Role")
			local returnValue = redisproxy:hmget(rediskey, table_unpack(...))
			local ret = {}
			for index, key in ipairs(fields) do
			    local typ = sRole.schema[key][1]
			    local def = sRole.schema[key][2]
			    if typ == "number" then
			    	ret[key] = tonumber(returnValue[index] or def)
				else
			    	ret[key] = returnValue[index]
			    end
			end
			return false, ret
		elseif funcName == "setProperties" then
			local result = {}
			for k,v in pairs(fields) do
				result[#result+1] = k
				result[#result+1] = v
			end
			return false, redisproxy:hmset(rediskey, table_unpack(result))
		end
	end
end

function rpcUnion(funcName, ...)
	local serv = agentInfo.userv
	if not serv then
		local consortiaId = agentInfo.role:getProperty("consortiaId")
		if consortiaId == 0 then return true,1000 end
		local union = datacenter.get("union", consortiaId)
		if not union or not union.serv then
			skynet.error("rpcUnion error: union serv addr not exist", funcName, consortiaId)
			return
		end
		serv = union.serv
	end
	return skynet.call(serv, "lua", funcName, ...)
end

function rpcOtherUnion(id, funcName, ...)
	local union = datacenter.get("union", id)
	if union and union.serv then
		return skynet.call(union.serv, "lua", funcName, ...)
	end
end

skynet.register_protocol {
	name = "client",
	id = skynet.PTYPE_CLIENT,
	unpack = function (msg, sz)
		local data = skynet.tostring(msg, sz)
		local cmd = string.unpack("H", string.sub(data, 1, 2))
		return cmd, string.sub(data, 3)
	end,
	dispatch = function(session, address, cmd, data)
		skynet.ignoreret()
		-- skynet.trace() --执行序的跟踪统计
		cs(function()
			if cmd == actionCodes.Sys_heartBeat then
				agent_util:heart_beat(agentInfo)
				return
			end
			local actionName = actionHandlers[cmd]
			if not actionName then
				print("actionName not exist", actionName)
				return
			end

			local method
			if _hotfixActions[actionName] then
				method = _hotfixActions[actionName]
			else
				local modName, funcName = actionName:match("(%w+)%.(%w+)")

				local ok, action = pcall(require, "actions." .. modName .. "Action")
				if not ok then
					print("require module name error", action, modName)
					return
				end
				method = action[funcName]
			end
			
			if type(method) ~= "function" then
				print("ERROR_SERVER_INVALID_ACTION", modName, funcName)
				return
			end

			if #data > 0 then data = xxtea.decrypt(data, XXTEA_KEY) end
			local result = method(agentInfo, data)
			if not result or type(result) == "number" then
				SendPacket(actionCodes.Sys_innerErrorMsg, MsgPack.pack({id = cmd * 100 + (result or 0)}))
			end
		end)
	end
}

skynet.register_protocol {
	name = "role",
	id = 13,
	pack = skynet.pack,
	unpack = skynet.unpack,
	dispatch = function(session, address, submethod, ...)
		-- skynet.trace() --执行序的跟踪统计
		local result
		if not agentInfo.role then
			result = "__OFFLINE__"
		else
			result = agentInfo.role[submethod](agentInfo.role, ...)
		end

		skynet.ret(skynet.pack(result))
	end,
}

-- function CMD.start(gate, fd, ip)
function CMD.start(session, source, gate, fd, ip, hotfixs)
	ignoreHeartbeat = false

	agentInfo.client_fd = fd
	agentInfo.gate_serv = gate
	agentInfo.ip = ip

	agent_util:reset()
	math.randomInit()

	if hotfixs then
		for _, hotfix in ipairs(hotfixs) do
			CMD.hotfix(hotfix)
		end
	end

	-- 这里将消息伪装成 watchdog 发出,这样就由 A->B->C->B->A 变成 A->B->C->A
	skynet.redirect(gate, source, "lua", session, skynet.pack("forward", fd, 0, skynet.self()))
end

function CMD.close()
	cancel_agent_timer()
	mcast_util.usub_world()
	mcast_util.usub_union()
	
	local role = agentInfo.role
	if not role then return end
	role:log("logout", {online = skynet.timex()-role:getProperty("ltime")})
	role:onOfflineEvent()
end

function CMD.exit()
	if agentInfo.role then
		-- role:log("logout", {online = skynet.timex()-role:getProperty("ltime")})
		datacenter.set("agent", agentInfo.role:getProperty("id"), nil)
	end
	skynet.exit()
end

function CMD.subUnion(consortiaId, union)
	mcast_util.sub_union(consortiaId, union.chan)
	agentInfo.userv = union.serv
end

function CMD:usubUnion()
	mcast_util.usub_union()
	agentInfo.userv = nil
end

function CMD.hotfix(code)
	local ok, func = pcall(load, code)
	if ok then
		ok = pcall(func)
	end
	if not ok then
		skynet.error("hotfix error by code " .. code)
	end
end

local function routeGM(cmd, params)
	if type(params) ~= "table" or not agentInfo.role then
		return "指令失败"
	end
	local _M = require "actions.GmAction"
	return _M[cmd](agentInfo.role, params)
end

skynet.start(function()
	skynet.dispatch("lua", function(session, source, command, ...)
		-- skynet.trace() --执行序的跟踪统计
		local f = CMD[command]
		if f then
			if command == "exit" then
				skynet.ignoreret()
				f(...)
			elseif command == "start" then
				skynet.ignoreret()
				f(session, source, ...)
			else
				skynet.ret(skynet.pack(f(...)))
			end
		else
			skynet.ret(skynet.pack(routeGM(command, ...)))
		end
	end)

	redisd = harbor.queryname("REDIS")
	if tonumber(skynet.getenv "logd") == 1 then
		logd = harbor.queryname("LOGD")
	end

	cs = queue()

	csvdb = sharedata.query("csvdata")
	-- 错误码特殊处理
	-- todo
	-- for key, value in pairs(csvdb["sys_codesCsv"]) do
	-- 	_G[string.upper(value.varname)] = key
	-- end

	agent_util = require "services/agent_util"
end)