Blame view

src/services/agent_ctrl.lua 4.87 KB
314bc5df   zhengshouren   提交服务器初始代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
  local skynet = require "skynet"
  local socket = require "skynet.socket"
  local redisproxy = require "shared.redisproxy"
  local netpack = require "skynet.netpack"
  local xxtea = require "xxtea"
  local deque = require "deque"
  local datacenter = require "skynet.datacenter"
  
  -- Cache frequently used functions in locals.
  local pcall = pcall
  local string_format = string.format
  
  -- Service that receives the "feed" message in exit_agent; assigned in _M:init.
  local poold
  
  -- Agent expiry delay: added to skynet.timex() when a socket closes (see socket_close).
  -- NOTE(review): the original comment said "10 minutes", but 300 is 5 minutes if
  -- skynet.timex() returns seconds -- confirm the unit and the intended value.
  local AGENT_EXPIRE_TIME				= 300
  
  -- Module state: lookup tables linking connections (fd), users (uid) and agents.
  local _M = {
  	-- fd -> uid
  	f2u = {},
  	-- uid -> packed integer (fd << 32) | agent, as built by set_pack
  	u2f = {},
  	-- fd -> address passed to socket_open (the original comment calls it the ip)
  	f2i = {},
  	-- fd -> expiry timestamp; written by socket_close, cleared by exit_agent/query_agent
  	f2e = {},
  	-- NOTE(review): not read or updated anywhere in the visible code
  	online = 0,
  }
  
  -- @desc: Extract the fd stored in the upper half of a packed value.
  -- @param pack  integer built by set_pack: (fd << 32) | agent
  -- @return the upper 32 bits of pack, masked to 31 bits
  local function get_f(pack)
  	local upper = pack >> 32
  	return 0x7fffffff & upper
  end
  
  -- @desc: Extract the agent handle stored in the lower half of a packed value.
  -- @param pack  integer built by set_pack: (fd << 32) | agent
  -- @return the lower 32 bits of pack
  local function get_a(pack)
  	local lower = 0xffffffff & pack
  	return lower
  end
  
  -- @desc: Combine an fd and an agent handle into a single integer.
  -- @param fd     placed in the bits above bit 31
  -- @param agent  OR-ed into the low bits (not masked here)
  -- @return (fd << 32) | agent
  local function set_pack(fd, agent)
  	local upper = fd << 32
  	return upper | agent
  end
  
  -- @desc: Initialise the controller.
  -- @param obj     passed to deque.clone; the clone becomes self.factory (the agent pool)
  -- @param serice  service stored in poold; exit_agent sends it "feed"
  function _M:init(obj, serice)
  	local factory = deque.clone(obj)
  	self.factory = factory
  	poold = serice
  end
  
  -- @desc: Make the agent bound to fd exit and drop every index entry for it.
  -- @param fd  connection id whose agent should exit
  function _M:exit_agent(fd)
  	-- The expiry entry is always cleared, even when nothing else is known about fd.
  	self.f2e[fd] = nil
  
  	local uid = self.f2u[fd]
  	if not uid then return end
  
  	local pack = self.u2f[uid]
  	if pack then
  		-- Best effort: a failed send must not stop the index cleanup below.
  		local agent = get_a(pack)
  		pcall(skynet.send, agent, "lua", "exit")
  		pcall(skynet.send, poold, "lua", "feed")
  		self.u2f[uid] = nil
  	end
  
  	self.f2u[fd] = nil
  end
  
  -- @desc: A client connected; remember the address it came from.
  -- @param fd    connection id
  -- @param addr  value stored in f2i for this fd
  function _M:socket_open(fd, addr)
  	local addr_by_fd = self.f2i
  	addr_by_fd[fd] = addr
  end
  
  -- @desc: Socket closed. The agent is told to "close" and an expiry is armed for
  --        the fd; check_agent_status exits the agent once that expiry has passed.
  -- @param fd  connection id of the closed socket
  function _M:socket_close(fd)
  	self.f2i[fd] = nil
  	local uid = self.f2u[fd]
  	if not uid then return end
  
  	local pack = self.u2f[uid]
  	if not pack then
  		-- No agent is bound to this uid: drop the fd index and make sure no
  		-- expiry is left behind, otherwise f2e keeps a dangling entry for fd.
  		self.f2u[fd] = nil
  		self.f2e[fd] = nil
  		return
  	end
  
  	-- Arm the expiry before the blocking call below, as the original did.
  	self.f2e[fd] = skynet.timex() + AGENT_EXPIRE_TIME
  
  	-- If the agent cannot be told to close, exit it right away.
  	local ok = pcall(skynet.call, get_a(pack), "lua", "close")
  	if not ok then self:exit_agent(fd) end
  end
  
  -- @desc: Socket error. Unlike socket_close, the agent is exited immediately.
  -- @param fd  connection id of the failed socket
  function _M:socket_error(fd)
  	self.f2i[fd] = nil
  
  	local uid = self.f2u[fd]
  	if not uid then return end
  
  	local pack = self.u2f[uid]
  	if pack then
  		-- Whether or not "close" succeeds, carry on so that the agent state
  		-- and the index tables stay consistent.
  		pcall(skynet.call, get_a(pack), "lua", "close")
  		self:exit_agent(fd)
  	else
  		self.f2u[fd] = nil
  	end
  end
  
  -- @desc: Count the entries of a table (array part and hash part alike).
  -- @param t  table to count
  -- @return number of key/value pairs visited by pairs(t)
  local function table_nums(t)
      local total = 0
      for _ in pairs(t) do
          total = total + 1
      end
      return total
  end
  
  -- Earliest skynet.timex() value at which check_agent_status sweeps f2e again.
  local next_check_time = 0
  -- Earliest skynet.timex() value at which check_agent_status logs the online count again.
  local next_log_time = 0
  local CHECK_AGENT_STATUS_INTERVAL 	= 100 -- interval between agent status sweeps, in skynet.timex() units
  -- @desc: Periodic housekeeping: exit agents whose expiry has passed, and
  --        publish/log the number of entries in u2f as the online count.
  function _M:check_agent_status()
  	local now = skynet.timex()
  
  	-- Expiry sweep, at most once per CHECK_AGENT_STATUS_INTERVAL.
  	if next_check_time <= now then
  		next_check_time = now + CHECK_AGENT_STATUS_INTERVAL
  		for fd, deadline in pairs(self.f2e) do
  			if now > deadline then
  				self:exit_agent(fd)
  			end
  		end
  	end
  
  	-- Online-count report: only when now is a multiple of 60 and logd is set.
  	local log_due = now >= next_log_time and now % 60 == 0
  	if log_due and logd then
  		next_log_time = now + 60
  		local online_count = table_nums(self.u2f)
  		datacenter.set("onlineCount", online_count)
  		pcall(skynet.send, logd, "lua", "log", "online", {count = online_count})
  	end
  end
  
  -- @desc: Send the queryLogin reply to a client.
  -- @param fd        connection id to write to
  -- @param response  table serialised with MsgPack.pack as the packet body
  local function query_agent_response(fd, response)
  	-- Header: the response code packed with string.pack format "H".
  	local code = actionCodes.Role_queryLoginRpc + rpcResponseBegin
  	local head = string.pack("H", code)
  
  	-- Body: MsgPack-encoded; encrypted with XXTEA only when non-empty.
  	local body = MsgPack.pack(response)
  	if #body > 0 then
  		body = xxtea.encrypt(body, XXTEA_KEY)
  	end
  
  	local packet = head .. body
  	socket.write(fd, netpack.pack(packet))
  end
  
  -- @desc: Handles the first packet of a login (queryLogin). Binds uid to an agent
  --        (reusing the one already recorded in u2f, or popping one from
  --        self.factory), starts that agent with gate_serv, fd and the stored
  --        address, then replies to the client via query_agent_response.
  --        Per the original comment, later messages go straight to the agent.
  -- @param fd   connection id the login request arrived on
  -- @param uid  user id being logged in
  function _M:query_agent(fd, uid)
  	local pack = self.u2f[uid]
  	if pack then
  		local f = get_f(pack)
  		if fd == f then
  			-- Same uid on the same fd: log it and do nothing else (no reply is sent).
  			skynet.error(string.format("%s same fd %d", uid, fd))
  			return 
  		end
  
  		-- self.f2u[f] must exist here; if self.f2e[f] is absent the old connection
  		-- is still online and has to be kicked.
  		if not self.f2e[f] then
  			-- Notify the old connection, then ask the gate to kick it after skynet.timeout(10, ...).
  			local head = string.pack("H", actionCodes.Sys_kickdown)
