Blame view

publish/skynet/lualib/skynet/mqueue.lua 1.76 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
  -- This is a deprecated module, use skynet.queue instead.
  
  local skynet = require "skynet"
  local c = require "skynet.core"
  
  -- Module table: the public functions are attached to it below and it is
  -- returned at the end of the file.
  local mqueue = {}
  -- Set to true by mqueue.register, which asserts that it is still nil
  -- (so the handler can only be registered once).
  local init_once
  -- Coroutine of the dispatcher while it is parked in skynet.wait() on an
  -- empty queue. The "queue" protocol dispatch wakes it and resets this to nil.
  local thread_id
  -- Pending messages in arrival order: appended by the "queue" protocol
  -- dispatch, removed from the front by message_dispatch.
  local message_queue = {}
  
  -- Register the "queue" protocol. Incoming messages are not handled
  -- immediately: they are appended to message_queue and the dispatcher
  -- coroutine (if it is currently parked waiting for work) is woken up.
  skynet.register_protocol {
  	name = "queue",
  	-- please read skynet.h for magic number 8
  	id = 8,
  	pack = skynet.pack,
  	unpack = skynet.unpack,
  	dispatch = function(session, from, ...)
  		-- Store the real argument count in `n`. The arguments live in the
  		-- array part of the table, and `#` is unspecified when some of them
  		-- are nil, so the count must be recorded explicitly to replay the
  		-- call with exactly the arguments that were sent.
  		table.insert(message_queue, {session = session, addr = from, n = select("#", ...), ... })
  		if thread_id then
  			skynet.wakeup(thread_id)
  			-- Clear it so later messages do not wake the same coroutine again
  			-- before it parks itself the next time.
  			thread_id = nil
  		end
  	end
  }
  
  -- Invoke handler `f` with the arguments stored in the queued message `msg`
  -- (positions 1..msg.n), in protected mode.
  -- Returns pcall's results: true plus the handler's return values, or
  -- false plus the error value.
  local function do_func(f, msg)
  	-- Explicit bounds keep nil arguments (including trailing ones) intact.
  	return pcall(f, table.unpack(msg, 1, msg.n))
  end
  
  -- Dispatcher loop: takes queued messages one at a time, oldest first, and
  -- runs handler `f` on each of them through do_func (i.e. under pcall).
  --
  -- * Empty queue: the running coroutine is recorded in thread_id and parked
  --   with skynet.wait() until the protocol dispatch wakes it.
  -- * session == 0: no response is sent. The handler must not return a value
  --   and must not fail; in either case a replacement dispatcher is forked
  --   first (so the queue keeps being served) and this coroutine raises an
  --   error naming the sender.
  -- * any other session: pcall's status flag and the handler's results are
  --   packed together and sent back to the sender as a response.
  --
  -- This function never returns normally.
  local function message_dispatch(f)
  	while true do
  		if #message_queue==0 then
  			thread_id = coroutine.running()
  			skynet.wait()
  		else
  			local msg = table.remove(message_queue,1)
  			local session = msg.session
  			if session == 0 then
  				-- The result gets its own name: reusing `msg` here would shadow
  				-- the message table and make `msg.addr` below index the
  				-- handler's return/error value instead of the sender address.
  				local ok, result = do_func(f, msg)
  				if ok then
  					if result then
  						skynet.fork(message_dispatch,f)
  						error(string.format("[:%x] send a message to [:%x] return something", msg.addr, skynet.self()))
  					end
  				else
  					skynet.fork(message_dispatch,f)
  					error(string.format("[:%x] send a message to [:%x] throw an error : %s", msg.addr, skynet.self(), result))
  				end
  			else
  				-- The pcall status is packed as the first value so that the
  				-- caller can tell success from failure.
  				local data, size = skynet.pack(do_func(f,msg))
  				-- 1 means response
  				c.send(msg.addr, 1, session, data, size)
  			end
  		end
  	end
  end
  
  -- Install `f` as the handler for messages of the "queue" protocol and start
  -- the dispatcher coroutine with skynet.fork.
  -- `f` is called with the arguments of each queued message, one message at
  -- a time.
  -- Raises an error if a handler has already been registered.
  function mqueue.register(f)
  	-- Explicit message instead of the generic "assertion failed!".
  	assert(init_once == nil, "mqueue.register can only be called once")
  	init_once = true
  	skynet.fork(message_dispatch,f)
  end
  
  -- Unwrap a pcall-style result list.
  -- When the leading flag is truthy, the remaining values are returned as-is;
  -- otherwise the remaining values are handed to error() unchanged.
  local function catch(ok, ...)
  	if not ok then
  		error(...)
  	end
  	return ...
  end
  
  -- Issue skynet.call to `addr` with the "queue" protocol, passing `...` as
  -- the message arguments.
  -- The first value of the reply is treated as a success flag (the receiving
  -- dispatcher packs its pcall results): on success the remaining values are
  -- returned, otherwise they are passed to error().
  function mqueue.call(addr, ...)
  	return catch(skynet.call(addr, "queue", ...))
  end
  
  -- Issue skynet.send to `addr` with the "queue" protocol, passing `...` as
  -- the message arguments; returns whatever skynet.send returns.
  -- NOTE(review): presumably delivered with session 0, in which case the
  -- receiving dispatcher sends no reply and raises if the handler returns a
  -- value -- confirm against skynet.send.
  function mqueue.send(addr, ...)
  	return skynet.send(addr, "queue", ...)
  end
  
  -- Return the number of messages currently waiting in message_queue
  -- (received but not yet taken by the dispatcher).
  function mqueue.size()
  	return #message_queue
  end
  
  return mqueue