diff --git a/src/models/RoleLog.lua b/src/models/RoleLog.lua index 606e960..fed641e 100644 --- a/src/models/RoleLog.lua +++ b/src/models/RoleLog.lua @@ -1,65 +1,171 @@ - --- logType 日志 mapping 信息(确定后不能更改类型) 修改需要设置 es 日志mapping - - - +-- logType local LogType = { - create = { - ip = "string" - }, - login = { - ip = "string" - }, - logout = { - online = "number" - }, - + create = "common", + login = "common", + logout = "common", } +-- 如要修改 要提前修改 _template mapping -- 对应 mapping 为 gamelog-* +local Mapping = { + -- 预留一些数据格式 + common = { + desc = "keyword",--索引的短字符串 + ucode = "keyword",--关联日志对应ucode + key1 = "keyword", --可索引的短字符串 + key2 = "keyword", --可索引的短字符串 + -- 几乎不用的长文本 + text1 = "text", --长字符串不索引的类型 + -- 五个不同类型的数字 基本上满足数量要求 尽量从低到高用 + short1 = "short", + int1 = "integer", + int2 = "integer", + long1 = "long", + float1 = "float", + } +} --- 所有的日志都包括的部分 role 里面的信息 -local commonField = { - name = "string", - id = "number", - uid = "string", - sId = "number", - device = "string", - ctime = "number", - ltime = "number", - level = "number", - rmbC = "number", +-- 所有的日志都包括的部分 role 里面的信息 -- mapping 信息在 gamelog-role +local commonRoleField = { + name = "keyword", + id = "integer", + uid = "keyword", + sid = "short", + device = "keyword", + ctime = "integer", + ltime = "integer", + level = "short", + rmbC = "integer", } -local RoleLog = {} -function RoleLog.bind(Role) +local function checkType(logType, field, value, ctype) + local typecheckfunc = { + keyword = function() + --长度不超过256 + if type(value) ~= "string" then + value = tostring(value) + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [keyword]", logType, field)) + else + if #value > 256 then + print(string.format("LOG ERROR: logType [%s] field [%s] [keyword] type to long.", logType, field)) + end + end + return value + end, + text = function() + if type(value) ~= "string" then + value = tostring(value) + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [text]", logType, field)) + end + return value + end, + integer = function() + if type(value) ~= "number" then + value = tonumber(value) + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [integer]", logType, field)) + end + if value then + if math.type(value) ~= "integer" then + local oldValue = value + value = math.floor(value) + if value ~= oldValue then + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [integer], is float", logType, field)) + end + end + if -2147483648 > value or value > 2147483647 then + value = nil + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [integer], too big", logType, field)) + end + end + return value + end, + short = function() + if type(value) ~= "number" then + value = tonumber(value) + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [short]", logType, field)) + end + if value then + if math.type(value) ~= "integer" then + local oldValue = value + value = math.floor(value) + if value ~= oldValue then + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [short], is float", logType, field)) + end + end + + if -32768 > value or value > 32768 then + value = nil + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [short], too big", logType, field)) + end + end + return value + end, + long = function() + if type(value) ~= "number" then + value = tonumber(value) + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [long]", logType, field)) + end + if value then + if math.type(value) ~= "integer" then + local oldValue = value + value = math.floor(value) + if type(value) ~= "integer" then + value = nil + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [long], too big", logType, field)) + elseif value ~= oldValue then + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [long], is float", logType, field)) + end + end + end + return value + end, + float = function() + if type(value) ~= "number" then + value = tonumber(value) + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [float]", logType, field)) + end + return value + end, + } + + if typecheckfunc[ctype] then + return typecheckfunc[ctype]() + else + print(string.format("LOG ERROR: logType [%s] field [%s] have a new type [%s] need add check.", logType, field, ctype)) + return nil + end +end +local RoleLog = {} +function RoleLog.bind(Role) function Role:log(logType, contents) - - local _logType = LogType[logType] - if not _logType then return end - - - if not logd then return end - contents = contents or {} + local _logType = LogType[logType] + if not _logType then + print(string.format("LOG ERROR: new logType [%s] need Add Maping.", logType)) + return + end local doc = {} - for field, _typ in pairs(commonField) do - doc[field] = self:getProperty(field) + for field, ctype in pairs(commonRoleField) do + if contents[field] then + print(string.format("LOG ERROR: logType [%s] had field [%s] overwrite default.", logType, field)) + end + doc[field] = checkType("commonRoleField", field, self:getProperty(field), ctype) end + + local mapping = Mapping[_logType] + for field, value in pairs(contents) do - if _logType[field] then - doc[field] = value + local ftype = mapping[field] + if ftype then + doc[field] = checkType(logType, field, value, ftype) else - print(string.format("LOG ERROR: %s had new field %s no type.", logType, field)) + print(string.format("LOG ERROR: logType [%s] have new field [%s] no type in mapping.", logType, field)) end end - - doc.mid = doc.uid:sub(-2, -1) - pcall(skynet.send, logd, "lua", "log", logType, doc) + if not logd then return end + pcall(skynet.send, logd, "lua", "log", logType, doc, _logType) end - end - return RoleLog \ No newline at end of file diff --git a/src/services/logd.lua b/src/services/logd.lua index 7b6885e..fd1a6c2 100644 --- a/src/services/logd.lua +++ b/src/services/logd.lua @@ -1,8 +1,7 @@ local skynet = require "skynet" -local mongo = require "mongo" local queue = require "skynet.queue" local bson = require "bson" -local socketdriver = require "socketdriver" +local socketdriver = require "skynet.socketdriver" local serverId = tonumber(skynet.getenv("servId")) @@ -14,105 +13,102 @@ local pairs = pairs local ipairs = ipairs local string_format = string.format -local CMD, cache, client, cs = {}, {} -local auto = {} - -local rsyslog_fd - --- function getNextSequence(collect) --- local index = auto[collect] --- if not index then --- local res = client.counters:findAndModify({ --- query = {}, --- update = {["$inc"] = {[collect] = 1}}, --- upsert = true, --- new = true, --- }) --- index = res.value[collect] --- else --- index = index + 1 --- end --- auto[collect] = index --- return index --- end - -local dateTypes = {"timestamp", "createTime", "lastLoginTime"} -local stringTypes = {"s1", "s2", "s3"} -local intTypes = {"int1", "int2", "int3", "int4"} -function CMD.log(collect, doc) - -- write to rsyslog - local now = skynet.timex() - doc["timestamp"] = now - doc["server"] = serverId +local CMD, cs = {} +local log_fd, connecting = nil , false + + +local socket_message = {} +-- read skynet_socket.h for these macro +-- SKYNET_SOCKET_TYPE_DATA = 1 +socket_message[1] = function(id, size, data) + skynet.error(string.format("LOG SOCKET: data: ", skynet.tostring(data, size))) + socketdriver.drop(data, size) +end + +-- SKYNET_SOCKET_TYPE_CONNECT = 2 +socket_message[2] = function(id, _ , addr) + skynet.error("LOG SOCKET: connect: ", addr) + connecting = false +end + +-- SKYNET_SOCKET_TYPE_CLOSE = 3 +socket_message[3] = function(id) + skynet.error("LOG SOCKET: closed") + connecting = false +end + +-- SKYNET_SOCKET_TYPE_ERROR = 5 +socket_message[5] = function(id, _, err) + skynet.error("LOG SOCKET: error: ", err) + connecting = false +end - for _, field in pairs(stringTypes) do - if doc[field] then - doc[field] = tostring(doc[field]) + +skynet.register_protocol { + name = "socket", + id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6 + unpack = socketdriver.unpack, + dispatch = function (_, _, t, ...) + if socket_message[t] then + socket_message[t](...) end end - for _, field in pairs(intTypes) do - if doc[field] then - doc[field] = tonumber(doc[field]) +} + + + +-- 日志 index 不包含 日期的 index_suffix +local IndexNoDate = { + online = true, +} +-- 不走 role log 的日志都要自行注意 mapping 设置【重要】 +-- index_suffix index 后缀 默认为 common +function CMD.log(logType, doc, index_suffix) + index_suffix = index_suffix or "common" + if index_suffix == "common" then + doc["@type"] = logType + else + if logType ~= index_suffix then -- 定制后缀 不一定有type 不相等时才有type + doc["@type"] = logType end end - for _, field in pairs(dateTypes) do - if doc[field] then - doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", doc[field])) + + local now = skynet.timex() + doc["timestamp"] = now + doc["@timestamp"] = os.date("%Y-%m-%d %H:%M:%S", now) + doc["server"] = serverId + + -- 自己加好 index + if IndexNoDate[index_suffix] then + doc["@index"] = string.format("gamelog-%s", index_suffix) + else + doc["@index"] = string.format("gamelog-%s-%s", os.date("%Y%m%d", now), index_suffix) + end + if not socketdriver.send(log_fd, json.encode(doc) .. "\n") then + if not connecting then + CMD.open() -- 连一下试试 end end - -- Local-6 + Info - socketdriver.send(rsyslog_fd, string.format("<182>%s: %s\n", collect, json.encode(doc))) - - -- if not cache[collect] then cache[collect] = {} end - -- doc["timestamp"] = now - -- doc["_id"] = getNextSequence(collect) - -- table_insert(cache[collect], doc) end -function CMD.open(conf) - rsyslog_fd = socketdriver.connect("127.0.0.1", 514) - socketdriver.start(rsyslog_fd) - - -- local db = mongo.client { - -- host = conf.mongohost, - -- port = conf.mongoport or 27017, - -- username = conf.mongouser, - -- password = conf.mongopswd, - -- } - - -- assert(db, "mongo connect error") - - -- local servId = skynet.getenv "servId" - -- client = db["s"..servId] +function CMD.open() + log_fd = socketdriver.connect("127.0.0.1", 5170) + connecting = true end --- local function __loop__() --- while true do --- cs(function () --- for col, docs in pairs(cache) do --- client[col]:batch_insert(docs) --- client.counters:update({}, {["$set"]={[col]=auto[col]}}) --- end --- cache = {} --- end) --- skynet.sleep(200) --- end --- end - local function __init__() - skynet.dispatch("lua", function (_, _, command, ...) + skynet.dispatch("lua", function (session, address, command, ...) local f = CMD[command] if command == "open" then skynet.ret(skynet.pack(f(...))) else - local collect, doc = ... - cs(function() f(collect, doc) end) + local logType, doc, index_suffix = ... + cs(function() f(logType, doc, index_suffix) end) end end) cs = queue() - -- skynet.fork(__loop__) - skynet.register "LOGD" + skynet.register(".LOGD") end skynet.start(__init__) -- libgit2 0.21.2