Blame view

publish/skynet/lualib/skynet/cluster.lua 2.47 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
  local skynet = require "skynet"
  
  -- Destination for all control requests in this module ("sender", "listen",
  -- "reload", "proxy", "register"); assigned by the skynet.init callback near
  -- the end of the file.
  local clusterd
  -- Module table returned to whoever requires this file.
  local cluster = {}
  -- node -> value returned by clusterd's "sender" request; used as the
  -- destination of "req"/"push" messages. Absent until the request has finished.
  local sender = {}
  -- node -> queue of work recorded while that node's sender is still unknown.
  -- Queue entries are either tables built by table.pack (deferred cluster.send)
  -- or coroutines parked in get_sender.
  local task_queue = {}
  
  -- Ask clusterd for the sender of `node`, then work through the queue `q`
  -- that filled up while the request was outstanding.
  -- Started through skynet.fork by get_queue.
  -- @param q    queue table created by get_queue; gains `confirm` and `sender` fields here
  -- @param node key under which the result is stored in `sender`
  local function request_sender(q, node)
  	local ok, c = pcall(skynet.call, clusterd, "lua", "sender", node)
  	if not ok then
  		-- pcall failed: c holds the error. Log it and carry on without a sender.
  		skynet.error(c)
  		c = nil
  	end
  	-- Run the tasks in the queue.
  	local confirm = coroutine.running()
  	-- Published on q so get_sender can signal this coroutine (q.confirm)
  	-- and read the outcome (q.sender, nil on failure).
  	q.confirm = confirm
  	q.sender = c
  	-- ipairs re-indexes q on every step, so entries appended while this loop
  	-- is suspended below are still visited.
  	for _, task in ipairs(q) do
  		if type(task) == "table" then
  			-- Deferred cluster.send: task[1] is the address, task[2..task.n] the arguments.
  			-- Without a sender the message is dropped; only the error above was logged.
  			if c then
  				skynet.send(c, "lua", "push", task[1], skynet.pack(table.unpack(task,2,task.n)))
  			end
  		else
  			-- Coroutine parked by get_sender: wake it, then wait on `confirm`,
  			-- which get_sender signals via skynet.wakeup(q.confirm) before returning.
  			skynet.wakeup(task)
  			skynet.wait(confirm)
  		end
  	end
  	-- Remove the queue and publish the result. When c is nil, sender[node]
  	-- stays empty, so the next access to task_queue[node] creates a new queue
  	-- and a new request.
  	task_queue[node] = nil
  	sender[node] = c
  end
  
  -- __index handler for task_queue: builds the queue for a node on first
  -- access, stores it under that key and forks request_sender to resolve the
  -- node's sender.
  -- @param t    table being indexed
  -- @param node key that was missing
  -- @return the freshly created queue
  local function get_queue(t, node)
  	local pending = {}
  	t[node] = pending
  	skynet.fork(request_sender, pending, node)
  	return pending
  end
  
  -- Reading a missing task_queue[node] calls get_queue, which creates the entry.
  setmetatable(task_queue, { __index = get_queue } )
  
  -- Return the sender for `node`. When none is cached yet, the running
  -- coroutine is appended to the node's queue and suspended until
  -- request_sender wakes it.
  -- @param node key into `sender` / `task_queue`
  -- @return the cached sender, or the queue's `sender` field (nil if the request failed)
  local function get_sender(node)
  	local cached = sender[node]
  	if cached then
  		return cached
  	end
  	-- Indexing task_queue creates the queue (and starts the request) if absent.
  	local queue = task_queue[node]
  	local co = coroutine.running()
  	queue[#queue + 1] = co
  	skynet.wait(co)
  	-- request_sender waits on `confirm` after waking us; signal it back.
  	skynet.wakeup(queue.confirm)
  	return queue.sender
  end
  
  -- Send a "req" message for `address` on `node` through that node's sender
  -- and return whatever skynet.call returns.
  -- May suspend inside get_sender while the sender is still being resolved.
  -- @param node    target node
  -- @param address address on the target node
  -- @param ...     arguments, packed with skynet.pack
  function cluster.call(node, address, ...)
  	local target = get_sender(node)
  	-- The buffer produced by skynet.pack(...) is freed by cluster.core.packrequest.
  	return skynet.call(target, "lua", "req", address, skynet.pack(...))
  end
  
  -- Send a "push" message for `address` on `node`.
  -- If the node's sender is not known yet, the arguments are recorded in the
  -- node's task queue and pushed later by request_sender.
  -- @param node    target node
  -- @param address address on the target node
  -- @param ...     arguments
  function cluster.send(node, address, ...)
  	local target = sender[node]
  	if target then
  		-- "push" is the same as "req", but without a response.
  		skynet.send(target, "lua", "push", address, skynet.pack(...))
  		return
  	end
  	-- table.pack keeps the argument count in `.n`, so nil arguments survive.
  	table.insert(task_queue[node], table.pack(address, ...))
  end
  
  -- Send a "listen" request to clusterd.
  -- @param port a string is forwarded as the only argument; any other value
  --             is forwarded together with the address "0.0.0.0"
  function cluster.open(port)
  	if type(port) ~= "string" then
  		skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
  		return
  	end
  	skynet.call(clusterd, "lua", "listen", port)
  end
  
  -- Forward `config` to clusterd with a "reload" request.
  -- The reply of skynet.call is discarded; this function returns nothing.
  -- @param config passed through unchanged
  function cluster.reload(config)
  	skynet.call(clusterd, "lua", "reload", config)
  end
  
  -- Send a "proxy" request for `name` on `node` to clusterd.
  -- @param node passed through unchanged
  -- @param name passed through unchanged
  -- @return everything skynet.call returns for the request
  function cluster.proxy(node, name)
  	return skynet.call(clusterd, "lua", "proxy", node, name)
  end
  
  -- Bind a snax object to a service on `node`.
  -- @param node    target node
  -- @param name    snax service name, also passed to snax.bind
  -- @param address optional; when missing it is obtained with
  --                cluster.call(node, ".service", "QUERY", "snaxd", name)
  -- @return the result of snax.bind(handle, name)
  function cluster.snax(node, name, address)
  	local snax_lib = require "skynet.snax"
  	local remote = address or cluster.call(node, ".service", "QUERY", "snaxd", name)
  	-- Same "proxy" request to clusterd that cluster.proxy issues.
  	local proxy_handle = skynet.call(clusterd, "lua", "proxy", node, remote)
  	return snax_lib.bind(proxy_handle, name)
  end
  
  -- Send a "register" request for `name` to clusterd.
  -- @param name must be a string (asserted)
  -- @param addr nil or a number (asserted)
  -- @return everything skynet.call returns for the request
  function cluster.register(name, addr)
  	local name_is_string = type(name) == "string"
  	assert(name_is_string)
  	if addr ~= nil then
  		assert(type(addr) == "number")
  	end
  	return skynet.call(clusterd, "lua", "register", name, addr)
  end
  
  -- Send a "req" message to address 0 on `node`, carrying `name`, and return
  -- whatever skynet.call returns.
  -- NOTE(review): address 0 presumably resolves names registered with
  -- cluster.register on the remote side — confirm against clusterd.
  -- @param node target node
  -- @param name value packed as the only argument
  function cluster.query(node, name)
  	local target = get_sender(node)
  	return skynet.call(target, "lua", "req", 0, skynet.pack(name))
  end
  
  -- Store the result of skynet.uniqueservice("clusterd") in the `clusterd`
  -- upvalue used by every request in this module.
  -- NOTE(review): handed to skynet.init, so presumably runs during service
  -- initialisation — confirm against the skynet documentation.
  local function bind_clusterd()
  	clusterd = skynet.uniqueservice("clusterd")
  end
  skynet.init(bind_clusterd)
  
  -- Export the module table.
  return cluster