be83d162   zhouahaihai   登陆成功。 增加数据结构修正功能
156
  			local bin = MsgPack.pack({body = "该账号已登上其他机器"})
314bc5df   zhengshouren   提交服务器初始代码
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
  			if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
  			socket.write(f, netpack.pack(head .. bin))
  			skynet.timeout(10, function ()
  				skynet.call(gate_serv, "lua", "kick", f)
  			end)
  		end
  
  		-- Re-attach the existing agent to the new connection.
  		local agent = get_a(pack)
  		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd])
  		if not ok then
  			query_agent_response(fd, {ret = "INNER_ERROR"})
  			return
  		end
  
  		-- Drop the old fd's entries and point the uid at the new fd.
  		self.f2e[f] = nil
  		self.f2u[f] = nil
  		self.u2f[uid] = set_pack(fd, agent)
  	else
  		-- uid is not recorded, so (per the original comment) it has not logged in
  		-- for at least 10 minutes; pop a fresh agent from the agent pool.
  		-- NOTE(review): AGENT_EXPIRE_TIME is 300, which does not obviously match
  		-- "10 minutes" -- confirm.
  		local agent = self.factory:pop()
  		if not agent then
  			-- Server is full.
  			query_agent_response(fd, {ret = "RET_SERVER_FULL"})
  			return 
  		end
  
  		local ok = pcall(skynet.call, agent, "lua", "start", gate_serv, fd, self.f2i[fd])
  		if not ok then
  			-- Start failed: return the agent to the pool before reporting the error.
  			self.factory:push(agent)
  			query_agent_response(fd, {ret = "INNER_ERROR"})
  			return
  		end
  
  		self.u2f[uid] = set_pack(fd, agent)
  	end
  
  	self.f2u[fd] = uid
  
  	local response = {}
  
  	-- Tell the client whether a value is stored under the redis key "uid:<uid>";
  	-- when present it is returned as response.name.
  	local user = redisproxy:get(string_format("uid:%s", uid))
  	if user then
  		response.ret = "RET_HAS_EXISTED"
  		response.name = user
  	else
  		response.ret = "RET_NOT_EXIST"
  	end
  	query_agent_response(fd, response)
  end
  
  return _M