Blame view

src/services/logd.lua 3.12 KB
314bc5df   zhengshouren   提交服务器初始代码
1
  local skynet = require "skynet"
314bc5df   zhengshouren   提交服务器初始代码
2
3
  local queue = require "skynet.queue"
  local bson = require "bson"
df883a11   zhouhaihai   日志处理
4
  local socketdriver = require "skynet.socketdriver"
314bc5df   zhengshouren   提交服务器初始代码
5
6
7
8
9
  local serverId = tonumber(skynet.getenv("servId"))
  
  require "shared.init"
  require "skynet.manager"
  
3f604f2e   zhouhaihai   扩容 redis 和 log服务
10
  local logdIdx = ...
314bc5df   zhengshouren   提交服务器初始代码
11
12
13
14
15
  local table_insert = table.insert
  local pairs = pairs
  local ipairs = ipairs
  local string_format = string.format
  
f22a33af   zhouhaihai   自己的日志
16
  local logId = 0
df883a11   zhouhaihai   日志处理
17
  local CMD, cs = {}
3f604f2e   zhouhaihai   扩容 redis 和 log服务
18
  local prefix = "wasteland" .. logdIdx .. "S" .. serverId ..  "C" 
f22a33af   zhouhaihai   自己的日志
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  
  local logHandle = {
  	bi = {
  		host = "127.0.0.1",
  		port = 13001,
  	},
  	log = {
  		host = "127.0.0.1",
  		port = 13002,
  		handle = function(doc)
  			-- 注入字段
  			local now = skynet.timex()
  			doc["time"] = now
  			doc["timestamp"] = now
  			doc["server"] = serverId
  			doc["game_name"] = "wasteland"
  			doc["env"] = "cb"
  			doc["game_name_type"] = "guaji"
3f604f2e   zhouhaihai   扩容 redis 和 log服务
37
  			doc["log_id"] = prefix .. logId .. "T" ..  now
f22a33af   zhouhaihai   自己的日志
38
39
40
41
42
43
44
45
46
47
  			logId = (logId + 1) % 10000000
  		end
  	},
  }
  
  local connect_relation = {}
  local function getConInfo(fd)
  	return logHandle[connect_relation[fd] or ""]
  end
  
df883a11   zhouhaihai   日志处理
48
49
50
51
52
53
54
55
56
57
58
59
  
  
  local socket_message = {}
  -- read skynet_socket.h for these macro
  -- SKYNET_SOCKET_TYPE_DATA = 1
  socket_message[1] = function(id, size, data)
  	skynet.error(string.format("LOG SOCKET: data: ", skynet.tostring(data, size)))
  	socketdriver.drop(data, size)
  end
  
  -- SKYNET_SOCKET_TYPE_CONNECT = 2
  socket_message[2] = function(id, _ , addr)
f22a33af   zhouhaihai   自己的日志
60
  	local cur = getConInfo(id)
df883a11   zhouhaihai   日志处理
61
  	skynet.error("LOG SOCKET: connect: ", addr)
f22a33af   zhouhaihai   自己的日志
62
63
  	cur.connected = true
  	cur.connecting = false
df883a11   zhouhaihai   日志处理
64
65
66
67
68
  end
  
  -- SKYNET_SOCKET_TYPE_CLOSE = 3
  socket_message[3] = function(id)
  	skynet.error("LOG SOCKET: closed")
f22a33af   zhouhaihai   自己的日志
69
70
71
72
73
  	local cur = getConInfo(id)
  	if not cur then return end
  	cur.connected = false
  	cur.connecting = false
  	connect_relation[id] = nil
df883a11   zhouhaihai   日志处理
74
75
76
77
78
  end
  
  -- SKYNET_SOCKET_TYPE_ERROR = 5
  socket_message[5] = function(id, _, err)
  	skynet.error("LOG SOCKET: error: ", err)
f22a33af   zhouhaihai   自己的日志
79
80
81
82
83
  	local cur = getConInfo(id)
  	if not cur then return end
  	cur.connected = false
  	cur.connecting = false
  	connect_relation[id] = nil
df883a11   zhouhaihai   日志处理
84
  end
314bc5df   zhengshouren   提交服务器初始代码
85
  
df883a11   zhouhaihai   日志处理
86
87
88
89
90
91
92
93
  
  skynet.register_protocol {
  	name = "socket",
  	id = skynet.PTYPE_SOCKET,	-- PTYPE_SOCKET = 6
  	unpack = socketdriver.unpack,
  	dispatch = function (_, _, t, ...)
  		if socket_message[t] then
  			socket_message[t](...)
314bc5df   zhengshouren   提交服务器初始代码
94
95
  		end
  	end
df883a11   zhouhaihai   日志处理
96
97
  }
  
f22a33af   zhouhaihai   自己的日志
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
  function CMD.log(doc, logTo)
  	logTo = logTo or "log"
  	local cur = logHandle[logTo] 
  	if not cur then
  		print("error log to ", logTo)
  	end
  	if cur.handle then
  		cur.handle(doc)
  	end
  	if cur.connected and cur.fd then
  		socketdriver.send(cur.fd, json.encode(doc) .. "\n")
  	else
  		-- 断线会丢失一部分日志
  		if not cur.connecting then
  			CMD.open() -- 连一下
314bc5df   zhengshouren   提交服务器初始代码
113
114
  		end
  	end
314bc5df   zhengshouren   提交服务器初始代码
115
116
  end
  
df883a11   zhouhaihai   日志处理
117
  function CMD.open()
f22a33af   zhouhaihai   自己的日志
118
119
120
121
122
123
124
  	for logTo, data in pairs(logHandle) do
  		if not data.connecting  and not data.connected then
  			data.connecting = true
  			data.fd = socketdriver.connect(data.host, data.port)
  			connect_relation[data.fd] = logTo
  		end
  	end
314bc5df   zhengshouren   提交服务器初始代码
125
126
  end
  
314bc5df   zhengshouren   提交服务器初始代码
127
  local function __init__()
df883a11   zhouhaihai   日志处理
128
  	skynet.dispatch("lua", function (session, address, command, ...)
314bc5df   zhengshouren   提交服务器初始代码
129
130
131
132
  		local f = CMD[command]
  		if command == "open" then
  			skynet.ret(skynet.pack(f(...)))
  		else
f22a33af   zhouhaihai   自己的日志
133
134
  			local doc, logTo = ...
  			cs(function() f(doc, logTo) end)
314bc5df   zhengshouren   提交服务器初始代码
135
136
137
138
  		end
  	end)
  	cs = queue()
  
3f604f2e   zhouhaihai   扩容 redis 和 log服务
139
  	skynet.register(".logd" .. logdIdx)
314bc5df   zhengshouren   提交服务器初始代码
140
141
142
  end
  
  skynet.start(__init__)