Blame view

src/agent.lua 9.45 KB
314bc5df   zhengshouren   提交服务器初始代码
1
2
3
4
5
6
7
  require "ProtocolCode"
  require "shared.init"
  require "utils.init"
  require "GlobalVar"
  require "RedisKeys"
  require "skynet.manager"
  
314bc5df   zhengshouren   提交服务器初始代码
8
9
10
  local queue = require "skynet.queue"
  local netpack = require "skynet.netpack"
  local socket = require "skynet.socket"
314bc5df   zhengshouren   提交服务器初始代码
11
12
13
14
  local xxtea = require "xxtea"
  
  skynet = require "skynet"
  redisproxy = require "shared.redisproxy"
0de80321   liuzujun   创建游戏数据库,role对应mys...
15
  mysqlproxy = require "shared.mysqlproxy"
314bc5df   zhengshouren   提交服务器初始代码
16
17
  datacenter = require "skynet.datacenter"
  mcast_util = require "services/mcast_util"
a5486ede   zhouhaihai   csvdata 修改为 share...
18
  csvdb = require "shared.csvdata"
320f5b54   zhouhaihai   cb 返利
19
  cluster = require "skynet.cluster"
314bc5df   zhengshouren   提交服务器初始代码
20
21
22
23
24
25
  
  local CMD = {}
  local agentInfo = {}  -- { client_fd, role, gate_serv, open_timer}
  
  local agent_util, cs
  
5e5d7680   zhouhaihai   热更新 优化
26
  _hotfixActions = _hotfixActions or {}
2d392ede   zhouhaihai   热更新 最终版
27
  _hotfixClass = _hotfixClass or {}
5e5d7680   zhouhaihai   热更新 优化
28
  
bc36785e   zhouhaihai   增加 session
29
30
  _codeSession = {}
  
314bc5df   zhengshouren   提交服务器初始代码
31
32
33
  --- {{{ 定时器相关
  local function handle_timeout()
  	if not agentInfo.open_timer then return end
314bc5df   zhengshouren   提交服务器初始代码
34
35
36
37
  	agent_util:update(agentInfo)
  	skynet.timeout(100, handle_timeout)
  end
  
8c154e11   saicom   定时gc
38
  local function handle_gc()
f2c7c6fa   saicom   添加补发充值邮件
39
  	if not agentInfo.open_timer then return end
8c154e11   saicom   定时gc
40
41
42
43
      collectgarbage("collect")
  	skynet.timeout(6000, handle_gc)
  end
  
314bc5df   zhengshouren   提交服务器初始代码
44
  function start_agent_timer()
44c6e479   zhouhaihai   增加部分日志
45
  	if agentInfo.open_timer then return end
314bc5df   zhengshouren   提交服务器初始代码
46
47
  	agentInfo.open_timer = true
  	skynet.timeout(150, handle_timeout)
8c154e11   saicom   定时gc
48
  	skynet.timeout(6000, handle_gc)
314bc5df   zhengshouren   提交服务器初始代码
49
50
51
52
53
54
55
  end
  
  function cancel_agent_timer()
  	agentInfo.open_timer = false
  end
  ---- 定时器相关 }}}
  
bc36785e   zhouhaihai   增加 session
56
  
23d89d13   zhouahaihai   冒险 结构
57
  local _pipelinings = {}  --可同时多个序列  栈的形式保证嵌套不出错
314bc5df   zhengshouren   提交服务器初始代码
58
  function SendPacket(actionCode, bin, client_fd)
51d9d20b   liuzujun   付费签到,应用市场反馈
59
60
  	--print(actionHandlers[actionCode])
  	--dump(MsgPack.unpack(bin))
59acc11e   zhouhaihai   赛季更新
61
62
63
64
65
66
67
  	-- 内部消息不扩散出去
  	if actionCode == actionCodes.Sys_endlessSeason then
  		if agentInfo.role then
  			agentInfo.role:advEndlessSeasonCheck()
  		end
  		return
  	end
