Commit df883a114d53e40339eded5e5d209566565812ce

Authored by zhouhaihai
1 parent cfeb432d

日志处理

Showing 2 changed files with 228 additions and 126 deletions   Show diff stats
src/models/RoleLog.lua
1 1
2 -  
3 --- logType 日志 mapping 信息(确定后不能更改类型) 修改需要设置 es 日志mapping  
4 -  
5 -  
6 - 2 +-- logType
7 local LogType = { 3 local LogType = {
8 - create = {  
9 - ip = "string"  
10 - },  
11 - login = {  
12 - ip = "string"  
13 - },  
14 - logout = {  
15 - online = "number"  
16 - },  
17 - 4 + create = "common",
  5 + login = "common",
  6 + logout = "common",
18 } 7 }
19 8
  9 +-- 如要修改 要提前修改 _template mapping -- 对应 mapping 为 gamelog-*
  10 +local Mapping = {
  11 + -- 预留一些数据格式
  12 + common = {
  13 + desc = "keyword",--索引的短字符串
  14 + ucode = "keyword",--关联日志对应ucode
  15 + key1 = "keyword", --可索引的短字符串
  16 + key2 = "keyword", --可索引的短字符串
  17 + -- 几乎不用的长文本
  18 + text1 = "text", --长字符串不索引的类型
  19 + -- 五个不同类型的数字 基本上满足数量要求 尽量从低到高用
  20 + short1 = "short",
  21 + int1 = "integer",
  22 + int2 = "integer",
  23 + long1 = "long",
  24 + float1 = "float",
  25 + }
  26 +}
20 27
21 --- 所有的日志都包括的部分 role 里面的信息  
22 -local commonField = {  
23 - name = "string",  
24 - id = "number",  
25 - uid = "string",  
26 - sId = "number",  
27 - device = "string",  
28 - ctime = "number",  
29 - ltime = "number",  
30 - level = "number",  
31 - rmbC = "number", 28 +-- 所有的日志都包括的部分 role 里面的信息 -- mapping 信息在 gamelog-role
  29 +local commonRoleField = {
  30 + name = "keyword",
  31 + id = "integer",
  32 + uid = "keyword",
  33 + sid = "short",
  34 + device = "keyword",
  35 + ctime = "integer",
  36 + ltime = "integer",
  37 + level = "short",
  38 + rmbC = "integer",
32 } 39 }
33 40
34 -local RoleLog = {}  
35 41
36 -function RoleLog.bind(Role) 42 +local function checkType(logType, field, value, ctype)
  43 + local typecheckfunc = {
  44 + keyword = function()
  45 + --长度不超过256
  46 + if type(value) ~= "string" then
  47 + value = tostring(value)
  48 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [keyword]", logType, field))
  49 + else
  50 + if #value > 256 then
  51 + print(string.format("LOG ERROR: logType [%s] field [%s] [keyword] type to long.", logType, field))
  52 + end
  53 + end
  54 + return value
  55 + end,
  56 + text = function()
  57 + if type(value) ~= "string" then
  58 + value = tostring(value)
  59 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [text]", logType, field))
  60 + end
  61 + return value
  62 + end,
  63 + integer = function()
  64 + if type(value) ~= "number" then
  65 + value = tonumber(value)
  66 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [integer]", logType, field))
  67 + end
  68 + if value then
  69 + if math.type(value) ~= "integer" then
  70 + local oldValue = value
  71 + value = math.floor(value)
  72 + if value ~= oldValue then
  73 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [integer], is float", logType, field))
  74 + end
  75 + end
  76 + if -2147483648 > value or value > 2147483647 then
  77 + value = nil
  78 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [integer], too big", logType, field))
  79 + end
  80 + end
  81 + return value
  82 + end,
  83 + short = function()
  84 + if type(value) ~= "number" then
  85 + value = tonumber(value)
  86 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [short]", logType, field))
  87 + end
  88 + if value then
  89 + if math.type(value) ~= "integer" then
  90 + local oldValue = value
  91 + value = math.floor(value)
  92 + if value ~= oldValue then
  93 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [short], is float", logType, field))
  94 + end
  95 + end
  96 +
  97 + if -32768 > value or value > 32768 then
  98 + value = nil
  99 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [short], too big", logType, field))
  100 + end
  101 + end
  102 + return value
  103 + end,
  104 + long = function()
  105 + if type(value) ~= "number" then
  106 + value = tonumber(value)
  107 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [long]", logType, field))
  108 + end
  109 + if value then
  110 + if math.type(value) ~= "integer" then
  111 + local oldValue = value
  112 + value = math.floor(value)
  113 + if type(value) ~= "integer" then
  114 + value = nil
  115 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [long], too big", logType, field))
  116 + elseif value ~= oldValue then
  117 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [long], is float", logType, field))
  118 + end
  119 + end
  120 + end
  121 + return value
  122 + end,
  123 + float = function()
  124 + if type(value) ~= "number" then
  125 + value = tonumber(value)
  126 + print(string.format("LOG ERROR: logType [%s] field [%s] isn't [float]", logType, field))
  127 + end
  128 + return value
  129 + end,
  130 + }
  131 +
  132 + if typecheckfunc[ctype] then
  133 + return typecheckfunc[ctype]()
  134 + else
  135 + print(string.format("LOG ERROR: logType [%s] field [%s] have a new type [%s] need add check.", logType, field, ctype))
  136 + return nil
  137 + end
  138 +end
