logd.lua
2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
local skynet = require "skynet"
local mongo = require "mongo"
local queue = require "skynet.queue"
local bson = require "bson"
local socketdriver = require "socketdriver"
local serverId = tonumber(skynet.getenv("servId"))
require "shared.init"
require "skynet.manager"
local table_insert = table.insert
local pairs = pairs
local ipairs = ipairs
local string_format = string.format
local CMD, cache, client, cs = {}, {}
local auto = {}
local rsyslog_fd
-- function getNextSequence(collect)
-- 	local index = auto[collect]
-- 	if not index then
-- 		local res = client.counters:findAndModify({
-- 			query = {},
-- 			update = {["$inc"] = {[collect] = 1}},
-- 			upsert = true,
-- 			new = true,
-- 		})
-- 		index = res.value[collect]
-- 	else
-- 		index = index + 1
-- 	end
-- 	auto[collect] = index
-- 	return index
-- end
local dateTypes = {"timestamp", "createTime", "lastLoginTime"}
local stringTypes = {"s1", "s2", "s3"}
local intTypes = {"int1", "int2", "int3", "int4"}
function CMD.log(collect, doc)
	-- write to rsyslog
	local now = skynet.timex()
	doc["timestamp"] = now
	doc["server"] = serverId
	for _, field in pairs(stringTypes) do
		if doc[field] then
			doc[field] = tostring(doc[field])
		end
	end
	for _, field in pairs(intTypes) do
		if doc[field] then
			doc[field] = tonumber(doc[field])
		end
	end
	for _, field in pairs(dateTypes) do
		if doc[field] then
			doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", doc[field]))
		end
	end
	-- Local-6 + Info 
	socketdriver.send(rsyslog_fd, string.format("<182>%s: %s\n", collect, json.encode(doc)))
	-- if not cache[collect] then cache[collect] = {} end
	-- doc["timestamp"] = now
	-- doc["_id"] = getNextSequence(collect)
	-- table_insert(cache[collect], doc)
end
function CMD.open(conf)
	rsyslog_fd = socketdriver.connect("127.0.0.1", 514)
	socketdriver.start(rsyslog_fd)
	-- local db = mongo.client {
	-- 	host = conf.mongohost,
	-- 	port = conf.mongoport or 27017,
	-- 	username = conf.mongouser,
	-- 	password = conf.mongopswd,
	-- }
	-- assert(db, "mongo connect error")
	-- local servId = skynet.getenv "servId"
	-- client = db["s"..servId]
end
-- local function __loop__()
-- 	while true do
-- 		cs(function ()
-- 			for col, docs in pairs(cache) do
-- 				client[col]:batch_insert(docs)
-- 				client.counters:update({}, {["$set"]={[col]=auto[col]}})
-- 			end
-- 			cache = {}
-- 		end)
-- 		skynet.sleep(200)
-- 	end
-- end
local function __init__()
	skynet.dispatch("lua", function (_, _, command, ...)
		local f = CMD[command]
		if command == "open" then
			skynet.ret(skynet.pack(f(...)))
		else
			local collect, doc = ...
			cs(function() f(collect, doc) end)
		end
	end)
	cs = queue()
	-- skynet.fork(__loop__)
	skynet.register "LOGD"
end
skynet.start(__init__)