Commit 5e6af9d6f42d8aa0faac656c6e1baf07bc04022d

Authored by zhouhaihai
1 parent bb5d94bd

排队功能

config/develop.lua
... ... @@ -6,6 +6,7 @@ codeurl = "192.168.8.223:9090"
6 6 servId = 1
7 7  
8 8 max_client = 64
  9 +max_queue = 64
9 10  
10 11 server_port = 12001
11 12 debug_port = 10001
... ...
src/ProtocolCode.lua
... ... @@ -7,6 +7,7 @@ actionCodes = {
7 7 Sys_commonNotice = 5,
8 8 Sys_maintainNotice = 6,
9 9 Sys_customNotice = 7,
  10 + Sys_checkQueue = 8,
10 11  
11 12 Gm_clientRequest = 20,
12 13 Gm_receiveResponse = 21,
... ...
src/main.lua
1 1 local skynet = require "skynet"
2 2  
3 3 local max_client = tonumber(skynet.getenv("max_client"))
  4 +local max_queue = tonumber(skynet.getenv("max_queue"))
4 5  
5 6 skynet.start(function()
6 7 print("Server start")
... ... @@ -11,7 +12,7 @@ skynet.start(function()
11 12  
12 13 skynet.call(watchdog, "lua", "start", {
13 14 port = tonumber(skynet.getenv("server_port")),
14   - maxclient = max_client,
  15 + maxclient = max_client + max_queue + 10,
15 16 httpd = httpd,
16 17  
17 18 redishost = skynet.getenv("redis_host"),
... ...
src/services/agent_ctrl.lua
... ... @@ -58,7 +58,14 @@ function _M:exit_agent(fd)
58 58 local agent = get_a(pack)
59 59  
60 60 pcall(skynet.send, agent, "lua", "exit")
61   - pcall(skynet.send, poold, "lua", "feed")
  61 +
  62 + -- 这里检查是否有排队用户
  63 + local nuid, nfd = skynet.call(queued_serv, "lua", "pop")
  64 + if not nuid then
  65 + pcall(skynet.send, poold, "lua", "feed")
  66 + else
  67 + self:query_agent(nfd, nuid, true)
  68 + end
62 69  
63 70 self.f2u[fd] = nil
64 71 self.u2f[uid] = nil
... ... @@ -73,7 +80,11 @@ end
73 80 function _M:socket_close(fd)
74 81 self.f2i[fd] = nil
75 82 local uid = self.f2u[fd]
76   - if not uid then return end
  83 + if not uid then
  84 + -- 排队中?
  85 + pcall(skynet.call, queued_serv, "lua", "socket_close", fd)
  86 + return
  87 + end
77 88 self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME
78 89  
79 90 if not self.u2f[uid] then
... ... @@ -89,7 +100,11 @@ end
89 100 function _M:socket_error(fd)
90 101 self.f2i[fd] = nil
91 102 local uid = self.f2u[fd]
92   - if not uid then return end
  103 + if not uid then
  104 + -- 排队中?
  105 + pcall(skynet.call, queued_serv, "lua", "socket_close", fd)
  106 + return
  107 + end
93 108  
94 109 if not self.u2f[uid] then
95 110 self.f2u[fd] = nil
... ... @@ -151,7 +166,7 @@ local function query_agent_response(fd, response)
151 166 end
152 167  
153 168 -- @desc: 玩家登陆第一个包,queryLogin,watchdog为客户端分配一个agent,并告诉gate分配成功,之后的消息直接走agent
154   -function _M:query_agent(fd, uid)
  169 +function _M:query_agent(fd, uid, isQueue)
155 170 local pack = self.u2f[uid]
156 171 if pack then
157 172 local f = get_f(pack)
... ... @@ -159,7 +174,6 @@ function _M:query_agent(fd, uid)
159 174 skynet.error(string.format("%s same fd %d", uid, fd))
160 175 return
161 176 end
162   -
163 177 -- self.f2u[f] 肯定存在;self.f2e[f]不存在,则说明在线,则需要踢下线
164 178 if not self.f2e[f] then
165 179 local head = string.pack("H", actionCodes.Sys_maintainNotice)
... ... @@ -183,11 +197,18 @@ function _M:query_agent(fd, uid)
183 197 self.u2f[uid] = set_pack(fd, agent)
184 198 else
185 199 -- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent
186   - local agent = self.factory:pop()
187   - if not agent then
188   - -- 服务器满
189   - query_agent_response(fd, {ret = "RET_SERVER_FULL"})
190   - return
  200 + local agent
  201 + if isQueue then
  202 + agent = skynet.newservice("agent")
  203 + else
  204 + agent = self.factory:pop()
  205 + if not agent then
  206 + -- 服务器满
  207 + -- 开始排队
  208 + local ok, rank = pcall(skynet.call, queued_serv, "lua", "push", uid, fd)
  209 + query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
  210 + return
  211 + end
191 212 end
192 213  
193 214 local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList)
... ...
src/services/poold.lua
... ... @@ -7,14 +7,15 @@ local deque = require "deque"
7 7 local CMD = {}
8 8 local factory
9 9  
  10 +local PRE_FEED_COUNT = 1
10 11 local dead = 0
11 12 -- agent死亡,通知poold补充,当累计到5个agent时,马上生成5个agent放入poold中
12 13 -- 当然这里也可以写得更复杂,参考redis落地规则
13 14 function CMD.feed()
14 15 dead = dead + 1
15   - if dead == 5 then
  16 + if dead == PRE_FEED_COUNT then
16 17 dead = 0
17   - for i=1, 5 do
  18 + for i=1, PRE_FEED_COUNT do
18 19 factory:push(skynet.newservice("agent"))
19 20 end
20 21 end
... ...
src/services/queued.lua 0 → 100644
... ... @@ -0,0 +1,178 @@
  1 +-- 排队系统
  2 +
  3 +require "ProtocolCode"
  4 +require "shared.init"
  5 +require "utils.init"
  6 +require "GlobalVar"
  7 +require "RedisKeys"
  8 +require "skynet.manager"
  9 +
  10 +local queue = require "skynet.queue"
  11 +local netpack = require "skynet.netpack"
  12 +local socket = require "skynet.socket"
  13 +local xxtea = require "xxtea"
  14 +
  15 +skynet = require "skynet"
  16 +
  17 +local MAX_COUNT = tonumber(skynet.getenv("max_queue"))
  18 +-- 心跳定时间隔
  19 +local HEART_TIMER_INTERVAL = 30
  20 +local HEART_TIMEOUT_COUNT_MAX = 3
  21 +local gate_serv
  22 +
  23 +local CMD = {}
  24 +local f2u = {}
  25 +local u2i = {} -- {idx, fd, {lastHeart, timeOutCount, nextCheck}}
  26 +local idx2u = {}
  27 +local curIdx = 0 -- 下一个即将进入游戏的玩家索引
  28 +local nextIdx = 0 -- 新加的位置
  29 +
  30 +
  31 +
  32 +local function getRank(uid)
  33 + local info = u2i[uid]
  34 + if not info then return -1 end
  35 + return (info[1] + MAX_COUNT - curIdx) % MAX_COUNT + 1
  36 +end
  37 +
  38 +
  39 +function SendPacket(actionCode, bin, client_fd)
  40 + if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
  41 + local head = string.pack("H", actionCode)
  42 + return socket.write(client_fd, netpack.pack(head .. bin))
  43 +end
  44 +
  45 +local function checkQueue(fd)
  46 + if not f2u[fd] then return end
  47 + local info = u2i[f2u[fd]]
  48 + if info then
  49 + info[3][1] = skynet.timex()
  50 + end
  51 + local rank = getRank(f2u[fd])
  52 + SendPacket(actionCodes.Sys_checkQueue, MsgPack.pack({rank = rank}), fd)
  53 +end
  54 +
  55 +
  56 +skynet.register_protocol {
  57 + name = "client",
  58 + id = skynet.PTYPE_CLIENT,
  59 + unpack = function (msg, sz)
  60 + local data = skynet.tostring(msg, sz)
  61 + local cmd = string.unpack("H", string.sub(data, 1, 2))
  62 + return cmd, string.sub(data, 3)
  63 + end,
  64 + dispatch = function(session, address, cmd, data)
  65 + skynet.ignoreret()
  66 + cs(function()
  67 + if cmd == actionCodes.Sys_checkQueue then
  68 + checkQueue(session)
  69 + end
  70 + end)
  71 + end
  72 +}
  73 +
  74 +function CMD.open(serv)
  75 + gate_serv = serv
  76 +end
  77 +
  78 +function CMD.push(uid, fd)
  79 + uid = tostring(uid)
  80 + if u2i[uid] then -- 存在]
  81 + local oldfd = u2i[uid][2]
  82 + if oldfd and oldfd ~= fd then
  83 + -- 踢掉老的
  84 + SendPacket(actionCodes.Sys_maintainNotice, MsgPack.pack({body = "server_accountOccupied", iskey = true}), oldfd)
  85 + skynet.timeout(10, function ()
  86 + skynet.call(gate_serv, "lua", "kick", oldfd)
  87 + end)
  88 + f2u[oldfd] = nil
  89 + end
  90 + u2i[uid][2] = fd
  91 + f2u[fd] = uid
  92 + u2i[uid][3] = {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL}
  93 + else -- 新排队的用户
  94 + if nextIdx == curIdx and next(idx2u) then -- 满了
  95 + return
  96 + end
  97 + u2i[uid] = {nextIdx, fd, {skynet.timex(), 0, skynet.timex() + HEART_TIMER_INTERVAL}}
  98 + f2u[fd] = uid
  99 + idx2u[nextIdx] = uid
  100 + nextIdx = (nextIdx + 1) % MAX_COUNT
  101 + end
  102 + skynet.call(gate_serv, "lua", "forward", fd, 0, skynet.self())
  103 +
  104 + return getRank(uid)
  105 +end
  106 +
  107 +function CMD.pop()
  108 + while true do
  109 + local uid = idx2u[curIdx]
  110 + if not uid then return end -- 空的
  111 + local info = u2i[uid]
  112 + if not info then
  113 + idx2u[curIdx] = nil
  114 + else
  115 + if info[2] then
  116 + -- 找到合适的了
  117 + u2i[uid] = nil
  118 + idx2u[curIdx] = nil
  119 + f2u[info[2]] = nil
  120 +
  121 + curIdx = (curIdx + 1) % MAX_COUNT
  122 + return uid, info[2]
  123 + else
  124 + idx2u[curIdx] = nil
  125 + u2i[uid] = nil
  126 + end
  127 + end
  128 + curIdx = (curIdx + 1) % MAX_COUNT
  129 + end
  130 +end
  131 +
  132 +-- 下线了
  133 +function CMD.socket_close(fd)
  134 + local uid = f2u[fd]
  135 + if not uid then return end
  136 + f2u[fd] = nil
  137 + local info = u2i[uid]
  138 + info[2] = nil
  139 +end
  140 +
  141 +
  142 +local function handle_timeout()
  143 + local now = skynet.timex()
  144 + for uid, info in pairs(u2i) do
  145 + if info[2] and info[3] and now >= info[3][3] then --存在fd 检查心跳
  146 + if info[3][1] - now > HEART_TIMER_INTERVAL or now - info[3][1] > HEART_TIMER_INTERVAL then
  147 + info[3][2] = info[3][2] + 1
  148 + info[3][3] = now + HEART_TIMER_INTERVAL
  149 + if info[3][2] >= HEART_TIMEOUT_COUNT_MAX then
  150 + skynet.error("timeout! then queued will closed", info[2], uid)
  151 + skynet.call(gate_serv, "lua", "kick", info[2])
  152 + end
  153 + end
  154 + end
  155 + end
  156 + skynet.timeout(100, handle_timeout)
  157 +end
  158 +
  159 +
  160 +skynet.start(function()
  161 + skynet.dispatch("lua", function(session, source, command, ...)
  162 + -- skynet.trace() --执行序的跟踪统计
  163 + local f = CMD[command]
  164 + if f then
  165 + skynet.ret(skynet.pack(f(...)))
  166 + end
  167 + end)
  168 +
  169 + skynet.info_func(function()
  170 + local info = {}
  171 + info.count = (nextIdx + MAX_COUNT - curIdx) % MAX_COUNT
  172 + return info
  173 + end)
  174 +
  175 + cs = queue()
  176 +
  177 + skynet.timeout(100, handle_timeout)
  178 +end)
0 179 \ No newline at end of file
... ...
src/services/watchdog.lua
... ... @@ -66,6 +66,7 @@ end
66 66 function CMD.start(conf)
67 67 skynet.call(gate_serv, "lua", "open" , conf)
68 68 skynet.call(redisd, "lua", "open", conf)
  69 + skynet.call(queued_serv, "lua", "open", gate_serv)
69 70  
70 71 if use_logd == 1 then
71 72 skynet.call(logd, "lua", "open")
... ... @@ -137,4 +138,5 @@ skynet.start(function()
137 138 skynet.newservice("services/chated")
138 139 -- 网关服务
139 140 gate_serv = skynet.newservice("gate")
  141 + queued_serv = skynet.newservice("services/queued")
140 142 end)
... ...