37 139
  140 +local RoleLog = {}
  141 +function RoleLog.bind(Role)
38 function Role:log(logType, contents) 142 function Role:log(logType, contents)
39 -  
40 - local _logType = LogType[logType]  
41 - if not _logType then return end  
42 -  
43 -  
44 - if not logd then return end  
45 -  
46 contents = contents or {} 143 contents = contents or {}
  144 + local _logType = LogType[logType]
  145 + if not _logType then
  146 + print(string.format("LOG ERROR: new logType [%s] need Add Maping.", logType))
  147 + return
  148 + end
47 local doc = {} 149 local doc = {}
48 - for field, _typ in pairs(commonField) do  
49 - doc[field] = self:getProperty(field) 150 + for field, ctype in pairs(commonRoleField) do
  151 + if contents[field] then
  152 + print(string.format("LOG ERROR: logType [%s] had field [%s] overwrite default.", logType, field))
  153 + end
  154 + doc[field] = checkType("commonRoleField", field, self:getProperty(field), ctype)
50 end 155 end
  156 +
  157 + local mapping = Mapping[_logType]
  158 +
51 for field, value in pairs(contents) do 159 for field, value in pairs(contents) do
52 - if _logType[field] then  
53 - doc[field] = value 160 + local ftype = mapping[field]
  161 + if ftype then
  162 + doc[field] = checkType(logType, field, value, ftype)
54 else 163 else
55 - print(string.format("LOG ERROR: %s had new field %s no type.", logType, field)) 164 + print(string.format("LOG ERROR: logType [%s] have new field [%s] no type in mapping.", logType, field))
56 end 165 end
57 end 166 end
58 -  
59 - doc.mid = doc.uid:sub(-2, -1)  
60 - pcall(skynet.send, logd, "lua", "log", logType, doc) 167 + if not logd then return end
  168 + pcall(skynet.send, logd, "lua", "log", logType, doc, _logType)
61 end 169 end
62 -  
63 end 170 end
64 -  
65 return RoleLog 171 return RoleLog
66 \ No newline at end of file 172 \ No newline at end of file
src/services/logd.lua
1 local skynet = require "skynet" 1 local skynet = require "skynet"
2 -local mongo = require "mongo"  
3 local queue = require "skynet.queue" 2 local queue = require "skynet.queue"
4 local bson = require "bson" 3 local bson = require "bson"
5 -local socketdriver = require "socketdriver" 4 +local socketdriver = require "skynet.socketdriver"
6 5
7 local serverId = tonumber(skynet.getenv("servId")) 6 local serverId = tonumber(skynet.getenv("servId"))
8 7
@@ -14,105 +13,102 @@ local pairs = pairs @@ -14,105 +13,102 @@ local pairs = pairs
14 local ipairs = ipairs 13 local ipairs = ipairs
15 local string_format = string.format 14 local string_format = string.format
16 15
17 -local CMD, cache, client, cs = {}, {}  
18 -local auto = {}  
19 -  
20 -local rsyslog_fd  
21 -  
22 --- function getNextSequence(collect)  
23 --- local index = auto[collect]  
24 --- if not index then  
25 --- local res = client.counters:findAndModify({  
26 --- query = {},  
27 --- update = {["$inc"] = {[collect] = 1}},  
28 --- upsert = true,  
29 --- new = true,  
30 --- })  
31 --- index = res.value[collect]  
32 --- else  
33 --- index = index + 1  
34 --- end  
35 --- auto[collect] = index  
36 --- return index  
37 --- end  
38 -  
39 -local dateTypes = {"timestamp", "createTime", "lastLoginTime"}  
40 -local stringTypes = {"s1", "s2", "s3"}  
41 -local intTypes = {"int1", "int2", "int3", "int4"}  
42 -function CMD.log(collect, doc)  
43 - -- write to rsyslog  
44 - local now = skynet.timex()  
45 - doc["timestamp"] = now  
46 - doc["server"] = serverId 16 +local CMD, cs = {}
  17 +local log_fd, connecting = nil , false
  18 +
  19 +
  20 +local socket_message = {}
  21 +-- read skynet_socket.h for these macro
  22 +-- SKYNET_SOCKET_TYPE_DATA = 1
  23 +socket_message[1] = function(id, size, data)
  24 + skynet.error(string.format("LOG SOCKET: data: ", skynet.tostring(data, size)))
  25 + socketdriver.drop(data, size)
  26 +end
  27 +
  28 +-- SKYNET_SOCKET_TYPE_CONNECT = 2
  29 +socket_message[2] = function(id, _ , addr)
  30 + skynet.error("LOG SOCKET: connect: ", addr)
  31 + connecting = false
  32 +end
  33 +
  34 +-- SKYNET_SOCKET_TYPE_CLOSE = 3
  35 +socket_message[3] = function(id)
  36 + skynet.error("LOG SOCKET: closed")
  37 + connecting = false
  38 +end
  39 +
  40 +-- SKYNET_SOCKET_TYPE_ERROR = 5
  41 +socket_message[5] = function(id, _, err)
  42 + skynet.error("LOG SOCKET: error: ", err)
  43 + connecting = false
  44 +end
