cluster.lua
2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
local skynet = require "skynet"
local clusterd
local cluster = {}
local sender = {}
local task_queue = {}
local function request_sender(q, node)
local ok, c = pcall(skynet.call, clusterd, "lua", "sender", node)
if not ok then
skynet.error(c)
c = nil
end
-- run tasks in queue
local confirm = coroutine.running()
q.confirm = confirm
q.sender = c
for _, task in ipairs(q) do
if type(task) == "table" then
if c then
skynet.send(c, "lua", "push", task[1], skynet.pack(table.unpack(task,2,task.n)))
end
else
skynet.wakeup(task)
skynet.wait(confirm)
end
end
task_queue[node] = nil
sender[node] = c
end
local function get_queue(t, node)
local q = {}
t[node] = q
skynet.fork(request_sender, q, node)
return q
end
setmetatable(task_queue, { __index = get_queue } )
local function get_sender(node)
local s = sender[node]
if not s then
local q = task_queue[node]
local task = coroutine.running()
table.insert(q, task)
skynet.wait(task)
skynet.wakeup(q.confirm)
return q.sender
end
return s
end
function cluster.call(node, address, ...)
-- skynet.pack(...) will free by cluster.core.packrequest
return skynet.call(get_sender(node), "lua", "req", address, skynet.pack(...))
end
function cluster.send(node, address, ...)
-- push is the same with req, but no response
local s = sender[node]
if not s then
table.insert(task_queue[node], table.pack(address, ...))
else
skynet.send(sender[node], "lua", "push", address, skynet.pack(...))
end
end
function cluster.open(port)
if type(port) == "string" then
skynet.call(clusterd, "lua", "listen", port)
else
skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
end
end
function cluster.reload(config)
skynet.call(clusterd, "lua", "reload", config)
end
function cluster.proxy(node, name)
return skynet.call(clusterd, "lua", "proxy", node, name)
end
function cluster.snax(node, name, address)
local snax = require "skynet.snax"
if not address then
address = cluster.call(node, ".service", "QUERY", "snaxd" , name)
end
local handle = skynet.call(clusterd, "lua", "proxy", node, address)
return snax.bind(handle, name)
end
function cluster.register(name, addr)
assert(type(name) == "string")
assert(addr == nil or type(addr) == "number")
return skynet.call(clusterd, "lua", "register", name, addr)
end
function cluster.query(node, name)
return skynet.call(get_sender(node), "lua", "req", 0, skynet.pack(name))
end
skynet.init(function()
clusterd = skynet.uniqueservice("clusterd")
end)
return cluster