-- agent_ctrl.lua 5.83 KB
local skynet = require "skynet"
local socket = require "skynet.socket"
local redisproxy = require "shared.redisproxy"
local netpack = require "skynet.netpack"
local xxtea = require "xxtea"
local deque = require "deque"
local datacenter = require "skynet.datacenter"
local agent_queued = require "services.agent_queued"
local logproxy = require "shared.logproxy"

-- Cache frequently used library functions in locals.
local pcall = pcall
local string_format = string.format

-- Address of the agent pool service; assigned in _M:init and sent "feed" requests.
local poold

-- Agent expiry time: 5 minutes (added to skynet.timex() when a socket closes).
local AGENT_EXPIRE_TIME				= 300

-- Module state: lookup tables that tie connections, users and agents together.
local _M = {
	-- fd -> uid
	f2u = {},
	-- uid -> (fd << 32) | agent  (packed by set_pack, unpacked by get_f / get_a)
	u2f = {},
	-- fd -> ip (address recorded in socket_open)
	f2i = {},
	-- fd -> expire (time after which a disconnected fd's agent is shut down)
	f2e = {},
	-- NOTE(review): not read or written anywhere else in the visible part of this file.
	online = 0,
}

--- Extract the fd stored in the upper half of a packed u2f value.
-- @param pack integer produced by set_pack
-- @return the fd, limited to 31 bits
local function get_f(pack)
	local upper = pack >> 32
	return upper & 0x7fffffff
end

--- Extract the agent address stored in the lower 32 bits of a packed u2f value.
-- @param pack integer produced by set_pack
-- @return the agent address
local function get_a(pack)
	local lower = pack & 0xffffffff
	return lower
end

--- Combine an fd and an agent address into one integer for storage in u2f.
-- @param fd    socket id, placed in the upper 32 bits
-- @param agent agent address, placed in the lower 32 bits
-- @return the packed integer
local function set_pack(fd, agent)
	local shifted_fd = fd << 32
	return shifted_fd | agent
end

--- Initialise the controller.
-- @param obj    deque data that is cloned into self.factory (the pool agents are popped from)
-- @param serice address of the agent pool service, stored in the file-level `poold`
function _M:init(obj, serice)
	local factory = deque.clone(obj)
	self.factory = factory
	poold = serice
end

-- @desc: Shut down the agent bound to fd and hand the freed slot to the next queued user.
-- @param fd socket id whose agent should exit
function _M:exit_agent(fd)
	self.f2e[fd] = nil

	local uid = self.f2u[fd]
	if not uid then return end

	-- The fd -> uid link is dropped whether or not an agent is still recorded.
	self.f2u[fd] = nil

	local pack = self.u2f[uid]
	if not pack then return end
	self.u2f[uid] = nil

	-- Fire-and-forget: a failed send must not stop the bookkeeping below.
	pcall(skynet.send, get_a(pack), "lua", "exit")

	local next_uid, next_fd = agent_queued.pop()
	if next_uid then
		-- Someone is waiting in the queue: log them in right away.
		self:query_agent(next_fd, next_uid, true)
	else
		-- Nobody is waiting: ask the pool service to "feed".
		pcall(skynet.send, poold, "lua", "feed")
	end
end

-- @desc: A client connected; remember its address.
-- @param fd   socket id of the new connection
-- @param addr peer address, stored in self.f2i
function _M:socket_open(fd, addr)
	local addr_by_fd = self.f2i
	addr_by_fd[fd] = addr
end

-- @desc: The connection was closed. The agent is told to "close" and kept alive until
--        AGENT_EXPIRE_TIME has elapsed, so the same uid can reattach to it.
-- @param fd socket id that was closed
function _M:socket_close(fd)
	self.f2i[fd] = nil

	local uid = self.f2u[fd]
	if not uid then
		-- No uid bound to this fd: it may still be waiting in the login queue.
		agent_queued.socket_close(fd)
		return
	end

	local pack = self.u2f[uid]
	if not pack then
		-- Stale fd -> uid link with no agent behind it. Drop it, and make sure no expiry
		-- entry is left behind for an fd that has nothing left to expire.
		self.f2u[fd] = nil
		self.f2e[fd] = nil
		return
	end

	-- Arm the expiry timer only once we know there is an agent to expire.
	self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME

	local ok = pcall(skynet.call, get_a(pack), "lua", "close")
	-- If the agent could not be reached, tear it down now instead of waiting for expiry.
	if not ok then self:exit_agent(fd) end
end

-- @desc: The connection failed with an error. Unlike socket_close, the agent is shut down
--        immediately.
-- @param fd socket id that reported the error
function _M:socket_error(fd)
	self.f2i[fd] = nil

	local uid = self.f2u[fd]
	local pack = uid and self.u2f[uid]

	if not uid then
		-- No uid bound to this fd: it may still be waiting in the login queue.
		agent_queued.socket_close(fd)
	elseif not pack then
		-- Stale fd -> uid link with no agent behind it.
		self.f2u[fd] = nil
	else
		-- Whether or not "close" succeeds, carry on so the agent state and the
		-- lookup tables stay consistent.
		pcall(skynet.call, get_a(pack), "lua", "close")
		self:exit_agent(fd)
	end
end

--- Count every entry of a table (array part and hash part).
-- @param t table to count
-- @return number of key/value pairs visited by pairs(t)
local function table_nums(t)
	local total = 0
	for _ in pairs(t) do
		total = total + 1
	end
	return total
end

-- Earliest time at which the next expiry sweep / online-count log may run.
local next_check_time = 0
local next_log_time = 0
-- Interval between two agent expiry sweeps (same unit as skynet.timex()).
local CHECK_AGENT_STATUS_INTERVAL 	= 100

-- @desc: Periodic check. Lets agent_queued handle its timeouts, shuts down agents whose
--        expiry has passed, and logs the online count (only when `now` is an exact
--        multiple of 60 and at least 60 after the previous log).
function _M:check_agent_status()
	agent_queued.handle_timeout()
	local now = skynet.timex()
	if now >= next_check_time then
		next_check_time = now + CHECK_AGENT_STATUS_INTERVAL

		-- Snapshot the expired fds first. exit_agent may reach query_agent, which yields
		-- inside skynet.call; while it is suspended socket_close can add new keys to
		-- self.f2e, and adding keys during a pairs() traversal is undefined behaviour.
		local expired = {}
		for fd, expire in pairs(self.f2e) do
			if expire < now then
				expired[#expired + 1] = fd
			end
		end

		for i = 1, #expired do
			local fd = expired[i]
			-- Re-check: the entry may have been cleared or refreshed while an earlier
			-- exit_agent call was suspended.
			local expire = self.f2e[fd]
			if expire and expire < now then
				self:exit_agent(fd)
			end
		end
	end

	if now >= next_log_time and now % 60 == 0 then
		next_log_time = now + 60
		local count = table_nums(self.u2f)
		datacenter.set("onlineCount", count)
		logproxy:log({["@type"] = "online", count = count}, "log")
	end
end

-- Every hotfix received so far, in arrival order; query_agent passes this list to
-- agents it starts for a new login.
local hotfixList = {}

--- Record a hotfix and push it to every agent currently listed in self.u2f.
-- @param code hotfix payload, forwarded to the agents unchanged
function _M:hotfix(code)
	hotfixList[#hotfixList + 1] = code
	for _, pack in pairs(self.u2f) do
		-- Best effort: one unreachable agent must not stop the broadcast.
		pcall(skynet.send, get_a(pack), "lua", "hotfix", code)
	end
end

--- Send the queryLogin RPC response to a client.
-- The packet is a 2-byte action code ("H") followed by the MsgPack-encoded body, which
-- is xxtea-encrypted when non-empty.
-- @param fd       socket id to write to
-- @param response table to serialise as the response body
local function query_agent_response(fd, response)
	local action = actionCodes.Role_queryLoginRpc + rpcResponseBegin
	local head = string.pack("H", action)

	local body = MsgPack.pack(response)
	if #body > 0 then
		body = xxtea.encrypt(body, XXTEA_KEY)
	end

	local packet = netpack.pack(head .. body)
	socket.write(fd, packet)
end

-- @desc: First packet of a player login (queryLogin). The watchdog assigns an agent to the
--        client and tells the gate the assignment succeeded; later messages go straight
--        to the agent.
-- @param fd      socket id of the connection that sent queryLogin
-- @param uid     account id; key into self.u2f
-- @param isQueue true when called from exit_agent for a uid just popped off the wait queue
function _M:query_agent(fd, uid, isQueue)
	local pack = self.u2f[uid]
	if pack then
		-- The uid already has an agent: reattach it to the new fd.
		local f = get_f(pack)
		if fd == f then
			skynet.error(string.format("%s same fd %d", uid, fd))
			return 
		end
		-- self.f2u[f] must exist; if self.f2e[f] does not, the old connection is still
		-- online and has to be kicked.
		if not self.f2e[f] then
			local head = string.pack("H", actionCodes.Sys_maintainNotice)
			local bin = MsgPack.pack({body = "server_accountOccupied", iskey = true})
			if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
			socket.write(f, netpack.pack(head .. bin))
			-- Delay the kick so the notice above is written before the old fd is kicked.
			skynet.timeout(10, function ()
				skynet.call(gate_serv, "lua", "kick", f)
			end)
		end

		local agent = get_a(pack)
		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd])
		if not ok then
			query_agent_response(fd, {ret = "INNER_ERROR"})
			return
		end

		-- Drop the old fd's bookkeeping and point the uid at the new fd.
		self.f2e[f] = nil
		self.f2u[f] = nil
		self.u2f[uid] = set_pack(fd, agent)
	else
		local agent
		if isQueue then
			-- Came from the wait queue: always gets an agent, creating one if the
			-- pool is empty.
			agent = self.factory:pop()
			if agent then
				pcall(skynet.send, poold, "lua", "feed")
			else
				agent = skynet.newservice("agent")
			end
		else
			-- uid is not stored, meaning it has not logged in for at least 10 minutes;
			-- pop an agent from the agent pool.
			-- NOTE(review): AGENT_EXPIRE_TIME is 300 (5 minutes), not 10 - confirm.
			if agent_queued.count() > 0 then
				-- Server full: start queueing.
				local rank = agent_queued.push(uid, fd)
				query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
				return 
			end

			agent = self.factory:pop()
			if not agent then
				-- Pool is empty: queue this user as well.
				local rank = agent_queued.push(uid, fd)
				query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
				return 
			end
		end

		-- A fresh agent also receives every hotfix recorded so far.
		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList)
		if not ok then
			-- Start failed: return the agent to the pool and report the error.
			self.factory:push(agent)
			query_agent_response(fd, {ret = "INNER_ERROR"})
			return
		end

		self.u2f[uid] = set_pack(fd, agent)
	end

	self.f2u[fd] = uid

	local response = {}

	-- Look up the value stored under "uid:<uid>"; it is returned to the client as `name`.
	local user = redisproxy:get(string_format("uid:%s", uid))
	if user then
		response.ret = "RET_HAS_EXISTED"
		response.name = user
	else
		response.ret = "RET_NOT_EXIST"
	end
	query_agent_response(fd, response)
end

return _M