logd.lua
3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
local skynet = require "skynet"
local queue = require "skynet.queue"
local bson = require "bson"
local socketdriver = require "skynet.socketdriver"
local serverId = tonumber(skynet.getenv("servId"))
require "shared.init"
require "skynet.manager"
-- First argument passed to this chunk; used below as this logd instance's
-- index (part of the log-id prefix and of the registered service name).
local logdIdx = ...
-- Local aliases for library functions.
local table_insert = table.insert
local pairs = pairs
local ipairs = ipairs
local string_format = string.format
-- Rolling counter embedded in every generated log_id.
local logId = 0
-- CMD: table of command handlers looked up by name in the "lua" dispatcher.
-- cs: starts as nil here and is assigned a skynet queue inside __init__.
local CMD, cs = {}
-- Prefix of generated log ids: "wasteland<logdIdx>S<serverId>C".
local prefix = "wasteland" .. logdIdx .. "S" .. serverId .. "C"
-- Log sinks keyed by target name. Each entry carries the remote endpoint
-- (host/port) and, optionally, a `handle(doc)` hook that decorates a document
-- in place before it is sent. Connection state (fd / connecting / connected)
-- is stored on these same tables at runtime by CMD.open and the socket
-- message handlers.
local logHandle = {
    bi = { host = "172.16.83.194", port = 14001 },
    log = {
        host = "172.16.83.194",
        port = 14002,
        -- Inject the common bookkeeping fields into `doc`.
        handle = function(doc)
            local now = skynet.timex()
            doc.time = now
            doc.timestamp = now
            -- NOTE(review): the 8-hour shift presumably converts a UTC+8
            -- local clock to UTC before formatting — confirm.
            doc["@timestamp"] = os.date("%Y-%m-%dT%H:%M:%S", now - 8 * 3600)
            doc.server = serverId
            doc.game_name = "wasteland"
            doc.env = "cb"
            doc.game_name_type = "guaji"
            -- Unique-ish id: prefix + rolling counter + "T" + timestamp.
            doc.log_id = prefix .. logId .. "T" .. now
            logId = (logId + 1) % 10000000
        end,
    },
}
-- Maps a socket fd to the name of the logHandle entry that owns it.
local connect_relation = {}

--- Look up the sink table that owns socket `fd`.
-- @param fd socket id as reported by socketdriver
-- @return the matching logHandle entry, or nil when `fd` is not tracked
local function getConInfo(fd)
    -- Untracked fds fall back to the "" key, which logHandle does not define.
    local sinkName = connect_relation[fd] or ""
    return logHandle[sinkName]
end
-- Handlers for raw socket messages, indexed by the numeric message type.
local socket_message = {}
-- The numeric types below mirror the macros in skynet_socket.h.
-- SKYNET_SOCKET_TYPE_DATA = 1
-- Incoming data is only logged and then released; this service never
-- consumes anything sent back by the remote end.
socket_message[1] = function(id, size, data)
    -- The format string needs a %s placeholder, otherwise the payload is
    -- silently omitted from the log line.
    skynet.error(string.format("LOG SOCKET: data: %s", skynet.tostring(data, size)))
    socketdriver.drop(data, size)
end
-- SKYNET_SOCKET_TYPE_CONNECT = 2
-- Marks the sink that owns socket `id` as connected.
socket_message[2] = function(id, _, addr)
    skynet.error("LOG SOCKET: connect: ", addr)
    local cur = getConInfo(id)
    -- Ignore fds that are not tracked (same guard as the close/error
    -- handlers) instead of raising on a nil index.
    if not cur then return end
    cur.connected = true
    cur.connecting = false
end
-- SKYNET_SOCKET_TYPE_CLOSE = 3
-- Resets the owning sink's connection flags and forgets the fd mapping.
socket_message[3] = function(id)
    skynet.error("LOG SOCKET: closed")
    local sink = getConInfo(id)
    if sink then
        sink.connected = false
        sink.connecting = false
        connect_relation[id] = nil
    end
end
-- SKYNET_SOCKET_TYPE_ERROR = 5
-- Logs the error, then resets the owning sink's connection flags and
-- forgets the fd mapping.
socket_message[5] = function(id, _, err)
    skynet.error("LOG SOCKET: error: ", err)
    local sink = getConInfo(id)
    if sink then
        sink.connected = false
        sink.connecting = false
        connect_relation[id] = nil
    end
end
-- Register the "socket" protocol so raw socket messages reach this service.
-- Each message is unpacked by socketdriver and routed to the socket_message
-- handler for its type; types without a handler are ignored.
skynet.register_protocol {
    name = "socket",
    id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
    unpack = socketdriver.unpack,
    dispatch = function(_, _, msgType, ...)
        local handler = socket_message[msgType]
        if handler then
            handler(...)
        end
    end,
}
--- Decorate one log document and send it to the named sink.
-- @param doc   table to send; mutated in place by the sink's `handle` hook
--              when the sink defines one
-- @param logTo key into logHandle; defaults to "log" when nil/false
-- The document is sent as one JSON line. When the sink is not connected the
-- document is dropped and a (re)connect is started if none is in flight.
function CMD.log(doc, logTo)
    logTo = logTo or "log"
    local cur = logHandle[logTo]
    if not cur then
        print("error log to ", logTo)
        -- Unknown sink: there is nothing to send to, and falling through
        -- would raise when indexing the nil `cur`.
        return
    end
    if cur.handle then
        cur.handle(doc)
    end
    if cur.connected and cur.fd then
        -- NOTE(review): `json` is not defined in this file; presumably a
        -- global provided by "shared.init" — confirm.
        socketdriver.send(cur.fd, json.encode(doc) .. "\n")
    elseif not cur.connecting then
        -- Logs emitted while disconnected are lost; kick off a connect
        -- attempt so later ones can go through.
        CMD.open()
    end
end
--- Start a connection for every sink that is neither connected nor already
-- connecting, and remember which sink each new fd belongs to.
function CMD.open()
    for sinkName, sink in pairs(logHandle) do
        local busy = sink.connecting or sink.connected
        if not busy then
            sink.connecting = true
            local fd = socketdriver.connect(sink.host, sink.port)
            sink.fd = fd
            connect_relation[fd] = sinkName
        end
    end
end
--- Service entry point: installs the "lua" message dispatcher, creates the
-- queue used for log commands and registers the service as ".logd<logdIdx>".
local function __init__()
    skynet.dispatch("lua", function (session, address, command, ...)
        local f = CMD[command]
        if not f then
            -- Fail with a message that names the bad command instead of an
            -- opaque "attempt to call a nil value"; still raised as an error
            -- so the failure surfaces the same way as before.
            error(string_format("logd: unknown command '%s'", tostring(command)))
        end
        if command == "open" then
            -- "open" replies to the caller with whatever CMD.open returns.
            skynet.ret(skynet.pack(f(...)))
        else
            -- Every other command runs through the queue and sends no reply.
            local doc, logTo = ...
            cs(function() f(doc, logTo) end)
        end
    end)
    cs = queue()
    skynet.register(".logd" .. logdIdx)
end
skynet.start(__init__)