clustersender.lua
1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
local skynet = require "skynet"
local sc = require "skynet.socketchannel"
local socket = require "skynet.socket"
local cluster = require "skynet.cluster.core"
local channel
local session = 1
local node, nodename, init_host, init_port = ...
local command = {}
local function send_request(addr, msg, sz)
-- msg is a local pointer, cluster.packrequest will free it
local current_session = session
local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
session = new_session
local tracetag = skynet.tracetag()
if tracetag then
if tracetag:sub(1,1) ~= "(" then
-- add nodename
local newtag = string.format("(%s-%s-%d)%s", nodename, node, session, tracetag)
skynet.tracelog(tracetag, string.format("session %s", newtag))
tracetag = newtag
end
skynet.tracelog(tracetag, string.format("cluster %s", node))
channel:request(cluster.packtrace(tracetag))
end
return channel:request(request, current_session, padding)
end
function command.req(...)
local ok, msg = pcall(send_request, ...)
if ok then
if type(msg) == "table" then
skynet.ret(cluster.concat(msg))
else
skynet.ret(msg)
end
else
skynet.error(msg)
skynet.response()(false)
end
end
function command.push(addr, msg, sz)
local request, new_session, padding = cluster.packpush(addr, session, msg, sz)
if padding then -- is multi push
session = new_session
end
channel:request(request, nil, padding)
end
local function read_response(sock)
local sz = socket.header(sock:read(2))
local msg = sock:read(sz)
return cluster.unpackresponse(msg) -- session, ok, data, padding
end
function command.changenode(host, port)
channel:changehost(host, tonumber(port))
channel:connect(true)
skynet.ret(skynet.pack(nil))
end
skynet.start(function()
channel = sc.channel {
host = init_host,
port = tonumber(init_port),
response = read_response,
nodelay = true,
}
skynet.dispatch("lua", function(session , source, cmd, ...)
local f = assert(command[cmd])
f(...)
end)
end)