local skynet = require "skynet" local mongo = require "mongo" local queue = require "skynet.queue" local bson = require "bson" local socketdriver = require "socketdriver" local serverId = tonumber(skynet.getenv("servId")) require "shared.init" require "skynet.manager" local table_insert = table.insert local pairs = pairs local ipairs = ipairs local string_format = string.format local CMD, cache, client, cs = {}, {} local auto = {} local rsyslog_fd -- function getNextSequence(collect) -- local index = auto[collect] -- if not index then -- local res = client.counters:findAndModify({ -- query = {}, -- update = {["$inc"] = {[collect] = 1}}, -- upsert = true, -- new = true, -- }) -- index = res.value[collect] -- else -- index = index + 1 -- end -- auto[collect] = index -- return index -- end local dateTypes = {"timestamp", "createTime", "lastLoginTime"} local stringTypes = {"s1", "s2", "s3"} local intTypes = {"int1", "int2", "int3", "int4"} function CMD.log(collect, doc) -- write to rsyslog local now = skynet.timex() doc["timestamp"] = now doc["server"] = serverId for _, field in pairs(stringTypes) do if doc[field] then doc[field] = tostring(doc[field]) end end for _, field in pairs(intTypes) do if doc[field] then doc[field] = tonumber(doc[field]) end end for _, field in pairs(dateTypes) do if doc[field] then doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", doc[field])) end end -- Local-6 + Info socketdriver.send(rsyslog_fd, string.format("<182>%s: %s\n", collect, json.encode(doc))) -- if not cache[collect] then cache[collect] = {} end -- doc["timestamp"] = now -- doc["_id"] = getNextSequence(collect) -- table_insert(cache[collect], doc) end function CMD.open(conf) rsyslog_fd = socketdriver.connect("127.0.0.1", 514) socketdriver.start(rsyslog_fd) -- local db = mongo.client { -- host = conf.mongohost, -- port = conf.mongoport or 27017, -- username = conf.mongouser, -- password = conf.mongopswd, -- } -- assert(db, "mongo connect error") -- local servId = skynet.getenv "servId" -- client = db["s"..servId] end -- local function __loop__() -- while true do -- cs(function () -- for col, docs in pairs(cache) do -- client[col]:batch_insert(docs) -- client.counters:update({}, {["$set"]={[col]=auto[col]}}) -- end -- cache = {} -- end) -- skynet.sleep(200) -- end -- end local function __init__() skynet.dispatch("lua", function (_, _, command, ...) local f = CMD[command] if command == "open" then skynet.ret(skynet.pack(f(...))) else local collect, doc = ... cs(function() f(collect, doc) end) end end) cs = queue() -- skynet.fork(__loop__) skynet.register "LOGD" end skynet.start(__init__)