diff --git a/config/develop.lua b/config/develop.lua index e9dc86e..b9e8547 100644 --- a/config/develop.lua +++ b/config/develop.lua @@ -6,6 +6,7 @@ codeurl = "192.168.8.223:9090" servId = 1 max_client = 64 +max_queue = 64 server_port = 12001 debug_port = 10001 diff --git a/src/ProtocolCode.lua b/src/ProtocolCode.lua index d864e92..8dfad0d 100644 --- a/src/ProtocolCode.lua +++ b/src/ProtocolCode.lua @@ -7,6 +7,7 @@ actionCodes = { Sys_commonNotice = 5, Sys_maintainNotice = 6, Sys_customNotice = 7, + Sys_checkQueue = 8, Gm_clientRequest = 20, Gm_receiveResponse = 21, diff --git a/src/main.lua b/src/main.lua index e766113..d967df0 100644 --- a/src/main.lua +++ b/src/main.lua @@ -1,6 +1,7 @@ local skynet = require "skynet" local max_client = tonumber(skynet.getenv("max_client")) +local max_queue = tonumber(skynet.getenv("max_queue")) skynet.start(function() print("Server start") @@ -11,7 +12,7 @@ skynet.start(function() skynet.call(watchdog, "lua", "start", { port = tonumber(skynet.getenv("server_port")), - maxclient = max_client, + maxclient = max_client + max_queue + 10, httpd = httpd, redishost = skynet.getenv("redis_host"), diff --git a/src/services/agent_ctrl.lua b/src/services/agent_ctrl.lua index 44a5ab2..3aa274c 100644 --- a/src/services/agent_ctrl.lua +++ b/src/services/agent_ctrl.lua @@ -58,7 +58,14 @@ function _M:exit_agent(fd) local agent = get_a(pack) pcall(skynet.send, agent, "lua", "exit") - pcall(skynet.send, poold, "lua", "feed") + + -- 这里检查是否有排队用户 + local nuid, nfd = skynet.call(queued_serv, "lua", "pop") + if not nuid then + pcall(skynet.send, poold, "lua", "feed") + else + self:query_agent(nfd, nuid, true) + end self.f2u[fd] = nil self.u2f[uid] = nil @@ -73,7 +80,11 @@ end function _M:socket_close(fd) self.f2i[fd] = nil local uid = self.f2u[fd] - if not uid then return end + if not uid then + -- 排队中? + pcall(skynet.call, queued_serv, "lua", "socket_close", fd) + return + end self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME if not self.u2f[uid] then @@ -89,7 +100,11 @@ end function _M:socket_error(fd) self.f2i[fd] = nil local uid = self.f2u[fd] - if not uid then return end + if not uid then + -- 排队中? + pcall(skynet.call, queued_serv, "lua", "socket_close", fd) + return + end if not self.u2f[uid] then self.f2u[fd] = nil @@ -151,7 +166,7 @@ local function query_agent_response(fd, response) end -- @desc: 玩家登陆第一个包,queryLogin,watchdog为客户端分配一个agent,并告诉gate分配成功,之后的消息直接走agent -function _M:query_agent(fd, uid) +function _M:query_agent(fd, uid, isQueue) local pack = self.u2f[uid] if pack then local f = get_f(pack) @@ -159,7 +174,6 @@ function _M:query_agent(fd, uid) skynet.error(string.format("%s same fd %d", uid, fd)) return end - -- self.f2u[f] 肯定存在;self.f2e[f]不存在,则说明在线,则需要踢下线 if not self.f2e[f] then local head = string.pack("H", actionCodes.Sys_maintainNotice) @@ -183,11 +197,18 @@ function _M:query_agent(fd, uid) self.u2f[uid] = set_pack(fd, agent) else -- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent - local agent = self.factory:pop() - if not agent then - -- 服务器满 - query_agent_response(fd, {ret = "RET_SERVER_FULL"}) - return + local agent + if isQueue then + agent = skynet.newservice("agent") + else + agent = self.factory:pop() + if not agent then + -- 服务器满 + -- 开始排队 + local ok, rank = pcall(skynet.call, queued_serv, "lua", "push", uid, fd) + query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank}) + return + end end local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList) diff --git a/src/services/poold.lua b/src/services/poold.lua index 9629846..0f1552b 100644 --- a/src/services/poold.lua +++ b/src/services/poold.lua @@ -7,14 +7,15 @@ local deque = require "deque" local CMD = {} local factory +local PRE_FEED_COUNT = 1 local dead = 0 -- agent死亡,通知poold补充,当累计到5个agent时,马上生成5个agent放入poold中 -- 当然这里也可以写得更复杂,参考redis落地规则 function CMD.feed() dead = dead + 1 - if dead == 5 then + if dead == PRE_FEED_COUNT then dead = 0 - for i=1, 5 do + for i=1, PRE_FEED_COUNT do factory:push(skynet.newservice("agent")) end end diff --git a/src/services/queued.lua b/src/services/queued.lua new file mode 100644 index 0000000..23d0bf0 --- /dev/null +++ b/src/services/queued.lua @@ -0,0 +1,178 @@ +-- 排队系统 + +require "ProtocolCode" +require "shared.init" +require "utils.init" +require "GlobalVar" +require "RedisKeys" +require "skynet.manager" + +local queue = require "skynet.queue" +local netpack = require "skynet.netpack" +local socket = require "skynet.socket" +local xxtea = require "xxtea" + +skynet = require "skynet" + +local MAX_COUNT = tonumber(skynet.getenv("max_queue")) +-- 心跳定时间隔 +local HEART_TIMER_INTERVAL = 30 +local HEART_TIMEOUT_COUNT_MAX = 3 +local gate_serv + +local CMD = {} +local f2u = {} +local u2i = {} -- {idx, fd, {lastHeart, timeOutCount, nextCheck}} +local idx2u = {} +local curIdx = 0 -- 下一个即将进入游戏的玩家索引 +local nextIdx = 0 -- 新加的位置 + + + +local function getRank(uid) + local info = u2i[uid] + if not info then return -1 end + return (info[1] + MAX_COUNT - curIdx) % MAX_COUNT + 1 +end + + +function SendPacket(actionCode, bin, client_fd) + if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end + local head = string.pack("H", actionCode) + return socket.write(client_fd, netpack.pack(head .. bin)) +end + +local function checkQueue(fd) + if not f2u[fd] then return end + local info = u2i[f2u[fd]] + if info then + info[3][1] = skynet.timex() + end + local rank = getRank(f2u[fd]) + SendPacket(actionCodes.Sys_checkQueue, MsgPack.pack({rank = rank}), fd) +end + + +skynet.register_protocol { + name = "client", + id = skynet.PTYPE_CLIENT, + unpack = function (msg, sz) + local data = skynet.tostring(msg, sz) + local cmd = string.unpack("H", string.sub(data, 1, 2)) + return cmd, string.sub(data, 3) + end, + dispatch = function(session, address, cmd, data) + skynet.ignoreret() + cs(function() + if cmd == actionCodes.Sys_checkQueue then + checkQueue(session) + end + end) + end +} + +function CMD.open(serv) + gate_serv = serv +end + +function CMD.push(uid, fd) + uid = tostring(uid) + if u2i[uid] then -- 存在] + local oldfd = u2i[uid][2] + if oldfd and oldfd ~= fd then + -- 踢掉老的 + SendPacket(actionCodes.Sys_maintainNotice, MsgPack.pack({body = "server_accountOccupied", iskey = true}), oldfd) + skynet.timeout(10, function () + skynet.call(gate_serv, "lua", "kick", oldfd) + end) + f2u[oldfd] = nil + end + u2i[uid][2] = fd + f2u[fd] = uid + u2i[uid][3] = {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL} + else -- 新排队的用户 + if nextIdx == curIdx and next(idx2u) then -- 满了 + return + end + u2i[uid] = {nextIdx, fd, {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL}} + f2u[fd] = uid + idx2u[nextIdx] = uid + nextIdx = (nextIdx + 1) % MAX_COUNT + end + skynet.call(gate_serv, "lua", "forward", fd, 0, skynet.self()) + + return getRank(uid) +end + +function CMD.pop() + while true do + local uid = idx2u[curIdx] + if not uid then return end -- 空的 + local info = u2i[uid] + if not info then + idx2u[curIdx] = nil + else + if info[2] then + -- 找到合适的了 + u2i[uid] = nil + idx2u[curIdx] = nil + f2u[info[2]] = nil + + curIdx = (curIdx + 1) % MAX_COUNT + return uid, info[2] + else + idx2u[curIdx] = nil + u2i[uid] = nil + end + end + curIdx = (curIdx + 1) % MAX_COUNT + end +end + +-- 下线了 +function CMD.socket_close(fd) + local uid = f2u[fd] + if not uid then return end + f2u[fd] = nil + local info = u2i[uid] + info[2] = nil +end + + +local function handle_timeout() + local now = skynet.timex() + for uid, info in pairs(u2i) do + if info[2] and info[3] and now >= info[3][3] then --存在fd 检查心跳 + if info[3][1] - now > HEART_TIMER_INTERVAL or now - info[3][1] > HEART_TIMER_INTERVAL then + info[3][2] = info[3][2] + 1 + info[3][3] = now + HEART_TIMER_INTERVAL + if info[3][2] >= HEART_TIMEOUT_COUNT_MAX then + skynet.error("timeout! then queued will closed", info[2], uid) + skynet.call(gate_serv, "lua", "kick", info[2]) + end + end + end + end + skynet.timeout(100, handle_timeout) +end + + +skynet.start(function() + skynet.dispatch("lua", function(session, source, command, ...) + -- skynet.trace() --执行序的跟踪统计 + local f = CMD[command] + if f then + skynet.ret(skynet.pack(f(...))) + end + end) + + skynet.info_func(function() + local info = {} + info.count = (nextIdx + MAX_COUNT - curIdx) % MAX_COUNT + return info + end) + + cs = queue() + + skynet.timeout(100, handle_timeout) +end) \ No newline at end of file diff --git a/src/services/watchdog.lua b/src/services/watchdog.lua index 2f7a8c7..fd53571 100644 --- a/src/services/watchdog.lua +++ b/src/services/watchdog.lua @@ -66,6 +66,7 @@ end function CMD.start(conf) skynet.call(gate_serv, "lua", "open" , conf) skynet.call(redisd, "lua", "open", conf) + skynet.call(queued_serv, "lua", "open", gate_serv) if use_logd == 1 then skynet.call(logd, "lua", "open") @@ -137,4 +138,5 @@ skynet.start(function() skynet.newservice("services/chated") -- 网关服务 gate_serv = skynet.newservice("gate") + queued_serv = skynet.newservice("services/queued") end) -- libgit2 0.21.2