gateserver.lua
3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
local skynet = require "skynet"
local netpack = require "skynet.netpack"
local socketdriver = require "skynet.socketdriver"
local gateserver = {}
local socket -- listen socket
local queue -- message queue
local maxclient -- max client
local client_number = 0
local CMD = setmetatable({}, { __gc = function() netpack.clear(queue) end })
local nodelay = false
local connection = {}
function gateserver.openclient(fd)
if connection[fd] then
socketdriver.start(fd)
end
end
function gateserver.closeclient(fd)
local c = connection[fd]
if c then
connection[fd] = false
socketdriver.close(fd)
end
end
function gateserver.start(handler)
assert(handler.message)
assert(handler.connect)
function CMD.open( source, conf )
assert(not socket)
local address = conf.address or "0.0.0.0"
local port = assert(conf.port)
maxclient = conf.maxclient or 1024
nodelay = conf.nodelay
skynet.error(string.format("Listen on %s:%d", address, port))
socket = socketdriver.listen(address, port)
socketdriver.start(socket)
if handler.open then
return handler.open(source, conf)
end
end
function CMD.close()
assert(socket)
socketdriver.close(socket)
end
local MSG = {}
local function dispatch_msg(fd, msg, sz)
if connection[fd] then
handler.message(fd, msg, sz)
else
skynet.error(string.format("Drop message from fd (%d) : %s", fd, netpack.tostring(msg,sz)))
end
end
MSG.data = dispatch_msg
local function dispatch_queue()
local fd, msg, sz = netpack.pop(queue)
if fd then
-- may dispatch even the handler.message blocked
-- If the handler.message never block, the queue should be empty, so only fork once and then exit.
skynet.fork(dispatch_queue)
dispatch_msg(fd, msg, sz)
for fd, msg, sz in netpack.pop, queue do
dispatch_msg(fd, msg, sz)
end
end
end
MSG.more = dispatch_queue
function MSG.open(fd, msg)
if client_number >= maxclient then
socketdriver.close(fd)
return
end
if nodelay then
socketdriver.nodelay(fd)
end
connection[fd] = true
client_number = client_number + 1
handler.connect(fd, msg)
end
local function close_fd(fd)
local c = connection[fd]
if c ~= nil then
connection[fd] = nil
client_number = client_number - 1
end
end
function MSG.close(fd)
if fd ~= socket then
if handler.disconnect then
handler.disconnect(fd)
end
close_fd(fd)
else
socket = nil
end
end
function MSG.error(fd, msg)
if fd == socket then
socketdriver.close(fd)
skynet.error("gateserver close listen socket, accpet error:",msg)
else
if handler.error then
handler.error(fd, msg)
end
close_fd(fd)
end
end
function MSG.warning(fd, size)
if handler.warning then
handler.warning(fd, size)
end
end
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}
skynet.start(function()
skynet.dispatch("lua", function (_, address, cmd, ...)
local f = CMD[cmd]
if f then
skynet.ret(skynet.pack(f(address, ...)))
else
skynet.ret(skynet.pack(handler.command(cmd, address, ...)))
end
end)
end)
end
return gateserver