bc36785e   zhouhaihai   增加 session
68
  	
23d89d13   zhouahaihai   冒险 结构
69
70
  	client_fd = client_fd or agentInfo.client_fd
  	
314bc5df   zhengshouren   提交服务器初始代码
71
  	local handlerName = actionHandlers[actionCode]
bc36785e   zhouhaihai   增加 session
72
73
74
  	local isRpc = string.sub(handlerName, -3, -1) == "Rpc"
  	local session = _codeSession[actionCode] or 0
  	if isRpc then
314bc5df   zhengshouren   提交服务器初始代码
75
76
  		actionCode = actionCode + rpcResponseBegin
  	end
bc36785e   zhouhaihai   增加 session
77
  
23d89d13   zhouahaihai   冒险 结构
78
79
80
81
  	--  查看是否是在 流水线操作中
  	if #_pipelinings > 0 then
  		local _curPipelining = _pipelinings[#_pipelinings]
  		_curPipelining[client_fd] = _curPipelining[client_fd] or {} --区分不同客户端
bc36785e   zhouhaihai   增加 session
82
83
84
85
  		if not isRpc then
  			session = nil
  		end
  		table.insert(_curPipelining[client_fd], {actionCode, bin, session})
23d89d13   zhouahaihai   冒险 结构
86
87
88
  	else
  		if #bin > 0 then bin = xxtea.encrypt(bin, XXTEA_KEY) end
  		local head = string.pack("H", actionCode)
bc36785e   zhouhaihai   增加 session
89
90
91
92
  		if isRpc then
  			-- 是回复 -- 有session号
  			head = head .. string.pack("H", session)
  		end
23d89d13   zhouahaihai   冒险 结构
93
94
95
  		return socket.write(client_fd, netpack.pack(head .. bin))
  	end
  end
314bc5df   zhengshouren   提交服务器初始代码
96
  
23d89d13   zhouahaihai   冒险 结构
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
  function SendPipelining(callback)
  	if type(callback) ~= "function" then return end
  	--push 当前队列
  	table.insert(_pipelinings, {})
  	-- 执行代码块  输出错误信息
  	local ok, err = pcall(callback)
  	--弹出当前队列
  	local curPipelining = table.remove(_pipelinings)
  	-- 查看是否有消息
  	if next(curPipelining) then  
  		for client_fd, msg in pairs(curPipelining) do
  			SendPacket(actionCodes.Role_pipelining, MsgPack.pack(msg), client_fd)
  		end
  	end
  	if not ok then error(err) end
314bc5df   zhengshouren   提交服务器初始代码
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
  end
  
  function rpcAgent(roleId, funcName, ...)
  	local agent = datacenter.get("agent", roleId)
  	if agent then
  		return skynet.call(agent.serv, "lua", funcName, ...)
  	end
  end
  
  function rpcParter(serv, func, ...)
  	if serv then
  		local ok, result = pcall(skynet.call, serv, "role", func, ...)
  		if ok then
  			return result
  		end
  	end
  end
  
  local string_format = string.format
  local table_unpack = table.unpack
  function rpcRole(roleId, funcName, ...)
  	local fields = ...
  	local agent = datacenter.get("agent", roleId)
  	if agent and agent.serv then
  		if funcName == "getProperties" then
  			return true, skynet.call(agent.serv, "role", funcName, fields)
  		else
  			return true, skynet.call(agent.serv, "role", funcName, ...)
  		end
  	else
fa565e0c   zhouhaihai   优化结构
142
  		local roleCross = require("models.RoleCross")
da898074   zhouhaihai   pvp 高级领奖
143
144
145
146
147
  		if funcName == "getProperties" then
  			return false, roleCross.handle(funcName, roleId, fields)
  		else
  			return false, roleCross.handle(funcName, roleId, ...)
  		end
314bc5df   zhengshouren   提交服务器初始代码
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
  	end
  end
  
  function rpcUnion(funcName, ...)
  	local serv = agentInfo.userv
  	if not serv then
  		local consortiaId = agentInfo.role:getProperty("consortiaId")
  		if consortiaId == 0 then return true,1000 end
  		local union = datacenter.get("union", consortiaId)
  		if not union or not union.serv then
  			skynet.error("rpcUnion error: union serv addr not exist", funcName, consortiaId)
  			return
  		end
  		serv = union.serv
  	end
  	return skynet.call(serv, "lua", funcName, ...)
  end
  
  function rpcOtherUnion(id, funcName, ...)
  	local union = datacenter.get("union", id)
  	if union and union.serv then
  		return skynet.call(union.serv, "lua", funcName, ...)
  	end
  end
  
bc36785e   zhouhaihai   增加 session
173
  
314bc5df   zhengshouren   提交服务器初始代码
174
175
176
177
178
179
180
181
182
  skynet.register_protocol {
  	name = "client",
  	id = skynet.PTYPE_CLIENT,
  	unpack = function (msg, sz)
  		local data = skynet.tostring(msg, sz)
  		local cmd = string.unpack("H", string.sub(data, 1, 2))
  		return cmd, string.sub(data, 3)
  	end,
  	dispatch = function(session, address, cmd, data)
188558e8   zhouahaihai   skynet 1.2.0 use...
183
  		skynet.ignoreret()
9ad697c8   zhouahaihai   删除 fields ,增加数据库...
184
  		-- skynet.trace() --执行序的跟踪统计
314bc5df   zhengshouren   提交服务器初始代码
185
186
187
188
189
190
191
192
193
194
  		cs(function()
  			if cmd == actionCodes.Sys_heartBeat then
  				agent_util:heart_beat(agentInfo)
  				return
  			end
  			local actionName = actionHandlers[cmd]
  			if not actionName then
  				print("actionName not exist", actionName)
  				return
  			end
314bc5df   zhengshouren   提交服务器初始代码
195
  
bc36785e   zhouhaihai   增加 session
196
197
198
199
200
201
202
  			if string.sub(actionName, -3, -1) == "Rpc" then
  				-- 是请求 -- 有session号
  				local codeSession = string.unpack("H", string.sub(data, 1, 2))
  				_codeSession[cmd] = codeSession
  				data = string.sub(data, 3)
  			end
  
5e5d7680   zhouhaihai   热更新 优化
203
204
205
206
207
208
209
210
211
212
213
214
  			local method
  			if _hotfixActions[actionName] then
  				method = _hotfixActions[actionName]
  			else
  				local modName, funcName = actionName:match("(%w+)%.(%w+)")
  
  				local ok, action = pcall(require, "actions." .. modName .. "Action")
  				if not ok then
  					print("require module name error", action, modName)
  					return
  				end
  				method = action[funcName]
314bc5df   zhengshouren   提交服务器初始代码
215
  			end
5e5d7680   zhouhaihai   热更新 优化
216
  			
314bc5df   zhengshouren   提交服务器初始代码
217
218
219
220
221
222
  			if type(method) ~= "function" then
  				print("ERROR_SERVER_INVALID_ACTION", modName, funcName)
  				return
  			end
  
  			if #data > 0 then data = xxtea.decrypt(data, XXTEA_KEY) end
44c6e479   zhouhaihai   增加部分日志
223
224
225
226
227
228
  
  			-- 一次操作是一个关联操作 记录 ucode 是一样的
  			if agentInfo.role then
  				agentInfo.role:startActionUcode()
  			end
  			
314bc5df   zhengshouren   提交服务器初始代码
229
  			local result = method(agentInfo, data)
44c6e479   zhouhaihai   增加部分日志
230
231
232
233
234
  
  			if agentInfo.role then
  				agentInfo.role:endActionUcode()
  			end
  
bc36785e   zhouhaihai   增加 session
235
236
237
  			-- 清掉请求session
  			_codeSession[cmd] = nil
  
214061fa   zhengshouren   错误情况,返回错误码
238
239
  			if not result or type(result) == "number" then
  				SendPacket(actionCodes.Sys_innerErrorMsg, MsgPack.pack({id = cmd * 100 + (result or 0)}))
314bc5df   zhengshouren   提交服务器初始代码
240
241
242
243
244
245
246
  			end
  		end)
  	end
  }
  
  skynet.register_protocol {
  	name = "role",
a5486ede   zhouhaihai   csvdata 修改为 share...
247
  	id = 101,
314bc5df   zhengshouren   提交服务器初始代码
248
249
250
  	pack = skynet.pack,
  	unpack = skynet.unpack,
  	dispatch = function(session, address, submethod, ...)
9ad697c8   zhouahaihai   删除 fields ,增加数据库...
251
  		-- skynet.trace() --执行序的跟踪统计
314bc5df   zhengshouren   提交服务器初始代码
252
253
254
255
256
257
258
259
260
261
262
263
  		local result
  		if not agentInfo.role then
  			result = "__OFFLINE__"
  		else
  			result = agentInfo.role[submethod](agentInfo.role, ...)
  		end
  
  		skynet.ret(skynet.pack(result))
  	end,
  }
  
  -- function CMD.start(gate, fd, ip)
555f745e   zhangqijia   feat: 一番赏
264
  function CMD.start(session, source, gate, capsuled, fd, ip, hotfixs)
314bc5df   zhengshouren   提交服务器初始代码
265
266
267
268
  	ignoreHeartbeat = false
  
  	agentInfo.client_fd = fd
  	agentInfo.gate_serv = gate
555f745e   zhangqijia   feat: 一番赏
269
      agentInfo.capsule_serv = capsuled
314bc5df   zhengshouren   提交服务器初始代码
270
271
272
273
274
  	agentInfo.ip = ip
  
  	agent_util:reset()
  	math.randomInit()
  
a5b025b0   zhouhaihai   已存在agent 玩家重新登录不进行更新
275
276
277
278
  	if hotfixs then
  		for _, hotfix in ipairs(hotfixs) do
  			CMD.hotfix(hotfix)
  		end
5e5d7680   zhouhaihai   热更新 优化
279
  	end
3fe4471e   zhouhaihai   热更新 demo
280
  
7cde0c44   zhouhaihai   忽略心跳和未登录状态 超时处理
281
282
  	start_agent_timer()
  
314bc5df   zhengshouren   提交服务器初始代码
283
284
285
286
287
288
289
290
  	-- 这里将消息伪装成 watchdog 发出,这样就由 A->B->C->B->A 变成 A->B->C->A
  	skynet.redirect(gate, source, "lua", session, skynet.pack("forward", fd, 0, skynet.self()))
  end
  
  function CMD.close()
  	cancel_agent_timer()
  	mcast_util.usub_world()
  	mcast_util.usub_union()
314bc5df   zhengshouren   提交服务器初始代码
291
292
  	local role = agentInfo.role
  	if not role then return end
d10a9a36   zhangqijia   fix: 一番赏增加通知信息的数据...
293
  
fb29343f   zhangqijia   fix: 一番赏 断线离开扭蛋机房间
294
  	skynet.call(agentInfo.capsule_serv, "lua", "exit", role:getProperty("id"))
39bcd7ca   zhouhaihai   LOG
295
  	role:log("onLogout", {logtime = skynet.timex()-role:getProperty("ltime")})
f22a33af   zhouhaihai   自己的日志
296
  	role:mylog("logout", {int1 = skynet.timex()-role:getProperty("ltime")})
314bc5df   zhengshouren   提交服务器初始代码
297
298
299
300
301
  	role:onOfflineEvent()
  end
  
  function CMD.exit()
  	if agentInfo.role then
314bc5df   zhengshouren   提交服务器初始代码
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
  		datacenter.set("agent", agentInfo.role:getProperty("id"), nil)
  	end
  	skynet.exit()
  end
  
  function CMD.subUnion(consortiaId, union)
  	mcast_util.sub_union(consortiaId, union.chan)
  	agentInfo.userv = union.serv
  end
  
  function CMD:usubUnion()
  	mcast_util.usub_union()
  	agentInfo.userv = nil
  end
  
3fe4471e   zhouhaihai   热更新 demo
317
318
319
  function CMD.hotfix(code)
  	local ok, func = pcall(load, code)
  	if ok then
2d392ede   zhouhaihai   热更新 最终版
320
  		ok = pcall(func, agentInfo.role)
3fe4471e   zhouhaihai   热更新 demo
321
322
  	end
  	if not ok then
2d392ede   zhouhaihai   热更新 最终版
323
  		skynet.error("hotfix_code error by code " .. code)
3fe4471e   zhouhaihai   热更新 demo
324
325
326
  	end
  end
  
314bc5df   zhengshouren   提交服务器初始代码
327
328
329
330
331
  local function routeGM(cmd, params)
  	if type(params) ~= "table" or not agentInfo.role then
  		return "指令失败"
  	end
  	local _M = require "actions.GmAction"
3133cb76   zhouhaihai   日志
332
333
334
335
336
337
  
  	agentInfo.role:startActionUcode()
  	local status = _M[cmd](agentInfo.role, params)
  	agentInfo.role:endActionUcode()
  	
  	return status
314bc5df   zhengshouren   提交服务器初始代码
338
339
340
341
  end
  
  skynet.start(function()
  	skynet.dispatch("lua", function(session, source, command, ...)
9ad697c8   zhouahaihai   删除 fields ,增加数据库...
342
  		-- skynet.trace() --执行序的跟踪统计
314bc5df   zhengshouren   提交服务器初始代码
343
344
345
  		local f = CMD[command]
  		if f then
  			if command == "exit" then
188558e8   zhouahaihai   skynet 1.2.0 use...
346
  				skynet.ignoreret()
314bc5df   zhengshouren   提交服务器初始代码
347
348
  				f(...)
  			elseif command == "start" then
188558e8   zhouahaihai   skynet 1.2.0 use...
349
  				skynet.ignoreret()
314bc5df   zhengshouren   提交服务器初始代码
350
  				f(session, source, ...)
836228c9   zhouhaihai   热更新 bug
351
352
353
  			elseif command == "hotfix" then
  				skynet.ignoreret()
  				f(...)
314bc5df   zhengshouren   提交服务器初始代码
354
355
356
357
358
359
360
361
  			else
  				skynet.ret(skynet.pack(f(...)))
  			end
  		else
  			skynet.ret(skynet.pack(routeGM(command, ...)))
  		end
  	end)
  
3c0ea5fb   zhouhaihai   抽英雄
362
363
364
365
366
367
368
369
370
  	skynet.info_func(function()
  		local info = {}
  		info.ip = agentInfo.ip
  		if agentInfo.role then
  			info.roldId = agentInfo.role:getProperty("id")
  		end
  		return info
  	end)
  
314bc5df   zhengshouren   提交服务器初始代码
371
372
  	cs = queue()
  
a5486ede   zhouhaihai   csvdata 修改为 share...
373
  	pvpd = skynet.localname(".pvpcross")
314bc5df   zhengshouren   提交服务器初始代码
374
375
376
377
378
  	-- 错误码特殊处理
  	-- todo
  	-- for key, value in pairs(csvdb["sys_codesCsv"]) do
  	-- 	_G[string.upper(value.varname)] = key
  	-- end
a5486ede   zhouhaihai   csvdata 修改为 share...
379
  	globalCsv = csvdb["GlobalDefineCsv"]
314bc5df   zhengshouren   提交服务器初始代码
380
381
  	agent_util = require "services/agent_util"
  end)