-- logd.lua 2.7 KB
local skynet = require "skynet"
local mongo = require "mongo"
local queue = require "skynet.queue"
local bson = require "bson"
local socketdriver = require "socketdriver"

-- Numeric id of this server, read from the skynet environment key "servId";
-- CMD.log stamps it onto every document as the "server" field.
local serverId = tonumber(skynet.getenv("servId"))

require "shared.init"
require "skynet.manager"

-- Local aliases for frequently used built-ins.
local table_insert = table.insert
local pairs = pairs
local ipairs = ipairs
local string_format = string.format

-- CMD:    command handlers, looked up by name in the "lua" dispatcher.
-- cache:  only referenced by the commented-out MongoDB code in this file.
-- client: starts as nil; only referenced by the commented-out MongoDB code.
-- cs:     starts as nil; assigned the result of queue() in __init__ and used
--         to run every command other than "open".
local CMD, cache, client, cs = {}, {}
-- Only referenced by the commented-out getNextSequence / __loop__ code.
local auto = {}

-- Socket id returned by socketdriver.connect in CMD.open; CMD.log sends on it.
local rsyslog_fd

-- function getNextSequence(collect)
-- 	local index = auto[collect]
-- 	if not index then
-- 		local res = client.counters:findAndModify({
-- 			query = {},
-- 			update = {["$inc"] = {[collect] = 1}},
-- 			upsert = true,
-- 			new = true,
-- 		})
-- 		index = res.value[collect]
-- 	else
-- 		index = index + 1
-- 	end
-- 	auto[collect] = index
-- 	return index
-- end

-- Fields treated as epoch timestamps: each one present gets a companion
-- "<field>_t" number in YYYYMMDDHHMMSS form.
local dateTypes = {"timestamp", "createTime", "lastLoginTime"}
-- Fields coerced to strings before encoding.
local stringTypes = {"s1", "s2", "s3"}
-- Fields coerced to numbers before encoding.
local intTypes = {"int1", "int2", "int3", "int4"}

--- Normalise a log document and send it to rsyslog as a single JSON line.
-- The document is modified in place: "timestamp" and "server" are always
-- overwritten, the typed fields listed above are coerced, and "<field>_t"
-- companions are added for the date fields.
-- @param collect  tag placed in front of the JSON payload in the syslog line
-- @param doc      table of log fields (mutated)
-- If CMD.open has not been called yet the entry is dropped and the problem
-- is reported through skynet.error.
function CMD.log(collect, doc)
	if not rsyslog_fd then
		-- No socket yet: report it instead of passing nil to socketdriver.send.
		skynet.error(string_format("LOGD: dropping '%s' log, rsyslog socket is not open", tostring(collect)))
		return
	end

	-- NOTE(review): skynet.timex is not defined in this file; assumed to
	-- return the current time in epoch seconds -- confirm.
	local now = skynet.timex()
	doc["timestamp"] = now
	doc["server"] = serverId

	for _, field in ipairs(stringTypes) do
		if doc[field] then
			doc[field] = tostring(doc[field])
		end
	end
	for _, field in ipairs(intTypes) do
		if doc[field] then
			-- A value that cannot be parsed becomes nil, i.e. the field is removed.
			doc[field] = tonumber(doc[field])
		end
	end
	for _, field in ipairs(dateTypes) do
		local stamp = tonumber(doc[field])
		if stamp then
			-- os.date raises on floats with a fractional part (Lua 5.3+), so
			-- floor first; values that are not numeric are skipped rather
			-- than aborting the whole log line.
			doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", math.floor(stamp)))
		end
	end

	-- Syslog priority 182 = facility local6 (22) * 8 + severity info (6).
	-- NOTE(review): `json` is not required in this file; presumably a global
	-- provided by shared.init -- confirm.
	socketdriver.send(rsyslog_fd, string_format("<182>%s: %s\n", collect, json.encode(doc)))

	-- if not cache[collect] then cache[collect] = {} end
	-- doc["timestamp"] = now
	-- doc["_id"] = getNextSequence(collect)
	-- table_insert(cache[collect], doc)
end

--- Connect to the rsyslog endpoint that CMD.log writes to.
-- The resulting socket id is stored in the file-level `rsyslog_fd`.
-- @param conf  optional table; `conf.rsysloghost` and `conf.rsyslogport`
--              override the default endpoint 127.0.0.1:514. May be nil.
function CMD.open(conf)
	local host, port = "127.0.0.1", 514
	if type(conf) == "table" then
		host = conf.rsysloghost or host
		port = conf.rsyslogport or port
	end

	rsyslog_fd = socketdriver.connect(host, port)
	socketdriver.start(rsyslog_fd)

	-- local db = mongo.client {
	-- 	host = conf.mongohost,
	-- 	port = conf.mongoport or 27017,
	-- 	username = conf.mongouser,
	-- 	password = conf.mongopswd,
	-- }

	-- assert(db, "mongo connect error")

	-- local servId = skynet.getenv "servId"
	-- client = db["s"..servId]
end

-- local function __loop__()
-- 	while true do
-- 		cs(function ()
-- 			for col, docs in pairs(cache) do
-- 				client[col]:batch_insert(docs)
-- 				client.counters:update({}, {["$set"]={[col]=auto[col]}})
-- 			end
-- 			cache = {}
-- 		end)
-- 		skynet.sleep(200)
-- 	end
-- end

--- Service entry point: installs the "lua" message dispatcher and registers
-- the service under the name "LOGD".
-- "open" is executed directly and its result is sent back with skynet.ret.
-- Every other command is run through the `cs` queue with its first two
-- arguments (collect, doc); no reply is sent for those.
-- An unknown command name raises an error that names the command.
local function __init__()
	-- Created up front; the dispatcher below relies on it.
	cs = queue()

	skynet.dispatch("lua", function (_, _, command, ...)
		local f = CMD[command]
		if not f then
			-- Fail with a readable message instead of "attempt to call a nil value".
			error(string_format("LOGD: unknown command '%s'", tostring(command)))
		end

		if command == "open" then
			skynet.ret(skynet.pack(f(...)))
		else
			local collect, doc = ...
			cs(function() f(collect, doc) end)
		end
	end)

	-- skynet.fork(__loop__)
	skynet.register "LOGD"
end

skynet.start(__init__)