local skynet = require "skynet" require "skynet.manager" local cluster = require "skynet.cluster.core" local config_name = skynet.getenv "cluster" local node_address = {} local node_sender = {} local command = {} local config = {} local nodename = cluster.nodename() local connecting = {} local function open_channel(t, key) local ct = connecting[key] if ct then local co = coroutine.running() table.insert(ct, co) skynet.wait(co) return assert(ct.channel) end ct = {} connecting[key] = ct local address = node_address[key] if address == nil and not config.nowaiting then local co = coroutine.running() assert(ct.namequery == nil) ct.namequery = co skynet.error("Waiting for cluster node [".. key.."]") skynet.wait(co) address = node_address[key] end local succ, err, c if address then local host, port = string.match(address, "([^:]+):(.*)$") c = node_sender[key] if c == nil then c = skynet.newservice("clustersender", key, nodename, host, port) if node_sender[key] then -- double check skynet.kill(c) c = node_sender[key] else node_sender[key] = c end end succ = pcall(skynet.call, c, "lua", "changenode", host, port) if succ then t[key] = c ct.channel = c else err = string.format("changenode [%s] (%s:%s) failed", key, host, port) end else err = string.format("cluster node [%s] is %s.", key, address == false and "down" or "absent") end connecting[key] = nil for _, co in ipairs(ct) do skynet.wakeup(co) end assert(succ, err) if node_address[key] ~= address then return open_channel(t,key) end return c end local node_channel = setmetatable({}, { __index = open_channel }) local function loadconfig(tmp) if tmp == nil then tmp = {} if config_name then local f = assert(io.open(config_name)) local source = f:read "*a" f:close() assert(load(source, "@"..config_name, "t", tmp))() end end local reload = {} for name,address in pairs(tmp) do if name:sub(1,2) == "__" then name = name:sub(3) config[name] = address skynet.error(string.format("Config %s = %s", name, address)) else assert(address == false or type(address) == "string") if node_address[name] ~= address then -- address changed if rawget(node_channel, name) then node_channel[name] = nil -- reset connection table.insert(reload, name) end node_address[name] = address end local ct = connecting[name] if ct and ct.namequery and not config.nowaiting then skynet.error(string.format("Cluster node [%s] resloved : %s", name, address)) skynet.wakeup(ct.namequery) end end end if config.nowaiting then -- wakeup all connecting request for name, ct in pairs(connecting) do if ct.namequery then skynet.wakeup(ct.namequery) end end end for _, name in ipairs(reload) do -- open_channel would block skynet.fork(open_channel, node_channel, name) end end function command.reload(source, config) loadconfig(config) skynet.ret(skynet.pack(nil)) end function command.listen(source, addr, port) local gate = skynet.newservice("gate") if port == nil then local address = assert(node_address[addr], addr .. " is down") addr, port = string.match(address, "([^:]+):(.*)$") end skynet.call(gate, "lua", "open", { address = addr, port = port }) skynet.ret(skynet.pack(nil)) end function command.sender(source, node) skynet.ret(skynet.pack(node_channel[node])) end function command.senders(source) skynet.retpack(node_sender) end local proxy = {} function command.proxy(source, node, name) if name == nil then node, name = node:match "^([^@.]+)([@.].+)" if name == nil then error ("Invalid name " .. tostring(node)) end end local fullname = node .. "." .. name local p = proxy[fullname] if p == nil then p = skynet.newservice("clusterproxy", node, name) -- double check if proxy[fullname] then skynet.kill(p) p = proxy[fullname] else proxy[fullname] = p end end skynet.ret(skynet.pack(p)) end local cluster_agent = {} -- fd:service local register_name = {} local function clearnamecache() for fd, service in pairs(cluster_agent) do if type(service) == "number" then skynet.send(service, "lua", "namechange") end end end function command.register(source, name, addr) assert(register_name[name] == nil) addr = addr or source local old_name = register_name[addr] if old_name then register_name[old_name] = nil clearnamecache() end register_name[addr] = name register_name[name] = addr skynet.ret(nil) skynet.error(string.format("Register [%s] :%08x", name, addr)) end function command.queryname(source, name) skynet.ret(skynet.pack(register_name[name])) end function command.socket(source, subcmd, fd, msg) if subcmd == "open" then skynet.error(string.format("socket accept from %s", msg)) -- new cluster agent cluster_agent[fd] = false local agent = skynet.newservice("clusteragent", skynet.self(), source, fd) local closed = cluster_agent[fd] cluster_agent[fd] = agent if closed then skynet.send(agent, "lua", "exit") cluster_agent[fd] = nil end else if subcmd == "close" or subcmd == "error" then -- close cluster agent local agent = cluster_agent[fd] if type(agent) == "boolean" then cluster_agent[fd] = true elseif agent then skynet.send(agent, "lua", "exit") cluster_agent[fd] = nil end else skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or "")) end end end skynet.start(function() loadconfig() skynet.dispatch("lua", function(session , source, cmd, ...) local f = assert(command[cmd]) f(source, ...) end) end)