Blame view

publish/skynet/lualib/skynet/multicast.lua 2.15 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
  local skynet = require "skynet"
  local mc = require "skynet.multicast.core"
  
  local multicastd
  local multicast = {}
  local dispatch = {}
  
  local chan = {}
  local chan_meta = {
  	__index = chan,
  	__gc = function(self)
  		self:unsubscribe()
  	end,
  	__tostring = function (self)
  		return string.format("[Multicast:%x]",self.channel)
  	end,
  }
  
  function multicast.new(conf)
  	assert(multicastd, "Init first")
  	local self = {}
  	conf = conf or self
  	self.channel = conf.channel
  	if self.channel == nil then
  		self.channel = skynet.call(multicastd, "lua", "NEW")
  	end
  	self.__pack = conf.pack or skynet.pack
  	self.__unpack = conf.unpack or skynet.unpack
  	self.__dispatch = conf.dispatch
  
  	return setmetatable(self, chan_meta)
  end
  
  function chan:delete()
  	local c = assert(self.channel)
  	skynet.send(multicastd, "lua", "DEL", c)
  	self.channel = nil
  	self.__subscribe = nil
  end
  
  function chan:publish(...)
  	local c = assert(self.channel)
  	skynet.call(multicastd, "lua", "PUB", c, mc.pack(self.__pack(...)))
  end
  
  function chan:subscribe()
  	local c = assert(self.channel)
  	if self.__subscribe then
  		-- already subscribe
  		return
  	end
  	skynet.call(multicastd, "lua", "SUB", c)
  	self.__subscribe = true
  	dispatch[c] = self
  end
  
  function chan:unsubscribe()
  	if not self.__subscribe then
  		-- already unsubscribe
  		return
  	end
  	local c = assert(self.channel)
  	skynet.send(multicastd, "lua", "USUB", c)
  	self.__subscribe = nil
  	dispatch[c] = nil
  end
  
  local function dispatch_subscribe(channel, source, pack, msg, sz)
  	-- channel as session, do need response
  	skynet.ignoreret()
  	local self = dispatch[channel]
  	if not self then
  		mc.close(pack)
  		-- This channel may unsubscribe first, see #1141
  		return
  	end
  
  	if self.__subscribe then
  		local ok, err = pcall(self.__dispatch, self, source, self.__unpack(msg, sz))
  		mc.close(pack)
  		assert(ok, err)
  	else
  		-- maybe unsubscribe first, but the message is send out. drop the message unneed
  		mc.close(pack)
  	end
  end
  
  local function init()
  	multicastd = skynet.uniqueservice "multicastd"
  	skynet.register_protocol {
  		name = "multicast",
  		id = skynet.PTYPE_MULTICAST,
  		unpack = mc.unpack,
  		dispatch = dispatch_subscribe,
  	}
  end
  
  skynet.init(init, "multicast")
  
  return multicast