Blame view

publish/skynet/lualib/skynet/db/redis.lua 6.26 KB
4d6f285d   zhouhaihai   增加发布功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
  local skynet = require "skynet"
  local socket = require "skynet.socket"
  local socketchannel = require "skynet.socketchannel"
  
  local table = table
  local string = string
  local assert = assert
  
  local redis = {}
  local command = {}
  local meta = {
  	__index = command,
  	-- DO NOT close channel in __gc
  }
  
  ---------- redis response
  local redcmd = {}
  
  redcmd[36] = function(fd, data) -- '$'
  	local bytes = tonumber(data)
  	if bytes < 0 then
  		return true,nil
  	end
  	local firstline = fd:read(bytes+2)
  	return true,string.sub(firstline,1,-3)
  end
  
  redcmd[43] = function(fd, data) -- '+'
  	return true,data
  end
  
  redcmd[45] = function(fd, data) -- '-'
  	return false,data
  end
  
  redcmd[58] = function(fd, data) -- ':'
  	-- todo: return string later
  	return true, tonumber(data)
  end
  
  local function read_response(fd)
  	local result = fd:readline "\r\n"
  	local firstchar = string.byte(result)
  	local data = string.sub(result,2)
  	return redcmd[firstchar](fd,data)
  end
  
  redcmd[42] = function(fd, data)	-- '*'
  	local n = tonumber(data)
  	if n < 0 then
  		return true, nil
  	end
  	local bulk = {}
  	local noerr = true
  	for i = 1,n do
  		local ok, v = read_response(fd)
  		if not ok then
  			noerr = false
  		end
  		bulk[i] = v
  	end
  	return noerr, bulk
  end
  
  -------------------
  
  function command:disconnect()
  	self[1]:close()
  	setmetatable(self, nil)
  end
  
  -- msg could be any type of value
  
  local function make_cache(f)
  	return setmetatable({}, {
  		__mode = "kv",
  		__index = f,
  	})
  end
  
  local header_cache = make_cache(function(t,k)
  		local s = "\r\n$" .. k .. "\r\n"
  		t[k] = s
  		return s
  	end)
  
  local command_cache = make_cache(function(t,cmd)
  		local s = "\r\n$"..#cmd.."\r\n"..cmd:upper()
  		t[cmd] = s
  		return s
  	end)
  
  local count_cache = make_cache(function(t,k)
  		local s = "*" .. k
  		t[k] = s
  		return s
  	end)
  
  local function compose_message(cmd, msg)
  	local t = type(msg)
  	local lines = {}
  
  	if t == "table" then
  		lines[1] = count_cache[#msg+1]
  		lines[2] = command_cache[cmd]
  		local idx = 3
  		for _,v in ipairs(msg) do
  			v= tostring(v)
  			lines[idx] = header_cache[#v]
  			lines[idx+1] = v
  			idx = idx + 2
  		end
  		lines[idx] = "\r\n"
  	else
  		msg = tostring(msg)
  		lines[1] = "*2"
  		lines[2] = command_cache[cmd]
  		lines[3] = header_cache[#msg]
  		lines[4] = msg
  		lines[5] = "\r\n"
  	end
  
  	return lines
  end
  
  local function redis_login(auth, db)
  	if auth == nil and db == nil then
  		return
  	end
  	return function(so)
  		if auth then
  			so:request(compose_message("AUTH", auth), read_response)
  		end
  		if db then
  			so:request(compose_message("SELECT", db), read_response)
  		end
  	end
  end
  
  function redis.connect(db_conf)
  	local channel = socketchannel.channel {
  		host = db_conf.host,
  		port = db_conf.port or 6379,
  		auth = redis_login(db_conf.auth, db_conf.db),
  		nodelay = true,
  		overload = db_conf.overload,
  	}
  	-- try connect first only once
  	channel:connect(true)
  	return setmetatable( { channel }, meta )
  end
  
  setmetatable(command, { __index = function(t,k)
  	local cmd = string.upper(k)
  	local f = function (self, v, ...)
  		if type(v) == "table" then
  			return self[1]:request(compose_message(cmd, v), read_response)
  		else
  			return self[1]:request(compose_message(cmd, {v, ...}), read_response)
  		end
  	end
  	t[k] = f
  	return f
  end})
  
  local function read_boolean(so)
  	local ok, result = read_response(so)
  	return ok, result ~= 0
  end
  
  function command:exists(key)
  	local fd = self[1]
  	return fd:request(compose_message ("EXISTS", key), read_boolean)
  end
  
  function command:sismember(key, value)
  	local fd = self[1]
  	return fd:request(compose_message ("SISMEMBER", {key, value}), read_boolean)
  end
  
  local function compose_table(lines, msg)
  	local tinsert = table.insert
  	tinsert(lines, count_cache[#msg])
  	for _,v in ipairs(msg) do
  		v = tostring(v)
  		tinsert(lines,header_cache[#v])
  		tinsert(lines,v)
  	end
  	tinsert(lines, "\r\n")
  	return lines
  end
  
  function command:pipeline(ops, transaction, resp)
  	assert(ops and #ops > 0, "pipeline is null")
  
  	local fd = self[1]
  
  	local cmds = {}
  	for _, cmd in ipairs(ops) do
  		compose_table(cmds, cmd)
  	end
  
  	if resp then
  		return fd:request(cmds, function (fd)
  			for i=1, #ops do
  				local ok, out = read_response(fd)
  				table.insert(resp, {ok = ok, out = out})
  			end
  			return true, resp
  		end)
  	else
  		return fd:request(cmds, function (fd)
  			if transaction then
  				local ok, out
  				for i=1, #ops do
  					ok, out = read_response(fd)
  				end
  				-- return last response
  				return ok,out
  			else
  				local re = {}
  				for i=1, #ops do
  					ok, out = read_response(fd)
  					re[i] = out
  				end
  				return true, re
  			end
  		end)
  	end
  end
  
  --- watch mode
  
  local watch = {}
  
  local watchmeta = {
  	__index = watch,
  	__gc = function(self)
  		self.__sock:close()
  	end,
  }
  
  local function watch_login(obj, auth)
  	return function(so)
  		if auth then
  			so:request(compose_message("AUTH", auth), read_response)
  		end
  		for k in pairs(obj.__psubscribe) do
  			so:request(compose_message ("PSUBSCRIBE", k))
  		end
  		for k in pairs(obj.__subscribe) do
  			so:request(compose_message("SUBSCRIBE", k))
  		end
  	end
  end
  
  function redis.watch(db_conf)
  	local obj = {
  		__subscribe = {},
  		__psubscribe = {},
  	}
  	local channel = socketchannel.channel {
  		host = db_conf.host,
  		port = db_conf.port or 6379,
  		auth = watch_login(obj, db_conf.auth),
  		nodelay = true,
  	}
  	obj.__sock = channel
  
  	-- try connect first only once
  	channel:connect(true)
  	return setmetatable( obj, watchmeta )
  end
  
  function watch:disconnect()
  	self.__sock:close()
  	setmetatable(self, nil)
  end
  
  local function watch_func( name )
  	local NAME = string.upper(name)
  	watch[name] = function(self, ...)
  		local so = self.__sock
  		for i = 1, select("#", ...) do
  			local v = select(i, ...)
  			so:request(compose_message(NAME, v))
  		end
  	end
  end
  
  watch_func "subscribe"
  watch_func "psubscribe"
  watch_func "unsubscribe"
  watch_func "punsubscribe"
  
  function watch:message()
  	local so = self.__sock
  	while true do
  		local ret = so:response(read_response)
  		local type , channel, data , data2 = ret[1], ret[2], ret[3], ret[4]
  		if type == "message" then
  			return data, channel
  		elseif type == "pmessage" then
  			return data2, data, channel
  		elseif type == "subscribe" then
  			self.__subscribe[channel] = true
  		elseif type == "psubscribe" then
  			self.__psubscribe[channel] = true
  		elseif type == "unsubscribe" then
  			self.__subscribe[channel] = nil
  		elseif type == "punsubscribe" then
  			self.__psubscribe[channel] = nil
  		end
  	end
  end
  
  return redis