logd.lua
2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
local skynet = require "skynet"
local mongo = require "mongo"
local queue = require "skynet.queue"
local bson = require "bson"
local socketdriver = require "socketdriver"
local serverId = tonumber(skynet.getenv("servId"))
require "shared.init"
require "skynet.manager"
local table_insert = table.insert
local pairs = pairs
local ipairs = ipairs
local string_format = string.format
local CMD, cache, client, cs = {}, {}
local auto = {}
local rsyslog_fd
-- function getNextSequence(collect)
-- local index = auto[collect]
-- if not index then
-- local res = client.counters:findAndModify({
-- query = {},
-- update = {["$inc"] = {[collect] = 1}},
-- upsert = true,
-- new = true,
-- })
-- index = res.value[collect]
-- else
-- index = index + 1
-- end
-- auto[collect] = index
-- return index
-- end
local dateTypes = {"timestamp", "createTime", "lastLoginTime"}
local stringTypes = {"s1", "s2", "s3"}
local intTypes = {"int1", "int2", "int3", "int4"}
function CMD.log(collect, doc)
-- write to rsyslog
local now = skynet.timex()
doc["timestamp"] = now
doc["server"] = serverId
for _, field in pairs(stringTypes) do
if doc[field] then
doc[field] = tostring(doc[field])
end
end
for _, field in pairs(intTypes) do
if doc[field] then
doc[field] = tonumber(doc[field])
end
end
for _, field in pairs(dateTypes) do
if doc[field] then
doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", doc[field]))
end
end
-- Local-6 + Info
socketdriver.send(rsyslog_fd, string.format("<182>%s: %s\n", collect, json.encode(doc)))
-- if not cache[collect] then cache[collect] = {} end
-- doc["timestamp"] = now
-- doc["_id"] = getNextSequence(collect)
-- table_insert(cache[collect], doc)
end
function CMD.open(conf)
rsyslog_fd = socketdriver.connect("127.0.0.1", 514)
socketdriver.start(rsyslog_fd)
-- local db = mongo.client {
-- host = conf.mongohost,
-- port = conf.mongoport or 27017,
-- username = conf.mongouser,
-- password = conf.mongopswd,
-- }
-- assert(db, "mongo connect error")
-- local servId = skynet.getenv "servId"
-- client = db["s"..servId]
end
-- local function __loop__()
-- while true do
-- cs(function ()
-- for col, docs in pairs(cache) do
-- client[col]:batch_insert(docs)
-- client.counters:update({}, {["$set"]={[col]=auto[col]}})
-- end
-- cache = {}
-- end)
-- skynet.sleep(200)
-- end
-- end
local function __init__()
skynet.dispatch("lua", function (_, _, command, ...)
local f = CMD[command]
if command == "open" then
skynet.ret(skynet.pack(f(...)))
else
local collect, doc = ...
cs(function() f(collect, doc) end)
end
end)
cs = queue()
-- skynet.fork(__loop__)
skynet.register "LOGD"
end
skynet.start(__init__)