logd.lua 3.19 KB
local skynet = require "skynet"
local queue = require "skynet.queue"
local bson = require "bson"
local socketdriver = require "skynet.socketdriver"
local serverId = tonumber(skynet.getenv("servId"))

require "shared.init"
require "skynet.manager"

local logdIdx = ...
local table_insert = table.insert
local pairs = pairs
local ipairs = ipairs
local string_format = string.format

local logId = 0
local CMD, cs = {}
local prefix = "wasteland" .. logdIdx .. "S" .. serverId ..  "C" 

local logHandle = {
	bi = {
		host = "172.16.83.194",
		port = 14001,
	},
	log = {
		host = "172.16.83.194",
		port = 14002,
		handle = function(doc)
			-- 注入字段
			local now = skynet.timex()
			doc["time"] = now
			doc["timestamp"] = now
			doc["@timestamp"] = os.date("%Y-%m-%dT%H:%M:%S", now - 8*3600)
			doc["server"] = serverId
			doc["game_name"] = "wasteland"
			doc["env"] = "cb"
			doc["game_name_type"] = "guaji"
			doc["log_id"] = prefix .. logId .. "T" ..  now
			logId = (logId + 1) % 10000000
		end
	},
}

local connect_relation = {}
local function getConInfo(fd)
	return logHandle[connect_relation[fd] or ""]
end



local socket_message = {}
-- read skynet_socket.h for these macro
-- SKYNET_SOCKET_TYPE_DATA = 1
socket_message[1] = function(id, size, data)
	skynet.error(string.format("LOG SOCKET: data: ", skynet.tostring(data, size)))
	socketdriver.drop(data, size)
end

-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)
	local cur = getConInfo(id)
	skynet.error("LOG SOCKET: connect: ", addr)
	cur.connected = true
	cur.connecting = false
end

-- SKYNET_SOCKET_TYPE_CLOSE = 3
socket_message[3] = function(id)
	skynet.error("LOG SOCKET: closed")
	local cur = getConInfo(id)
	if not cur then return end
	cur.connected = false
	cur.connecting = false
	connect_relation[id] = nil
end

-- SKYNET_SOCKET_TYPE_ERROR = 5
socket_message[5] = function(id, _, err)
	skynet.error("LOG SOCKET: error: ", err)
	local cur = getConInfo(id)
	if not cur then return end
	cur.connected = false
	cur.connecting = false
	connect_relation[id] = nil
end


skynet.register_protocol {
	name = "socket",
	id = skynet.PTYPE_SOCKET,	-- PTYPE_SOCKET = 6
	unpack = socketdriver.unpack,
	dispatch = function (_, _, t, ...)
		if socket_message[t] then
			socket_message[t](...)
		end
	end
}

function CMD.log(doc, logTo)
	logTo = logTo or "log"
	local cur = logHandle[logTo] 
	if not cur then
		print("error log to ", logTo)
	end
	if cur.handle then
		cur.handle(doc)
	end
	if cur.connected and cur.fd then
		socketdriver.send(cur.fd, json.encode(doc) .. "\n")
	else
		-- 断线会丢失一部分日志
		if not cur.connecting then
			CMD.open() -- 连一下
		end
	end
end

function CMD.open()
	for logTo, data in pairs(logHandle) do
		if not data.connecting  and not data.connected then
			data.connecting = true
			data.fd = socketdriver.connect(data.host, data.port)
			connect_relation[data.fd] = logTo
		end
	end
end

local function __init__()
	skynet.dispatch("lua", function (session, address, command, ...)
		local f = CMD[command]
		if command == "open" then
			skynet.ret(skynet.pack(f(...)))
		else
			local doc, logTo = ...
			cs(function() f(doc, logTo) end)
		end
	end)
	cs = queue()

	skynet.register(".logd" .. logdIdx)
end

skynet.start(__init__)