Blame view

publish/skynet/service/cmaster.lua 3.19 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
  local skynet = require "skynet"
  local socket = require "skynet.socket"
  
  --[[
  	master manage data :
  		1. all the slaves address : id -> ipaddr:port
  		2. all the global names : name -> address
  
  	master hold connections from slaves .
  
  	protocol slave->master :
  		package size 1 byte
  		type 1 byte :
  			'H' : HANDSHAKE, report slave id, and address.
  			'R' : REGISTER name address
  			'Q' : QUERY name
  
  
  	protocol master->slave:
  		package size 1 byte
  		type 1 byte :
  			'W' : WAIT n
  			'C' : CONNECT slave_id slave_address
  			'N' : NAME globalname address
  			'D' : DISCONNECT slave_id
  ]]
  
  local slave_node = {}
  local global_name = {}
  
  local function read_package(fd)
  	local sz = socket.read(fd, 1)
  	assert(sz, "closed")
  	sz = string.byte(sz)
  	local content = assert(socket.read(fd, sz), "closed")
  	return skynet.unpack(content)
  end
  
  local function pack_package(...)
  	local message = skynet.packstring(...)
  	local size = #message
  	assert(size <= 255 , "too long")
  	return string.char(size) .. message
  end
  
  local function report_slave(fd, slave_id, slave_addr)
  	local message = pack_package("C", slave_id, slave_addr)
  	local n = 0
  	for k,v in pairs(slave_node) do
  		if v.fd ~= 0 then
  			socket.write(v.fd, message)
  			n = n + 1
  		end
  	end
  	socket.write(fd, pack_package("W", n))
  end
  
  local function handshake(fd)
  	local t, slave_id, slave_addr = read_package(fd)
  	assert(t=='H', "Invalid handshake type " .. t)
  	assert(slave_id ~= 0 , "Invalid slave id 0")
  	if slave_node[slave_id] then
  		error(string.format("Slave %d already register on %s", slave_id, slave_node[slave_id].addr))
  	end
  	report_slave(fd, slave_id, slave_addr)
  	slave_node[slave_id] = {
  		fd = fd,
  		id = slave_id,
  		addr = slave_addr,
  	}
  	return slave_id , slave_addr
  end
  
  local function dispatch_slave(fd)
  	local t, name, address = read_package(fd)
  	if t == 'R' then
  		-- register name
  		assert(type(address)=="number", "Invalid request")
  		if not global_name[name] then
  			global_name[name] = address
  		end
  		local message = pack_package("N", name, address)
  		for k,v in pairs(slave_node) do
  			socket.write(v.fd, message)
  		end
  	elseif t == 'Q' then
  		-- query name
  		local address = global_name[name]
  		if address then
  			socket.write(fd, pack_package("N", name, address))
  		end
  	else
  		skynet.error("Invalid slave message type " .. t)
  	end
  end
  
  local function monitor_slave(slave_id, slave_address)
  	local fd = slave_node[slave_id].fd
  	skynet.error(string.format("Harbor %d (fd=%d) report %s", slave_id, fd, slave_address))
  	while pcall(dispatch_slave, fd) do end
  	skynet.error("slave " ..slave_id .. " is down")
  	local message = pack_package("D", slave_id)
  	slave_node[slave_id].fd = 0
  	for k,v in pairs(slave_node) do
  		socket.write(v.fd, message)
  	end
  	socket.close(fd)
  end
  
  skynet.start(function()
  	local master_addr = skynet.getenv "standalone"
  	skynet.error("master listen socket " .. tostring(master_addr))
  	local fd = socket.listen(master_addr)
  	socket.start(fd , function(id, addr)
  		skynet.error("connect from " .. addr .. " " .. id)
  		socket.start(id)
  		local ok, slave, slave_addr = pcall(handshake, id)
  		if ok then
  			skynet.fork(monitor_slave, slave, slave_addr)
  		else
  			skynet.error(string.format("disconnect fd = %d, error = %s", id, slave))
  			socket.close(id)
  		end
  	end)
  end)