Blame view

publish/skynet/service/clusteragent.lua 3.36 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
  local skynet = require "skynet"
  local sc = require "skynet.socketchannel"
  local socket = require "skynet.socket"
  local cluster = require "skynet.cluster.core"
  local ignoreret = skynet.ignoreret
  
  -- Launch parameters come in through the chunk's varargs: the clusterd service,
  -- the gate service and the socket fd this agent answers on. Each one is
  -- normalised with tonumber (presumably they are handed over as strings by the
  -- launcher -- confirm against the caller).
  local clusterd, gate, fd = ...
  clusterd, gate, fd = tonumber(clusterd), tonumber(gate), tonumber(fd)
  
  -- inquery_name[name]: list of coroutines parked while a "queryname" call for
  -- that name is still in flight.
  local inquery_name = {}
  -- large_request[session]: parts of a multi-part request collected so far,
  -- together with the addr / is_push / tracetag seen on its first part.
  local large_request = {}
  
  -- Metatable behind the name cache. Reading a key that is not cached asks
  -- clusterd ("queryname") for the address and stores a non-nil answer in the
  -- table. Lookups of the same name that arrive while the query is running do
  -- not issue a second call: they park in inquery_name[name] and are woken once
  -- the first lookup has finished.
  local register_name_mt = {}
  function register_name_mt.__index(self, name)
  	local pending = inquery_name[name]
  	if pending then
  		-- someone else is already querying this name: wait for them, then
  		-- read what they stored (nil if the name was not resolved)
  		local me = coroutine.running()
  		pending[#pending + 1] = me
  		skynet.wait(me)
  		return rawget(self, name)
  	end
  	pending = {}
  	inquery_name[name] = pending
  	-- name must be '@xxxx'; the first character is dropped before asking clusterd
  	local addr = skynet.call(clusterd, "lua", "queryname", name:sub(2))
  	if addr then
  		self[name] = addr
  	end
  	inquery_name[name] = nil
  	-- release every coroutine that queued up behind this query
  	for i = 1, #pending do
  		skynet.wakeup(pending[i])
  	end
  	return addr
  end
  
  -- Build an empty name cache; reads of missing names are resolved through
  -- register_name_mt.
  local function new_register_name()
  	local cache = {}
  	return setmetatable(cache, register_name_mt)
  end
  
  -- name ('@xxxx') -> address cache consulted when dispatching requests
  local register_name = new_register_name()
  
  -- tag delivered by the latest trace packet, waiting for the request it belongs to
  local tracetag
  
  -- Pack a reply for `session` and write it to the peer socket `fd`.
  -- ok   : true for a normal reply, false for an error reply
  -- msg  : payload (Lua string, or pointer accompanied by sz) or the error text
  -- sz   : payload size when msg is a pointer; ignored for error replies
  -- A successful reply may be packed into several parts (a table); those parts
  -- are written one after another with socket.lwrite.
  local function send_response(session, ok, msg, sz)
  	if not ok then
  		socket.write(fd, cluster.packresponse(session, false, msg))
  		return
  	end
  	local response = cluster.packresponse(session, true, msg, sz)
  	if type(response) == "table" then
  		for _, v in ipairs(response) do
  			socket.lwrite(fd, v)
  		end
  	else
  		socket.write(fd, response)
  	end
  end
  
  -- Handler for the "client" protocol: one call per packet forwarded by the gate.
  -- The arguments after the first two (ignored) are produced by
  -- cluster.unpackrequest:
  --   addr     target address, a name handled by cluster.isname, or 0 for a
  --            name query; for a trace packet it carries the trace tag
  --   session  request session; nil marks a trace packet
  --   msg, sz  request payload
  --   padding  true when more parts of a multi-part request will follow
  --   is_push  true when the sender expects no reply
  -- Replies are written straight to `fd`; nothing is returned.
  local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
  	ignoreret()	-- session is fd, don't call skynet.ret
  	if session == nil then
  		-- trace packet: keep the tag for the request that follows it
  		tracetag = addr
  		return
  	end
  	if padding then
  		-- one part of a multi-part request: buffer it; the tag travels with the buffer
  		local req = large_request[session] or { addr = addr , is_push = is_push, tracetag = tracetag }
  		tracetag = nil
  		large_request[session] = req
  		cluster.append(req, msg, sz)
  		return
  	end
  	-- Claim the pending trace tag now, before anything below can yield.
  	-- Reading or clearing the shared variable after a yielding call would let
  	-- this request wipe or steal a tag that belongs to a later request, and
  	-- paths that never used the tag would leave it behind for the next one.
  	local tag = tracetag
  	tracetag = nil
  	local req = large_request[session]
  	if req then
  		-- final part of a multi-part request: reassemble and restore its context
  		tag = req.tracetag
  		large_request[session] = nil
  		cluster.append(req, msg, sz)
  		msg,sz = cluster.concat(req)
  		addr = req.addr
  		is_push = req.is_push
  	end
  	if not msg then
  		send_response(session, false, "Invalid large req")
  		return
  	end
  	local ok
  	if addr == 0 then
  		-- name query: the payload is the name, the reply is its address
  		local name = skynet.unpack(msg, sz)
  		skynet.trash(msg, sz)
  		local target = register_name["@" .. name]
  		if target then
  			ok = true
  			-- packstring gives a Lua string; a buffer from skynet.pack would
  			-- never be freed because packresponse does not take ownership of it
  			msg = skynet.packstring(target)
  		else
  			ok = false
  			msg = "name not found"
  		end
  		sz = nil	-- msg is a string on both paths
  	else
  		if cluster.isname(addr) then
  			addr = register_name[addr]
  		end
  		if not addr then
  			ok = false
  			msg = "Invalid name"
  		elseif is_push then
  			skynet.rawsend(addr, "lua", msg, sz)
  			return	-- no response
  		elseif tag then
  			ok , msg, sz = pcall(skynet.tracecall, tag, addr, "lua", msg, sz)
  		else
  			ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
  		end
  	end
  	send_response(session, ok, msg, sz)
  end
  
  -- Service entry point: hook up the "client" protocol, ask the gate to forward
  -- this connection's packets here, then start serving control commands.
  skynet.start(function()
  	local client_protocol = {
  		name = "client",
  		id = skynet.PTYPE_CLIENT,
  		unpack = cluster.unpackrequest,
  		dispatch = dispatch_request,
  	}
  	skynet.register_protocol(client_protocol)
  	-- fd can write, but don't read fd, the data package will forward from gate though client protocol.
  	skynet.call(gate, "lua", "forward", fd)
  
  	-- control commands accepted on the "lua" protocol
  	local commands = {}
  	function commands.exit()
  		socket.close(fd)
  		skynet.exit()
  	end
  	function commands.namechange()
  		-- drop every cached name; later lookups query clusterd again
  		register_name = new_register_name()
  	end
  
  	skynet.dispatch("lua", function(_, source, cmd)
  		local handler = commands[cmd]
  		if handler then
  			handler()
  		else
  			skynet.error(string.format("Invalid command %s from %s", cmd, skynet.address(source)))
  		end
  	end)
  end)