From e24d1abd1d3f1a01389a4f114585ec5348c6e647 Mon Sep 17 00:00:00 2001 From: zhouhaihai Date: Wed, 28 Oct 2020 17:52:08 +0800 Subject: [PATCH] 修改 排队 --- src/services/agent_ctrl.lua | 36 +++++++++++++++++++++++------------- src/services/agent_queued.lua | 154 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/services/poold.lua | 2 +- src/services/queued.lua | 178 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/services/watchdog.lua | 2 -- 5 files changed, 178 insertions(+), 194 deletions(-) create mode 100644 src/services/agent_queued.lua delete mode 100644 src/services/queued.lua diff --git a/src/services/agent_ctrl.lua b/src/services/agent_ctrl.lua index 3aa274c..d41abd0 100644 --- a/src/services/agent_ctrl.lua +++ b/src/services/agent_ctrl.lua @@ -5,6 +5,7 @@ local netpack = require "skynet.netpack" local xxtea = require "xxtea" local deque = require "deque" local datacenter = require "skynet.datacenter" +local agent_queued = require "services.agent_queued" local pcall = pcall local string_format = string.format @@ -59,16 +60,14 @@ function _M:exit_agent(fd) pcall(skynet.send, agent, "lua", "exit") - -- 这里检查是否有排队用户 - local nuid, nfd = skynet.call(queued_serv, "lua", "pop") + self.f2u[fd] = nil + self.u2f[uid] = nil + local nuid, nfd = agent_queued.pop() if not nuid then pcall(skynet.send, poold, "lua", "feed") else self:query_agent(nfd, nuid, true) end - - self.f2u[fd] = nil - self.u2f[uid] = nil end -- @desc: 客户端连入 @@ -82,7 +81,7 @@ function _M:socket_close(fd) local uid = self.f2u[fd] if not uid then -- 排队中? - pcall(skynet.call, queued_serv, "lua", "socket_close", fd) + agent_queued.socket_close(fd) return end self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME @@ -102,7 +101,7 @@ function _M:socket_error(fd) local uid = self.f2u[fd] if not uid then -- 排队中? - pcall(skynet.call, queued_serv, "lua", "socket_close", fd) + agent_queued.socket_close(fd) return end @@ -129,6 +128,7 @@ local next_log_time = 0 local CHECK_AGENT_STATUS_INTERVAL = 100 -- 检查agent状态的定时间隔 -- @desc: 检查agent状态,若过期,则让agent退出;并定时打日志统计在线人数 function _M:check_agent_status() + agent_queued.handle_timeout() local now = skynet.timex() if now >= next_check_time then next_check_time = now + CHECK_AGENT_STATUS_INTERVAL @@ -196,16 +196,26 @@ function _M:query_agent(fd, uid, isQueue) self.f2u[f] = nil self.u2f[uid] = set_pack(fd, agent) else - -- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent - local agent + local agent if isQueue then - agent = skynet.newservice("agent") + agent = self.factory:pop() + if agent then + pcall(skynet.send, poold, "lua", "feed") + else + agent = skynet.newservice("agent") + end else + -- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent + if agent_queued.count() > 0 then + -- 服务器满 开始排队 + local rank = agent_queued.push(uid, fd) + query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank}) + return + end + agent = self.factory:pop() if not agent then - -- 服务器满 - -- 开始排队 - local ok, rank = pcall(skynet.call, queued_serv, "lua", "push", uid, fd) + local rank = agent_queued.push(uid, fd) query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank}) return end diff --git a/src/services/agent_queued.lua b/src/services/agent_queued.lua new file mode 100644 index 0000000..cb3c4e3 --- /dev/null +++ b/src/services/agent_queued.lua @@ -0,0 +1,154 @@ +-- 排队系统 + +require "ProtocolCode" +require "shared.init" +require "utils.init" +require "GlobalVar" +require "RedisKeys" +require "skynet.manager" + +local queue = require "skynet.queue" +local netpack = require "skynet.netpack" +local socket = require "skynet.socket" +local xxtea = require "xxtea" + +skynet = require "skynet" + +local MAX_COUNT = tonumber(skynet.getenv("max_queue")) +-- 心跳定时间隔 +local HEART_TIMER_INTERVAL = 30 +local HEART_TIMEOUT_COUNT_MAX = 3 + +local CMD = {} +local f2u = {} +local u2i = {} -- {idx, fd, {lastHeart, timeOutCount, nextCheck}} +local idx2u = {} +local curIdx = 0 -- 下一个即将进入游戏的玩家索引 +local nextIdx = 0 -- 新加的位置 + + +local function getRank(uid) + local info = u2i[uid] + if not info then return -1 end + return (info[1] + MAX_COUNT - curIdx) % MAX_COUNT + 1 +end + + +function SendPacket(actionCode, bin, client_fd) + if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end + local head = string.pack("H", actionCode) + return socket.write(client_fd, netpack.pack(head .. bin)) +end + +local function checkQueue(fd) + if not f2u[fd] then return end + local info = u2i[f2u[fd]] + if info then + info[3][1] = skynet.timex() + end + local rank = getRank(f2u[fd]) + SendPacket(actionCodes.Sys_checkQueue, MsgPack.pack({rank = rank}), fd) +end + + +skynet.register_protocol { + name = "client", + id = skynet.PTYPE_CLIENT, + unpack = function (msg, sz) + local data = skynet.tostring(msg, sz) + local cmd = string.unpack("H", string.sub(data, 1, 2)) + return cmd, string.sub(data, 3) + end, + dispatch = function(session, address, cmd, data) + skynet.ignoreret() + if cmd == actionCodes.Sys_checkQueue then + checkQueue(session) + end + end +} + +function CMD.push(uid, fd) + uid = tostring(uid) + if u2i[uid] then -- 存在] + local oldfd = u2i[uid][2] + if oldfd and oldfd ~= fd then + -- 踢掉老的 + SendPacket(actionCodes.Sys_maintainNotice, MsgPack.pack({body = "server_accountOccupied", iskey = true}), oldfd) + skynet.timeout(10, function () + skynet.call(gate_serv, "lua", "kick", oldfd) + end) + f2u[oldfd] = nil + end + u2i[uid][2] = fd + f2u[fd] = uid + u2i[uid][3] = {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL} + else -- 新排队的用户 + if nextIdx == curIdx and next(idx2u) then -- 满了 + return + end + u2i[uid] = {nextIdx, fd, {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL}} + f2u[fd] = uid + idx2u[nextIdx] = uid + nextIdx = (nextIdx + 1) % MAX_COUNT + end + skynet.call(gate_serv, "lua", "forward", fd, 0, skynet.self()) + + return getRank(uid) +end + +function CMD.pop() + while true do + local uid = idx2u[curIdx] + if not uid then return end -- 空的 + local info = u2i[uid] + if not info then + idx2u[curIdx] = nil + else + if info[2] then + -- 找到合适的了 + u2i[uid] = nil + idx2u[curIdx] = nil + f2u[info[2]] = nil + + curIdx = (curIdx + 1) % MAX_COUNT + return uid, info[2] + else + idx2u[curIdx] = nil + u2i[uid] = nil + end + end + curIdx = (curIdx + 1) % MAX_COUNT + end +end + + +-- 下线了 +function CMD.socket_close(fd) + local uid = f2u[fd] + if not uid then return end + f2u[fd] = nil + local info = u2i[uid] + info[2] = nil +end + +function CMD.handle_timeout() + local now = skynet.timex() + for uid, info in pairs(u2i) do + if info[2] and info[3] and now >= info[3][3] then --存在fd 检查心跳 + if info[3][1] - now > HEART_TIMER_INTERVAL or now - info[3][1] > HEART_TIMER_INTERVAL then + info[3][2] = info[3][2] + 1 + info[3][3] = now + HEART_TIMER_INTERVAL + if info[3][2] >= HEART_TIMEOUT_COUNT_MAX then + skynet.error("timeout! then queued will closed", info[2], uid) + skynet.call(gate_serv, "lua", "kick", info[2]) + end + end + end + end +end + +function CMD.count() + return (nextIdx + MAX_COUNT - curIdx) % MAX_COUNT +end + +return CMD diff --git a/src/services/poold.lua b/src/services/poold.lua index 0f1552b..7c899bb 100644 --- a/src/services/poold.lua +++ b/src/services/poold.lua @@ -7,7 +7,7 @@ local deque = require "deque" local CMD = {} local factory -local PRE_FEED_COUNT = 1 +local PRE_FEED_COUNT = 5 local dead = 0 -- agent死亡,通知poold补充,当累计到5个agent时,马上生成5个agent放入poold中 -- 当然这里也可以写得更复杂,参考redis落地规则 diff --git a/src/services/queued.lua b/src/services/queued.lua deleted file mode 100644 index 23d0bf0..0000000 --- a/src/services/queued.lua +++ /dev/null @@ -1,178 +0,0 @@ --- 排队系统 - -require "ProtocolCode" -require "shared.init" -require "utils.init" -require "GlobalVar" -require "RedisKeys" -require "skynet.manager" - -local queue = require "skynet.queue" -local netpack = require "skynet.netpack" -local socket = require "skynet.socket" -local xxtea = require "xxtea" - -skynet = require "skynet" - -local MAX_COUNT = tonumber(skynet.getenv("max_queue")) --- 心跳定时间隔 -local HEART_TIMER_INTERVAL = 30 -local HEART_TIMEOUT_COUNT_MAX = 3 -local gate_serv - -local CMD = {} -local f2u = {} -local u2i = {} -- {idx, fd, {lastHeart, timeOutCount, nextCheck}} -local idx2u = {} -local curIdx = 0 -- 下一个即将进入游戏的玩家索引 -local nextIdx = 0 -- 新加的位置 - - - -local function getRank(uid) - local info = u2i[uid] - if not info then return -1 end - return (info[1] + MAX_COUNT - curIdx) % MAX_COUNT + 1 -end - - -function SendPacket(actionCode, bin, client_fd) - if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end - local head = string.pack("H", actionCode) - return socket.write(client_fd, netpack.pack(head .. bin)) -end - -local function checkQueue(fd) - if not f2u[fd] then return end - local info = u2i[f2u[fd]] - if info then - info[3][1] = skynet.timex() - end - local rank = getRank(f2u[fd]) - SendPacket(actionCodes.Sys_checkQueue, MsgPack.pack({rank = rank}), fd) -end - - -skynet.register_protocol { - name = "client", - id = skynet.PTYPE_CLIENT, - unpack = function (msg, sz) - local data = skynet.tostring(msg, sz) - local cmd = string.unpack("H", string.sub(data, 1, 2)) - return cmd, string.sub(data, 3) - end, - dispatch = function(session, address, cmd, data) - skynet.ignoreret() - cs(function() - if cmd == actionCodes.Sys_checkQueue then - checkQueue(session) - end - end) - end -} - -function CMD.open(serv) - gate_serv = serv -end - -function CMD.push(uid, fd) - uid = tostring(uid) - if u2i[uid] then -- 存在] - local oldfd = u2i[uid][2] - if oldfd and oldfd ~= fd then - -- 踢掉老的 - SendPacket(actionCodes.Sys_maintainNotice, MsgPack.pack({body = "server_accountOccupied", iskey = true}), oldfd) - skynet.timeout(10, function () - skynet.call(gate_serv, "lua", "kick", oldfd) - end) - f2u[oldfd] = nil - end - u2i[uid][2] = fd - f2u[fd] = uid - u2i[uid][3] = {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL} - else -- 新排队的用户 - if nextIdx == curIdx and next(idx2u) then -- 满了 - return - end - u2i[uid] = {nextIdx, fd, {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL}} - f2u[fd] = uid - idx2u[nextIdx] = uid - nextIdx = (nextIdx + 1) % MAX_COUNT - end - skynet.call(gate_serv, "lua", "forward", fd, 0, skynet.self()) - - return getRank(uid) -end - -function CMD.pop() - while true do - local uid = idx2u[curIdx] - if not uid then return end -- 空的 - local info = u2i[uid] - if not info then - idx2u[curIdx] = nil - else - if info[2] then - -- 找到合适的了 - u2i[uid] = nil - idx2u[curIdx] = nil - f2u[info[2]] = nil - - curIdx = (curIdx + 1) % MAX_COUNT - return uid, info[2] - else - idx2u[curIdx] = nil - u2i[uid] = nil - end - end - curIdx = (curIdx + 1) % MAX_COUNT - end -end - --- 下线了 -function CMD.socket_close(fd) - local uid = f2u[fd] - if not uid then return end - f2u[fd] = nil - local info = u2i[uid] - info[2] = nil -end - - -local function handle_timeout() - local now = skynet.timex() - for uid, info in pairs(u2i) do - if info[2] and info[3] and now >= info[3][3] then --存在fd 检查心跳 - if info[3][1] - now > HEART_TIMER_INTERVAL or now - info[3][1] > HEART_TIMER_INTERVAL then - info[3][2] = info[3][2] + 1 - info[3][3] = now + HEART_TIMER_INTERVAL - if info[3][2] >= HEART_TIMEOUT_COUNT_MAX then - skynet.error("timeout! then queued will closed", info[2], uid) - skynet.call(gate_serv, "lua", "kick", info[2]) - end - end - end - end - skynet.timeout(100, handle_timeout) -end - - -skynet.start(function() - skynet.dispatch("lua", function(session, source, command, ...) - -- skynet.trace() --执行序的跟踪统计 - local f = CMD[command] - if f then - skynet.ret(skynet.pack(f(...))) - end - end) - - skynet.info_func(function() - local info = {} - info.count = (nextIdx + MAX_COUNT - curIdx) % MAX_COUNT - return info - end) - - cs = queue() - - skynet.timeout(100, handle_timeout) -end) \ No newline at end of file diff --git a/src/services/watchdog.lua b/src/services/watchdog.lua index fd53571..2f7a8c7 100644 --- a/src/services/watchdog.lua +++ b/src/services/watchdog.lua @@ -66,7 +66,6 @@ end function CMD.start(conf) skynet.call(gate_serv, "lua", "open" , conf) skynet.call(redisd, "lua", "open", conf) - skynet.call(queued_serv, "lua", "open", gate_serv) if use_logd == 1 then skynet.call(logd, "lua", "open") @@ -138,5 +137,4 @@ skynet.start(function() skynet.newservice("services/chated") -- 网关服务 gate_serv = skynet.newservice("gate") - queued_serv = skynet.newservice("services/queued") end) -- libgit2 0.21.2