Blame view

src/services/agent_ctrl.lua 5.96 KB
314bc5df   zhengshouren   提交服务器初始代码
1
2
3
4
5
6
7
  local skynet = require "skynet"
  local socket = require "skynet.socket"
  local redisproxy = require "shared.redisproxy"
  local netpack = require "skynet.netpack"
  local xxtea = require "xxtea"
  local deque = require "deque"
  local datacenter = require "skynet.datacenter"
e24d1abd   zhouhaihai   修改 排队
8
  local agent_queued = require "services.agent_queued"
3f604f2e   zhouhaihai   扩容 redis 和 log服务
9
  local logproxy = require "shared.logproxy"
314bc5df   zhengshouren   提交服务器初始代码
10
11
12
  
  local pcall = pcall
  local string_format = string.format
01be2d78   liuzujun   修改mysql链接断开重连的bug
13
  require "utils.MysqlUtil"
314bc5df   zhengshouren   提交服务器初始代码
14
15
16
  
  local poold
  
56484297   zhouhaihai   冒险消息
17
  -- agent过期时间 5分钟
314bc5df   zhengshouren   提交服务器初始代码
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
  local AGENT_EXPIRE_TIME				= 300
  
  local _M = {
  	-- fd -> uid
  	f2u = {},
  	-- uid -> 32 << fd | agent
  	u2f = {},
  	-- fd -> ip
  	f2i = {},
  	-- fd -> expire
  	f2e = {},
  	online = 0,
  }
  
  local function get_f(pack)
  	return (pack >> 32) & 0x7fffffff
  end
  
  local function get_a(pack)
  	return pack & 0xffffffff
  end
  
  local function set_pack(fd, agent)
  	return (fd << 32) | agent
  end
  
  function _M:init(obj, serice)
  	self.factory = deque.clone(obj)
  	poold = serice
  end
  
  -- @desc: agent退出
  function _M:exit_agent(fd)
  	self.f2e[fd] = nil
  	local uid = self.f2u[fd]
  	if not uid then return end
  
  	local pack = self.u2f[uid]
  	if not pack then
  		self.f2u[fd] = nil
  		return
  	end
  
  	local agent = get_a(pack)
  
  	pcall(skynet.send, agent, "lua", "exit")
5e6af9d6   zhouhaihai   排队功能
64
  
e24d1abd   zhouhaihai   修改 排队
65
66
67
  	self.f2u[fd] = nil
  	self.u2f[uid] = nil
  	local nuid, nfd = agent_queued.pop()
5e6af9d6   zhouhaihai   排队功能
68
69
70
71
72
  	if not nuid then
  		pcall(skynet.send, poold, "lua", "feed")
  	else
  		self:query_agent(nfd, nuid, true)
  	end
314bc5df   zhengshouren   提交服务器初始代码
73
74
75
76
77
78
79
80
81
82
83
  end
  
  -- @desc: 客户端连入
  function _M:socket_open(fd, addr)
  	self.f2i[fd] = addr
  end
  
  -- @desc: 网络关闭
  function _M:socket_close(fd)
  	self.f2i[fd] = nil
  	local uid = self.f2u[fd]
5e6af9d6   zhouhaihai   排队功能
84
85
  	if not uid then 
  		-- 排队中?
e24d1abd   zhouhaihai   修改 排队
86
  		agent_queued.socket_close(fd)
5e6af9d6   zhouhaihai   排队功能
87
88
  		return 
  	end
314bc5df   zhengshouren   提交服务器初始代码
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
  	self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME
  
  	if not self.u2f[uid] then
  		self.f2u[fd] = nil
  		return 
  	end
  	local agent = get_a(self.u2f[uid])
  	local ok, _ = pcall(skynet.call, agent, "lua", "close")
  	if not ok then self:exit_agent(fd) end
  end
  
  -- @desc: 网络出错
  function _M:socket_error(fd)
  	self.f2i[fd] = nil
  	local uid = self.f2u[fd]
5e6af9d6   zhouhaihai   排队功能
104
105
  	if not uid then 
  		-- 排队中?
e24d1abd   zhouhaihai   修改 排队
106
  		agent_queued.socket_close(fd)
5e6af9d6   zhouhaihai   排队功能
107
108
  		return 
  	end
314bc5df   zhengshouren   提交服务器初始代码
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
  
  	if not self.u2f[uid] then
  		self.f2u[fd] = nil
  		return 
  	end
  	local agent = get_a(self.u2f[uid])
  	-- 无论失败否,应该让逻辑走下去,保证agent状态以及索引正确
  	pcall(skynet.call, agent, "lua", "close")
  	self:exit_agent(fd)
  end
  
  local function table_nums(t)
      local count = 0
      for k, v in pairs(t) do
          count = count + 1
      end
      return count
  end
  
  local next_check_time = 0
  local next_log_time = 0
  local CHECK_AGENT_STATUS_INTERVAL 	= 100 -- 检查agent状态的定时间隔
  -- @desc: 检查agent状态,若过期,则让agent退出;并定时打日志统计在线人数
  function _M:check_agent_status()
e24d1abd   zhouhaihai   修改 排队
133
  	agent_queued.handle_timeout()
314bc5df   zhengshouren   提交服务器初始代码
134
135
136
137
138
139
140
141
142
143
  	local now = skynet.timex()
  	if now >= next_check_time then
  		next_check_time = now + CHECK_AGENT_STATUS_INTERVAL
  		for fd, expire in pairs(self.f2e) do
  			if expire < now then
  				self:exit_agent(fd)
  			end
  		end
  	end
  
06c7246e   liuzujun   按500人分线
144
      datacenter.set("onlineCount", count)
