Blame view

publish/skynet/service/clusterd.lua 5.51 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
  local skynet = require "skynet"
  require "skynet.manager"
  local cluster = require "skynet.cluster.core"
  
  local config_name = skynet.getenv "cluster"
  local node_address = {}
  local node_sender = {}
  local command = {}
  local config = {}
  local nodename = cluster.nodename()
  
  local connecting = {}
  
  local function open_channel(t, key)
  	local ct = connecting[key]
  	if ct then
  		local co = coroutine.running()
  		table.insert(ct, co)
  		skynet.wait(co)
  		return assert(ct.channel)
  	end
  	ct = {}
  	connecting[key] = ct
  	local address = node_address[key]
  	if address == nil and not config.nowaiting then
  		local co = coroutine.running()
  		assert(ct.namequery == nil)
  		ct.namequery = co
  		skynet.error("Waiting for cluster node [".. key.."]")
  		skynet.wait(co)
  		address = node_address[key]
  	end
  	local succ, err, c
  	if address then
  		local host, port = string.match(address, "([^:]+):(.*)$")
  		c = node_sender[key]
  		if c == nil then
  			c = skynet.newservice("clustersender", key, nodename, host, port)
  			if node_sender[key] then
  				-- double check
  				skynet.kill(c)
  				c = node_sender[key]
  			else
  				node_sender[key] = c
  			end
  		end
  
  		succ = pcall(skynet.call, c, "lua", "changenode", host, port)
  
  		if succ then
  			t[key] = c
  			ct.channel = c
  		else
  			err = string.format("changenode [%s] (%s:%s) failed", key, host, port)
  		end
  	else
  		err = string.format("cluster node [%s] is %s.", key,  address == false and "down" or "absent")
  	end
  	connecting[key] = nil
  	for _, co in ipairs(ct) do
  		skynet.wakeup(co)
  	end
  	assert(succ, err)
  	if node_address[key] ~= address then
  		return open_channel(t,key)
  	end
  	return c
  end
  
  local node_channel = setmetatable({}, { __index = open_channel })
  
  local function loadconfig(tmp)
  	if tmp == nil then
  		tmp = {}
  		if config_name then
  			local f = assert(io.open(config_name))
  			local source = f:read "*a"
  			f:close()
  			assert(load(source, "@"..config_name, "t", tmp))()
  		end
  	end
  	local reload = {}
  	for name,address in pairs(tmp) do
  		if name:sub(1,2) == "__" then
  			name = name:sub(3)
  			config[name] = address
  			skynet.error(string.format("Config %s = %s", name, address))
  		else
  			assert(address == false or type(address) == "string")
  			if node_address[name] ~= address then
  				-- address changed
  				if rawget(node_channel, name) then
  					node_channel[name] = nil	-- reset connection
  					table.insert(reload, name)
  				end
  				node_address[name] = address
  			end
  			local ct = connecting[name]
  			if ct and ct.namequery and not config.nowaiting then
  				skynet.error(string.format("Cluster node [%s] resloved : %s", name, address))
  				skynet.wakeup(ct.namequery)
  			end
  		end
  	end
  	if config.nowaiting then
  		-- wakeup all connecting request
  		for name, ct in pairs(connecting) do
  			if ct.namequery then
  				skynet.wakeup(ct.namequery)
  			end
  		end
  	end
  	for _, name in ipairs(reload) do
  		-- open_channel would block
  		skynet.fork(open_channel, node_channel, name)
  	end
  end
  
  function command.reload(source, config)
  	loadconfig(config)
  	skynet.ret(skynet.pack(nil))
  end
  
  function command.listen(source, addr, port)
  	local gate = skynet.newservice("gate")
  	if port == nil then
  		local address = assert(node_address[addr], addr .. " is down")
  		addr, port = string.match(address, "([^:]+):(.*)$")
  	end
  	skynet.call(gate, "lua", "open", { address = addr, port = port })
  	skynet.ret(skynet.pack(nil))
  end
  
  function command.sender(source, node)
  	skynet.ret(skynet.pack(node_channel[node]))
  end
  
  function command.senders(source)
  	skynet.retpack(node_sender)
  end
  
  local proxy = {}
  
  function command.proxy(source, node, name)
  	if name == nil then
  		node, name = node:match "^([^@.]+)([@.].+)"
  		if name == nil then
  			error ("Invalid name " .. tostring(node))
  		end
  	end
  	local fullname = node .. "." .. name
  	local p = proxy[fullname]
  	if p == nil then
  		p = skynet.newservice("clusterproxy", node, name)
  		-- double check
  		if proxy[fullname] then
  			skynet.kill(p)
  			p = proxy[fullname]
  		else
  			proxy[fullname] = p
  		end
  	end
  	skynet.ret(skynet.pack(p))
  end
  
  local cluster_agent = {}	-- fd:service
  local register_name = {}
  
  local function clearnamecache()
  	for fd, service in pairs(cluster_agent) do
  		if type(service) == "number" then
  			skynet.send(service, "lua", "namechange")
  		end
  	end
  end
  
  function command.register(source, name, addr)
  	assert(register_name[name] == nil)
  	addr = addr or source
  	local old_name = register_name[addr]
  	if old_name then
  		register_name[old_name] = nil
  		clearnamecache()
  	end
  	register_name[addr] = name
  	register_name[name] = addr
  	skynet.ret(nil)
  	skynet.error(string.format("Register [%s] :%08x", name, addr))
  end
  
  function command.queryname(source, name)
  	skynet.ret(skynet.pack(register_name[name]))
  end
  
  function command.socket(source, subcmd, fd, msg)
  	if subcmd == "open" then
  		skynet.error(string.format("socket accept from %s", msg))
  		-- new cluster agent
  		cluster_agent[fd] = false
  		local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
  		local closed = cluster_agent[fd]
  		cluster_agent[fd] = agent
  		if closed then
  			skynet.send(agent, "lua", "exit")
  			cluster_agent[fd] = nil
  		end
  	else
  		if subcmd == "close" or subcmd == "error" then
  			-- close cluster agent
  			local agent = cluster_agent[fd]
  			if type(agent) == "boolean" then
  				cluster_agent[fd] = true
  			elseif agent then
  				skynet.send(agent, "lua", "exit")
  				cluster_agent[fd] = nil
  			end
  		else
  			skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
  		end
  	end
  end
  
  skynet.start(function()
  	loadconfig()
  	skynet.dispatch("lua", function(session , source, cmd, ...)
  		local f = assert(command[cmd])
  		f(source, ...)
  	end)
  end)