Blame view

src/services/logd.lua 2.7 KB
314bc5df   zhengshouren   提交服务器初始代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
  local skynet = require "skynet"
  local mongo = require "mongo"
  local queue = require "skynet.queue"
  local bson = require "bson"
  local socketdriver = require "socketdriver"
  
  local serverId = tonumber(skynet.getenv("servId"))
  
  require "shared.init"
  require "skynet.manager"
  
  -- Local aliases for frequently used library functions.
  -- table_insert and ipairs are only referenced by the commented-out MongoDB
  -- code in the visible source.
  local table_insert = table.insert
  local pairs = pairs
  local ipairs = ipairs
  local string_format = string.format
  
  -- CMD:    command table; the "lua" dispatcher looks handlers up by name.
  -- cache:  per-collection document buffer (only used by the commented-out
  --         MongoDB batching code in the visible source).
  -- client: left nil here; only the commented-out MongoDB code assigns it.
  -- cs:     left nil here; __init__ assigns it a skynet.queue instance.
  local CMD, cache, client, cs = {}, {}
  -- Per-collection sequence counters (only used by the commented-out
  -- getNextSequence / __loop__ code in the visible source).
  local auto = {}
  
  -- Socket id of the rsyslog connection; set by CMD.open, read by CMD.log.
  local rsyslog_fd
  
  -- function getNextSequence(collect)
  -- 	local index = auto[collect]
  -- 	if not index then
  -- 		local res = client.counters:findAndModify({
  -- 			query = {},
  -- 			update = {["$inc"] = {[collect] = 1}},
  -- 			upsert = true,
  -- 			new = true,
  -- 		})
  -- 		index = res.value[collect]
  -- 	else
  -- 		index = index + 1
  -- 	end
  -- 	auto[collect] = index
  -- 	return index
  -- end
  
  -- Field groups that CMD.log normalizes before a record is emitted.
  -- dateTypes:   timestamps that get an extra "<field>_t" YYYYMMDDHHMMSS number.
  -- stringTypes: fields coerced with tostring.
  -- intTypes:    fields coerced with tonumber.
  local dateTypes = {"timestamp", "createTime", "lastLoginTime"}
  local stringTypes = {"s1", "s2", "s3"}
  local intTypes = {"int1", "int2", "int3", "int4"}

  --- Normalize a log record and send it to rsyslog as a single JSON line.
  -- The record is stamped with the current time and this server's id, its
  -- typed fields are coerced, and every date field present gains a sibling
  -- "<field>_t" holding the local time formatted as a YYYYMMDDHHMMSS number.
  -- @param collect collection / tag name, written before the JSON payload
  -- @param doc     table describing the record; it is modified in place
  -- Raises an error when doc is not a table or when CMD.open has not yet
  -- established the rsyslog connection.
  function CMD.log(collect, doc)
  	if type(doc) ~= "table" then
  		error("LOGD.log: doc must be a table, got " .. type(doc))
  	end
  	if not rsyslog_fd then
  		error("LOGD.log: rsyslog connection is not open (call 'open' first)")
  	end

  	-- write to rsyslog
  	-- NOTE(review): skynet.timex is not stock skynet API; presumably it
  	-- returns the current time in seconds -- confirm.
  	local now = skynet.timex()
  	doc["timestamp"] = now
  	doc["server"] = serverId

  	for _, field in ipairs(stringTypes) do
  		if doc[field] then
  			doc[field] = tostring(doc[field])
  		end
  	end
  	for _, field in ipairs(intTypes) do
  		if doc[field] then
  			-- A value that is not numeric becomes nil, i.e. the field is dropped.
  			doc[field] = tonumber(doc[field])
  		end
  	end
  	for _, field in ipairs(dateTypes) do
  		-- os.date rejects floats with a fractional part on Lua 5.3+, and a
  		-- non-numeric value would abort the whole record; coerce and floor
  		-- first, and skip the derived field when the value is not a number.
  		local ts = tonumber(doc[field])
  		if ts then
  			doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", math.floor(ts)))
  		end
  	end
  	-- Local-6 + Info: syslog PRI = facility * 8 + severity = 22 * 8 + 6 = 182
  	-- NOTE(review): `json` is a global not defined in this file; presumably
  	-- provided by `require "shared.init"` -- confirm.
  	socketdriver.send(rsyslog_fd, string_format("<182>%s: %s\n", collect, json.encode(doc)))

  	-- if not cache[collect] then cache[collect] = {} end
  	-- doc["timestamp"] = now
  	-- doc["_id"] = getNextSequence(collect)
  	-- table_insert(cache[collect], doc)
  end
  
  --- Open the connection to the rsyslog daemon that CMD.log writes to.
  -- @param conf optional table. `conf.rsysloghost` and `conf.rsyslogport`
  --             override the default endpoint 127.0.0.1:514; any other
  --             shape of conf (including nil) keeps the defaults.
  -- Raises an error when the socket driver does not hand back a usable id,
  -- so a failed connect is reported here instead of on the first CMD.log.
  -- Returns nothing.
  function CMD.open(conf)
  	local host, port = "127.0.0.1", 514
  	if type(conf) == "table" then
  		host = conf.rsysloghost or host
  		port = tonumber(conf.rsyslogport) or port
  	end

  	local fd = socketdriver.connect(host, port)
  	-- NOTE(review): presumably a negative id signals an immediate connect
  	-- failure -- confirm against the socketdriver implementation.
  	if not fd or (type(fd) == "number" and fd < 0) then
  		error(string_format("LOGD.open: cannot connect to rsyslog at %s:%s",
  			tostring(host), tostring(port)))
  	end
  	rsyslog_fd = fd
  	socketdriver.start(rsyslog_fd)
  	-- NOTE(review): no reconnect handling is visible in this file; if the
  	-- rsyslog connection drops, later sends presumably fail -- confirm.

  	-- local db = mongo.client {
  	-- 	host = conf.mongohost,
  	-- 	port = conf.mongoport or 27017,
  	-- 	username = conf.mongouser,
  	-- 	password = conf.mongopswd,
  	-- }

  	-- assert(db, "mongo connect error")

  	-- local servId = skynet.getenv "servId"
  	-- client = db["s"..servId]
  end
  
  -- local function __loop__()
  -- 	while true do
  -- 		cs(function ()
  -- 			for col, docs in pairs(cache) do
  -- 				client[col]:batch_insert(docs)
  -- 				client.counters:update({}, {["$set"]={[col]=auto[col]}})
  -- 			end
  -- 			cache = {}
  -- 		end)
  -- 		skynet.sleep(200)
  -- 	end
  -- end
  
  --- Service start-up: install the "lua" message handler and register the
  -- service under the name "LOGD".
  -- "open" is answered with skynet.ret; every other known command is treated
  -- as a (collect, doc) call, run through the queue `cs`, and gets no reply.
  local function __init__()
  	-- Create the queue before installing the handler that captures it.
  	cs = queue()

  	skynet.dispatch("lua", function (_, _, command, ...)
  		local f = CMD[command]
  		if not f then
  			-- Fail with a message that names the command instead of the
  			-- opaque "attempt to call a nil value" raised inside the queue.
  			error("LOGD: unknown command " .. tostring(command))
  		end
  		if command == "open" then
  			skynet.ret(skynet.pack(f(...)))
  		else
  			local collect, doc = ...
  			cs(function() f(collect, doc) end)
  		end
  	end)

  	-- skynet.fork(__loop__)
  	skynet.register "LOGD"
  end

  skynet.start(__init__)