clusteragent.lua
3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
local skynet = require "skynet"
local sc = require "skynet.socketchannel"
local socket = require "skynet.socket"
local cluster = require "skynet.cluster.core"
local ignoreret = skynet.ignoreret
-- Launch arguments of this service, converted with tonumber before use:
--   clusterd : service that answers "queryname" requests
--   gate     : service asked to "forward" the connection to this agent
--   fd       : socket that responses are written to
local clusterd, gate, fd = ...
clusterd, gate, fd = tonumber(clusterd), tonumber(gate), tonumber(fd)

-- Multi-part requests that are still being received, keyed by session.
local large_request = {}
-- Names with a "queryname" in flight; each value is the list of coroutines
-- waiting for that answer.
local inquery_name = {}
-- Metatable for the "@name" -> address cache used by this agent.
--
-- Reading a key that is not cached asks `clusterd` to resolve it ("queryname",
-- with the leading "@" stripped). A truthy answer is stored in the table, so
-- later reads of the same key no longer reach this function. Only one query
-- per name is in flight at a time: coroutines asking for the same name in the
-- meantime are parked in `inquery_name[name]` and woken once the query ends.
--
-- Returns the resolved address, or nil when clusterd answers with a false
-- value. If the query itself raises, the pending entry is cleared and the
-- waiters are released (they get nil) before the error is re-raised in the
-- coroutine that issued the query.
local register_name_mt = { __index =
	function(self, name)
		local waitco = inquery_name[name]
		if waitco then
			-- Another coroutine is already resolving this name: wait for it.
			local co = coroutine.running()
			waitco[#waitco + 1] = co
			skynet.wait(co)
			-- Take the answer from the shared wait list rather than from
			-- `self`: the cache table is replaced on "namechange", so `self`
			-- here may not be the table the querying coroutine stored into.
			return waitco.addr
		end

		waitco = {}
		inquery_name[name] = waitco
		-- pcall: a failing query must not leave inquery_name[name] set,
		-- otherwise every later lookup of this name would wait forever.
		local ok, addr = pcall(skynet.call, clusterd, "lua", "queryname", name:sub(2))	-- name must be '@xxxx'
		inquery_name[name] = nil
		if ok and addr then
			self[name] = addr
			waitco.addr = addr
		end
		for _, co in ipairs(waitco) do
			skynet.wakeup(co)
		end
		if not ok then
			-- Level 0 keeps the original error value/message untouched.
			error(addr, 0)
		end
		return addr
	end
}
-- Create an empty name cache; missing keys are resolved through
-- register_name_mt.__index.
local function new_register_name()
	local cache = {}
	setmetatable(cache, register_name_mt)
	return cache
end
-- Name cache of this agent; replaced with a fresh one when "namechange" arrives.
local register_name = new_register_name()
-- Trace tag delivered by the latest trace message, waiting to be claimed by
-- the next request.
local tracetag

-- Pack a reply for `session` and write it to the peer socket `fd`.
-- On success `msg` is either a string (then `sz` is unused) or a buffer with
-- its size `sz`; cluster.packresponse may return a list of parts, which are
-- written one by one with socket.lwrite. On failure `msg` is the error text.
local function write_response(session, ok, msg, sz)
	if not ok then
		socket.write(fd, cluster.packresponse(session, false, msg))
		return
	end
	local response = cluster.packresponse(session, true, msg, sz)
	if type(response) == "table" then
		for _, part in ipairs(response) do
			socket.lwrite(fd, part)
		end
	else
		socket.write(fd, response)
	end
end

-- Handler for the "client" protocol: one call per message forwarded by the gate.
-- The arguments after the two ignored ones are the results of
-- cluster.unpackrequest:
--   addr     : target address, a name accepted by cluster.isname, or 0 for a
--              name query; for a trace message it carries the trace tag
--   session  : request session, nil for a trace message
--   msg, sz  : request payload buffer and its size
--   padding  : truthy when more parts of the same request will follow
--   is_push  : truthy when the sender expects no response
-- Depending on the request this forwards it with skynet.rawsend (push), calls
-- the target and writes the packed result to `fd`, or answers a name query.
local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
	ignoreret()	-- session is fd, don't call skynet.ret
	if session == nil then
		-- trace
		tracetag = addr
		return
	end

	-- Claim the pending trace tag for this request right away. Keeping it in
	-- the shared variable until the call is made would let it leak into a
	-- later request (name queries, pushes and failed lookups never consumed
	-- it) or be overwritten while this handler is suspended in a name lookup.
	local trace = tracetag
	tracetag = nil

	if padding then
		-- One part of a multi-part request: stash it until the last part arrives.
		local req = large_request[session]
		if not req then
			req = { addr = addr, is_push = is_push, tracetag = trace }
			large_request[session] = req
		end
		cluster.append(req, msg, sz)
		return
	end

	local req = large_request[session]
	if req then
		-- Last part of a multi-part request: reassemble it and restore the
		-- routing data recorded with the first part.
		large_request[session] = nil
		cluster.append(req, msg, sz)
		msg, sz = cluster.concat(req)
		addr = req.addr
		is_push = req.is_push
		trace = req.tracetag
	end
	if not msg then
		write_response(session, false, "Invalid large req")
		return
	end

	if addr == 0 then
		-- Name query: the payload is the packed name to resolve.
		local name = skynet.unpack(msg, sz)
		skynet.trash(msg, sz)
		local found = register_name["@" .. name]
		if found then
			-- packstring yields a Lua string, so nothing has to be released by
			-- hand. The skynet.pack buffer used here before was never passed
			-- to skynet.trash (NOTE(review): cluster.packresponse is not
			-- expected to free its input - confirm against lua-cluster.c).
			write_response(session, true, skynet.packstring(found))
		else
			write_response(session, false, "name not found")
		end
		return
	end

	if cluster.isname(addr) then
		addr = register_name[addr]
	end
	if not addr then
		-- The payload is not handed to rawsend/rawcall on this path, so release
		-- it here, as the name-query path does after reading its payload.
		skynet.trash(msg, sz)
		write_response(session, false, "Invalid name")
		return
	end

	if is_push then
		skynet.rawsend(addr, "lua", msg, sz)
		return	-- no response
	end

	local ok
	if trace then
		ok, msg, sz = pcall(skynet.tracecall, trace, addr, "lua", msg, sz)
	else
		ok, msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
	end
	-- On failure `msg` holds the error raised by the call.
	write_response(session, ok, msg, sz)
end
-- Service entry point: install the "client" protocol handler, take over the
-- connection from the gate and serve the "lua" control commands.
skynet.start(function()
	skynet.register_protocol {
		name = "client",
		id = skynet.PTYPE_CLIENT,
		unpack = cluster.unpackrequest,
		dispatch = dispatch_request,
	}

	-- This service only writes to fd; incoming packets are forwarded by the
	-- gate through the "client" protocol registered above.
	skynet.call(gate, "lua", "forward", fd)

	-- Control commands understood by this agent.
	local command = {}

	-- Close the connection and terminate the service.
	function command.exit()
		socket.close(fd)
		skynet.exit()
	end

	-- Discard every cached name so that names are resolved again on next use.
	function command.namechange()
		register_name = new_register_name()
	end

	skynet.dispatch("lua", function(_, source, cmd, ...)
		local handler = command[cmd]
		if handler then
			handler()
			return
		end
		skynet.error(string.format("Invalid command %s from %s", cmd, skynet.address(source)))
	end)
end)