3f604f2e   zhouhaihai   扩容 redis 和 log服务
145
  	if now >= next_log_time and now % 60 == 0 then
314bc5df   zhengshouren   提交服务器初始代码
146
147
  		next_log_time = now + 60
  		local count = table_nums(self.u2f)
06c7246e   liuzujun   按500人分线
148
  		--datacenter.set("onlineCount", count)
3f604f2e   zhouhaihai   扩容 redis 和 log服务
149
  		logproxy:log({["@type"] = "online", count = count}, "log")
314bc5df   zhengshouren   提交服务器初始代码
150
151
152
  	end
  end
  
5e5d7680   zhouhaihai   热更新 优化
153
154
  local hotfixList = {}
  
3fe4471e   zhouhaihai   热更新 demo
155
  function _M:hotfix(code)
5e5d7680   zhouhaihai   热更新 优化
156
  	table.insert(hotfixList, code)
3fe4471e   zhouhaihai   热更新 demo
157
158
159
160
161
162
  	for uid, pack in pairs(self.u2f) do
  		local agent = get_a(pack)
  		pcall(skynet.send, agent, "lua", "hotfix", code)
  	end
  end
  
314bc5df   zhengshouren   提交服务器初始代码
163
164
165
166
167
168
169
170
171
  local function query_agent_response(fd, response)
  	local head = string.pack("H", actionCodes.Role_queryLoginRpc + rpcResponseBegin)
  
  	local bin = MsgPack.pack(response)
  	if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
  	socket.write(fd, netpack.pack(head .. bin))
  end
  
  -- @desc: 玩家登陆第一个包,queryLogin,watchdog为客户端分配一个agent,并告诉gate分配成功,之后的消息直接走agent
5e6af9d6   zhouhaihai   排队功能
172
  function _M:query_agent(fd, uid, isQueue)
314bc5df   zhengshouren   提交服务器初始代码
173
174
175
176
177
178
179
  	local pack = self.u2f[uid]
  	if pack then
  		local f = get_f(pack)
  		if fd == f then
  			skynet.error(string.format("%s same fd %d", uid, fd))
  			return 
  		end
314bc5df   zhengshouren   提交服务器初始代码
180
181
  		-- self.f2u[f] 肯定存在;self.f2e[f]不存在,则说明在线,则需要踢下线
  		if not self.f2e[f] then
2ee779d1   zhouhaihai   已经登陆
182
183
  			local head = string.pack("H", actionCodes.Sys_maintainNotice)
  			local bin = MsgPack.pack({body = "server_accountOccupied", iskey = true})
314bc5df   zhengshouren   提交服务器初始代码
184
185
186
187
188
189
190
191
  			if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
  			socket.write(f, netpack.pack(head .. bin))
  			skynet.timeout(10, function ()
  				skynet.call(gate_serv, "lua", "kick", f)
  			end)
  		end
  
  		local agent = get_a(pack)
555f745e   zhangqijia   feat: 一番赏
192
  		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, capsuled, fd, self.f2i[fd])
314bc5df   zhengshouren   提交服务器初始代码
193
194
195
196
197
198
199
200
201
  		if not ok then
  			query_agent_response(fd, {ret = "INNER_ERROR"})
  			return
  		end
  
  		self.f2e[f] = nil
  		self.f2u[f] = nil
  		self.u2f[uid] = set_pack(fd, agent)
  	else
e24d1abd   zhouhaihai   修改 排队
202
  		local agent
5e6af9d6   zhouhaihai   排队功能
203
  		if isQueue then
e24d1abd   zhouhaihai   修改 排队
204
205
206
207
208
209
  			agent = self.factory:pop()
  			if agent then
  				pcall(skynet.send, poold, "lua", "feed")
  			else
  				agent = skynet.newservice("agent")
  			end
5e6af9d6   zhouhaihai   排队功能
210
  		else
e24d1abd   zhouhaihai   修改 排队
211
212
213
214
215
216
217
218
  			-- 该uid未存储,则说明至少超过10分钟未登陆,由agent池服务pop出一个agent  
  			if agent_queued.count() > 0 then
  				-- 服务器满 开始排队
  				local rank = agent_queued.push(uid, fd)
  				query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
  				return 
  			end
  
5e6af9d6   zhouhaihai   排队功能
219
220
  			agent = self.factory:pop()
  			if not agent then
e24d1abd   zhouhaihai   修改 排队
221
  				local rank = agent_queued.push(uid, fd)
5e6af9d6   zhouhaihai   排队功能
222
223
224
  				query_agent_response(fd, {ret = "RET_SERVER_FULL", rank = rank})
  				return 
  			end
314bc5df   zhengshouren   提交服务器初始代码
225
226
  		end
  
555f745e   zhangqijia   feat: 一番赏
227
  		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, capsuled, fd, self.f2i[fd], hotfixList)
314bc5df   zhengshouren   提交服务器初始代码
228
229
230
231
232
233
234
235
236
237
238
239
240
  		if not ok then
  			self.factory:push(agent)
  			query_agent_response(fd, {ret = "INNER_ERROR"})
  			return
  		end
  
  		self.u2f[uid] = set_pack(fd, agent)
  	end
  
  	self.f2u[fd] = uid
  
  	local response = {}
  
01be2d78   liuzujun   修改mysql链接断开重连的bug
241
242
243
  	--local user = redisproxy:get(string_format("uid:%s", uid))
  	local res, user = roleUidExists(uid)
  	if res then
314bc5df   zhengshouren   提交服务器初始代码
244
245
246
247
248
249
250
251
252
  		response.ret = "RET_HAS_EXISTED"
  		response.name = user
  	else
  		response.ret = "RET_NOT_EXIST"
  	end
  	query_agent_response(fd, response)
  end
  
  return _M