Blame view

src/services/agent_ctrl.lua 5.83 KB
314bc5df   zhengshouren   提交服务器初始代码
1
2
3
4
5
6
7
  local skynet = require "skynet"
  local socket = require "skynet.socket"
  local redisproxy = require "shared.redisproxy"
  local netpack = require "skynet.netpack"
  local xxtea = require "xxtea"
  local deque = require "deque"
  local datacenter = require "skynet.datacenter"
e24d1abd   zhouhaihai   修改 排队
8
  local agent_queued = require "services.agent_queued"
3f604f2e   zhouhaihai   扩容 redis 和 log服务
9
  local logproxy = require "shared.logproxy"
314bc5df   zhengshouren   提交服务器初始代码
10
11
12
13
14
15
  
  -- Cache frequently used builtins in locals.
  local pcall = pcall
  local string_format = string.format
  
  -- Service handle assigned by _M:init (its second argument); "feed" messages are sent to it.
  local poold
  
56484297   zhouhaihai   冒险消息
16
  -- agent expiry time: 5 minutes
314bc5df   zhengshouren   提交服务器初始代码
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
  -- Added to skynet.timex() in socket_close to compute when a disconnected
  -- fd's agent expires (300 -> the "5 minutes" this constant is documented as).
  local AGENT_EXPIRE_TIME				= 300
  
  -- Module table: holds the lookup tables shared by every method below.
  local _M = {
  	-- fd -> uid
  	f2u = {},
  	-- uid -> packed integer (fd << 32) | agent, built by set_pack
  	u2f = {},
  	-- fd -> peer address as passed to socket_open
  	f2i = {},
  	-- fd -> expire timestamp, set when the socket closes
  	f2e = {},
  	-- NOTE(review): not read or written anywhere else in this view
  	online = 0,
  }
  
  -- Extract the fd stored in the upper half of a packed (fd, agent) integer.
  -- The result is limited to 31 bits.
  local function get_f(pack)
  	local upper = pack >> 32
  	return upper & 0x7fffffff
  end
  
  -- Extract the agent handle stored in the low 32 bits of a packed (fd, agent) integer.
  local function get_a(pack)
  	local LOW_32_BITS = 0xffffffff
  	return pack & LOW_32_BITS
  end
  
  -- Combine an fd and an agent handle into one integer:
  -- fd goes into the bits above 32, agent is OR-ed into the low bits.
  local function set_pack(fd, agent)
  	local upper = fd << 32
  	return upper | agent
  end
  
  -- Initialise the controller.
  -- @param obj    passed to deque.clone; the clone becomes self.factory (the agent pool)
  -- @param serice service handle stored in the file-level `poold`
  function _M:init(obj, serice)
  	local pool = deque.clone(obj)
  	self.factory = pool
  	poold = serice
  end
  
  -- @desc: Make the agent bound to `fd` exit and release its bookkeeping.
  -- Always clears the fd's expire entry. If the fd maps to a uid that still has
  -- a uid -> (fd, agent) record, sends "exit" to that agent and drops both
  -- mappings. It then starts the next queued login, or sends "feed" to the pool
  -- service when nobody is queued.
  function _M:exit_agent(fd)
  	self.f2e[fd] = nil
  
  	local owner = self.f2u[fd]
  	if not owner then
  		return
  	end
  
  	local packed = self.u2f[owner]
  	if packed then
  		-- pcall: a failed send must not stop the index cleanup below
  		pcall(skynet.send, get_a(packed), "lua", "exit")
  
  		self.f2u[fd] = nil
  		self.u2f[owner] = nil
  
  		local queued_uid, queued_fd = agent_queued.pop()
  		if queued_uid then
  			-- somebody is waiting in the login queue: start their login now
  			self:query_agent(queued_fd, queued_uid, true)
  		else
  			pcall(skynet.send, poold, "lua", "feed")
  		end
  	else
  		-- the uid has no agent record: only the fd -> uid entry needs removing
  		self.f2u[fd] = nil
  	end
  end
  
  -- @desc: A client connected; remember its address under the fd.
  function _M:socket_open(fd, addr)
  	local addresses = self.f2i
  	addresses[fd] = addr
  end
  
  -- @desc: The socket was closed.
  -- Forgets the fd's address. If no uid is bound to the fd, the close is
  -- forwarded to the login queue. Otherwise the agent is asked to "close" and is
  -- kept until its expire time (now + AGENT_EXPIRE_TIME); if that call fails the
  -- agent is made to exit immediately.
  function _M:socket_close(fd)
  	self.f2i[fd] = nil
  	local uid = self.f2u[fd]
  	if not uid then
  		-- no uid on this fd: it may be waiting in the queue
  		agent_queued.socket_close(fd)
  		return
  	end
  
  	local pack = self.u2f[uid]
  	if not pack then
  		-- No agent record for this uid: drop the dangling fd mapping and make
  		-- sure no expire entry is left behind for an fd that owns nothing.
  		self.f2u[fd] = nil
  		self.f2e[fd] = nil
  		return
  	end
  
  	-- Arm the expiry only once we know there is an agent to expire.
  	self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME
  
  	local ok = pcall(skynet.call, get_a(pack), "lua", "close")
  	if not ok then
  		self:exit_agent(fd)
  	end
  end
  
  -- @desc: The socket reported an error.
  -- Forgets the fd's address. If no uid is bound to the fd, the event is
  -- forwarded to the login queue. Otherwise the agent is asked to "close" and is
  -- then made to exit regardless of whether that call succeeded.
  function _M:socket_error(fd)
  	self.f2i[fd] = nil
  
  	local owner = self.f2u[fd]
  	if not owner then
  		-- no uid on this fd: it may be waiting in the queue
  		agent_queued.socket_close(fd)
  		return
  	end
  
  	local packed = self.u2f[owner]
  	if packed then
  		-- Whether or not "close" succeeds, carry on so that the agent state
  		-- and the lookup tables stay consistent.
  		pcall(skynet.call, get_a(packed), "lua", "close")
  		self:exit_agent(fd)
  	else
  		self.f2u[fd] = nil
  	end
  end
  
  -- Count every entry that pairs() visits in `t` (array and hash part alike).
  local function table_nums(t)
  	local total = 0
  	for _ in pairs(t) do
  		total = total + 1
  	end
  	return total
  end
  
  -- Earliest time (skynet.timex() value) at which the next expiry sweep / online log may run.
  local next_check_time = 0
  local next_log_time = 0
  local CHECK_AGENT_STATUS_INTERVAL 	= 100 -- interval between expiry sweeps, in skynet.timex() units
  -- @desc: Periodic housekeeping.
  -- 1. Lets the login queue handle its own timeouts.
  -- 2. At most once per CHECK_AGENT_STATUS_INTERVAL, makes every agent whose
  --    expire time has passed exit.
  -- 3. When `now` is a multiple of 60 and the log interval has elapsed, publishes
  --    the number of uid records as "onlineCount" and writes an "online" log entry.
  function _M:check_agent_status()
  	agent_queued.handle_timeout()
  
  	local now = skynet.timex()
  	if now >= next_check_time then
  		next_check_time = now + CHECK_AGENT_STATUS_INTERVAL
  
  		-- Snapshot the expired fds first. exit_agent can reach query_agent, which
  		-- yields and assigns into self.f2e; adding keys to a table while it is
  		-- being traversed with pairs() is undefined behaviour.
  		local expired = {}
  		for fd, expire in pairs(self.f2e) do
  			if expire < now then
  				expired[#expired + 1] = fd
  			end
  		end
  
  		for i = 1, #expired do
  			local fd = expired[i]
  			-- Re-check: an earlier exit_agent may have yielded, and the entry
  			-- could have been cleared or refreshed in the meantime.
  			local expire = self.f2e[fd]
  			if expire and expire < now then
  				self:exit_agent(fd)
  			end
  		end
  	end
  
  	if now >= next_log_time and now % 60 == 0 then
  		next_log_time = now + 60
  		local count = table_nums(self.u2f)
  		datacenter.set("onlineCount", count)
  		logproxy:log({["@type"] = "online", count = count}, "log")
  	end
  end
  
5e5d7680   zhouhaihai   热更新 优化
151
152
  -- Every hotfix received so far, in arrival order. query_agent passes this list
  -- to agents it starts for uids that had no agent record.
  local hotfixList = {}
  
  -- @desc: Record a hotfix and send it to every agent currently in self.u2f.
  -- @param code the hotfix payload, forwarded unchanged in a "hotfix" message
  function _M:hotfix(code)
  	hotfixList[#hotfixList + 1] = code
  
  	for _, packed in pairs(self.u2f) do
  		-- pcall: one failing agent must not stop the broadcast
  		pcall(skynet.send, get_a(packed), "lua", "hotfix", code)
  	end
  end
  
314bc5df   zhengshouren   提交服务器初始代码
161
162
163
164
165
166
167
168
169
  -- Send the reply to a queryLogin request to the client on `fd`.
  -- The frame is a 2-byte ("H") action code, Role_queryLoginRpc + rpcResponseBegin,
  -- followed by the MsgPack-encoded `response`, xxtea-encrypted when non-empty.
  local function query_agent_response(fd, response)
  	local code = actionCodes.Role_queryLoginRpc + rpcResponseBegin
  	local header = string.pack("H", code)
  
  	local payload = MsgPack.pack(response)
  	if #payload > 0 then
  		payload = xxtea.encrypt(payload, XXTEA_KEY)
  	end
  
  	-- keep the call nested so every value returned by netpack.pack reaches socket.write
  	socket.write(fd, netpack.pack(header .. payload))
  end
  
  -- @desc: Handle the first packet of a login (queryLogin): bind an agent to the
  -- client's fd and reply with whether the uid already has a stored name.
  -- @param fd      socket of the client that is logging in
  -- @param uid     account id
  -- @param isQueue true when called for a login that was popped from the wait
  --                queue; such a login is never queued again
  function _M:query_agent(fd, uid, isQueue)
  	local bound = self.u2f[uid]
  	if bound then
  		-- The uid already owns an agent: move that agent over to the new fd.
  		local old_fd = get_f(bound)
  		if old_fd == fd then
  			skynet.error(string.format("%s same fd %d", uid, fd))
  			return
  		end
  
  		-- self.f2u[old_fd] exists here. A missing self.f2e[old_fd] means the old
  		-- connection is still online, so notify it and kick it.
  		if not self.f2e[old_fd] then
  			local head = string.pack("H", actionCodes.Sys_maintainNotice)
  			local bin = MsgPack.pack({body = "server_accountOccupied", iskey = true})
  			if #bin > 0 then
  				bin = xxtea.encrypt(bin, XXTEA_KEY)
  			end
  			socket.write(old_fd, netpack.pack(head .. bin))
  			-- the kick is deferred via skynet.timeout(10, ...), after the notice is written
  			skynet.timeout(10, function ()
  				skynet.call(gate_serv, "lua", "kick", old_fd)
  			end)
  		end
  
  		local agent = get_a(bound)
  		local started = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd])
  		if not started then
  			query_agent_response(fd, {ret = "INNER_ERROR"})
  			return
  		end
  
  		self.f2e[old_fd] = nil
  		self.f2u[old_fd] = nil
  		self.u2f[uid] = set_pack(fd, agent)
  	else
  		-- No agent record for this uid: take an agent from the pool.
  		local agent
  		if isQueue then
  			agent = self.factory:pop()
  			if not agent then
  				agent = skynet.newservice("agent")
  			else
  				pcall(skynet.send, poold, "lua", "feed")
  			end
  		else
  			-- Only try the pool when nobody is queued ahead of this login.
  			if agent_queued.count() == 0 then
  				agent = self.factory:pop()
  			end
  			if not agent then
  				-- server full: put the login in the queue and report its rank
  				local rank = agent_queued.push(uid, fd)
  				query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
  				return
  			end
  		end
  
  		local started = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd], hotfixList)
  		if not started then
  			-- hand the agent back to the pool
  			self.factory:push(agent)
  			query_agent_response(fd, {ret = "INNER_ERROR"})
  			return
  		end
  
  		self.u2f[uid] = set_pack(fd, agent)
  	end
  
  	self.f2u[fd] = uid
  
  	local name = redisproxy:get(string_format("uid:%s", uid))
  	local response
  	if name then
  		response = {ret = "RET_HAS_EXISTED", name = name}
  	else
  		response = {ret = "RET_NOT_EXIST"}
  	end
  	query_agent_response(fd, response)
  end
  
  return _M