47 45
48 - for _, field in pairs(stringTypes) do  
49 - if doc[field] then  
50 - doc[field] = tostring(doc[field]) 46 +
  47 +skynet.register_protocol {
  48 + name = "socket",
  49 + id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
  50 + unpack = socketdriver.unpack,
  51 + dispatch = function (_, _, t, ...)
  52 + if socket_message[t] then
  53 + socket_message[t](...)
51 end 54 end
52 end 55 end
53 - for _, field in pairs(intTypes) do  
54 - if doc[field] then  
55 - doc[field] = tonumber(doc[field]) 56 +}
  57 +
  58 +
  59 +
  60 +-- 日志 index 不包含 日期的 index_suffix
  61 +local IndexNoDate = {
  62 + online = true,
  63 +}
  64 +-- 不走 role log 的日志都要自行注意 mapping 设置【重要】
  65 +-- index_suffix index 后缀 默认为 common
  66 +function CMD.log(logType, doc, index_suffix)
  67 + index_suffix = index_suffix or "common"
  68 + if index_suffix == "common" then
  69 + doc["@type"] = logType
  70 + else
  71 + if logType ~= index_suffix then -- 定制后缀 不一定有type 不相等时才有type
  72 + doc["@type"] = logType
56 end 73 end
57 end 74 end
58 - for _, field in pairs(dateTypes) do  
59 - if doc[field] then  
60 - doc[field .. "_t"] = tonumber(os.date("%Y%m%d%H%M%S", doc[field])) 75 +
  76 + local now = skynet.timex()
  77 + doc["timestamp"] = now
  78 + doc["@timestamp"] = os.date("%Y-%m-%d %H:%M:%S", now)
  79 + doc["server"] = serverId
  80 +
  81 + -- 自己加好 index
  82 + if IndexNoDate[index_suffix] then
  83 + doc["@index"] = string.format("gamelog-%s", index_suffix)
  84 + else
  85 + doc["@index"] = string.format("gamelog-%s-%s", os.date("%Y%m%d", now), index_suffix)
  86 + end
  87 + if not socketdriver.send(log_fd, json.encode(doc) .. "\n") then
  88 + if not connecting then
  89 + CMD.open() -- 连一下试试
61 end 90 end
62 end 91 end
63 - -- Local-6 + Info  
64 - socketdriver.send(rsyslog_fd, string.format("<182>%s: %s\n", collect, json.encode(doc)))  
65 -  
66 - -- if not cache[collect] then cache[collect] = {} end  
67 - -- doc["timestamp"] = now  
68 - -- doc["_id"] = getNextSequence(collect)  
69 - -- table_insert(cache[collect], doc)  
70 end 92 end
71 93
72 -function CMD.open(conf)  
73 - rsyslog_fd = socketdriver.connect("127.0.0.1", 514)  
74 - socketdriver.start(rsyslog_fd)  
75 -  
76 - -- local db = mongo.client {  
77 - -- host = conf.mongohost,  
78 - -- port = conf.mongoport or 27017,  
79 - -- username = conf.mongouser,  
80 - -- password = conf.mongopswd,  
81 - -- }  
82 -  
83 - -- assert(db, "mongo connect error")  
84 -  
85 - -- local servId = skynet.getenv "servId"  
86 - -- client = db["s"..servId] 94 +function CMD.open()
  95 + log_fd = socketdriver.connect("127.0.0.1", 5170)
  96 + connecting = true
87 end 97 end
88 98
89 --- local function __loop__()  
90 --- while true do  
91 --- cs(function ()  
92 --- for col, docs in pairs(cache) do  
93 --- client[col]:batch_insert(docs)  
94 --- client.counters:update({}, {["$set"]={[col]=auto[col]}})  
95 --- end  
96 --- cache = {}  
97 --- end)  
98 --- skynet.sleep(200)  
99 --- end  
100 --- end  
101 -  
102 local function __init__() 99 local function __init__()
103 - skynet.dispatch("lua", function (_, _, command, ...) 100 + skynet.dispatch("lua", function (session, address, command, ...)
104 local f = CMD[command] 101 local f = CMD[command]
105 if command == "open" then 102 if command == "open" then
106 skynet.ret(skynet.pack(f(...))) 103 skynet.ret(skynet.pack(f(...)))
107 else 104 else
108 - local collect, doc = ...  
109 - cs(function() f(collect, doc) end) 105 + local logType, doc, index_suffix = ...
  106 + cs(function() f(logType, doc, index_suffix) end)
110 end 107 end
111 end) 108 end)
112 cs = queue() 109 cs = queue()
113 110
114 - -- skynet.fork(__loop__)  
115 - skynet.register "LOGD" 111 + skynet.register(".LOGD")
116 end 112 end
117 113
118 skynet.start(__init__) 114 skynet.start(__init__)