dbseed.lua 9.81 KB
require "shared.init"
require "utils.init"
require "GlobalVar"
require "RedisKeys"
require "ProtocolCode"
require "skynet.manager"
require "utils.MysqlUtil"

skynet = require "skynet"

redisproxy = require("shared.redisproxy")
mysqlproxy = require "shared.mysqlproxy"

SendPacket = function ( ... ) end

local function initSeaportTask()
	local keys = {[SEAPORT_TRADE_TASK_1] = "seaport_task_1", [SEAPORT_TRADE_TASK_2] = "seaport_task_2"}

	for key, tb_name in pairs(keys) do
		local res = mysqlproxy:query(string.format("SELECT `id`,`value` FROM %s;", tb_name))
		for _, v in pairs(res) do
			redisproxy:hset(key, v.id, v.value)
		end
	end
end

local function initRedisDb( ... )
	local function initAutoIncrementUid(tbName, keyName, fieldName)
		if not fieldName then fieldName = "value" end
		local mysqlVal = getDbCfgVal(tbName, keyName, fieldName)
		if not mysqlVal then 
			skynet.error(string.format("get db cfg fail, table %s, key %s, field %s", tbName, keyName, fieldName))
			return
		end
		local redisVal = tonum(redisproxy:hget(tbName, keyName))
		if redisVal < mysqlVal or redisVal == 0 then
			redisproxy:hset(tbName, keyName, mysqlVal)
		end
	end

	initAutoIncrementUid("autoincrement_set", "role")
	initAutoIncrementUid("autoincrement_set", "union")
	initAutoIncrementUid("autoincrement_set", "order")
	initAutoIncrementUid("autoincrement_set", "email")
	initAutoIncrementUid("autoincrement_set", "emailTimestamp")
	initAutoIncrementUid("autoincrement_set", "delay_email")
	initAutoIncrementUid("autoincrement_set", "stopcreate")
	initAutoIncrementUid("autoincrement_set", "maintain")
	initAutoIncrementUid("autoincrement_set", "seaportTime0")
	initAutoIncrementUid("autoincrement_set", "seaportTime1")
	initAutoIncrementUid("autoincrement_set", "seaportTime2")

	redisproxy:hsetnx("adv_season", "idx", 0)
	redisproxy:hsetnx("adv_season", "chapter", globalCsv.adv_endless_default_chapter)
	redisproxy:hsetnx("adv_season", "overTime", 0)
end

-- 初始化服务器数据库以及服务器信息表
local function initServerDatabase()
	local servId = skynet.getenv("servId")
	mysqlproxy:query(string.format("CREATE DATABASE IF NOT EXISTS server_%s DEFAULT CHARSET = utf8mb4 COLLATE utf8mb4_general_ci;", servId))
	mysqlproxy:query(string.format("use server_%s", servId))

	-- 服务器信息表 开服时间
	mysqlproxy:query [[
		CREATE TABLE IF NOT EXISTS `server_info` (
			`key` varchar(45) NOT NULL,
			`int_value` int(11) DEFAULT NULL,
			`str_value` varchar(128) DEFAULT NULL,
			PRIMARY KEY (`key`)
			) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
	]]

	local res = mysqlproxy:query("SELECT * FROM `server_info` where `key` = 'server_start';")
	if not next(res) then
		mysqlproxy:query(string.format("INSERT INTO `server_info`(`key`, `str_value`) VALUES('server_start', '%s');",
			os.date("%Y%m%d", skynet.timex())))
	end
end

local function initAutoIncreUidTable()
	mysqlproxy:query [[
		CREATE TABLE IF NOT EXISTS `autoincrement_set` (
			`key` varchar(45) NOT NULL,
			`value` int(11) DEFAULT NULL,
			PRIMARY KEY (`key`)
			) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
	]]
	local servId = tonumber(skynet.getenv("servId"))
	if servId then
		local tpl = "INSERT INTO `autoincrement_set`(`key`, `value`) values('%s', %d)"
		mysqlproxy:query(string.format(tpl, "role", servId * MAX_ROLE_NUM))
		mysqlproxy:query(string.format(tpl, "union", servId * MAX_ROLE_NUM))
		mysqlproxy:query(string.format(tpl, "order", 0))
		mysqlproxy:query(string.format(tpl, "email", 0))
		mysqlproxy:query(string.format(tpl, "emailTimestamp", 0))
		mysqlproxy:query(string.format(tpl, "delay_email", 0))
		mysqlproxy:query(string.format(tpl, "stopcreate", 0))
		mysqlproxy:query(string.format(tpl, "maintain", 0))
		mysqlproxy:query(string.format(tpl, "seaportTime0", 0))
		mysqlproxy:query(string.format(tpl, "seaportTime1", 0))
		mysqlproxy:query(string.format(tpl, "seaportTime2", 0))

        -- 记录当前更新版本
		mysqlproxy:query(string.format(tpl, "db_ver", 0))
	end
end

local function initSeaportTable()
	-- 海港贸易任务
	mysqlproxy:query [[
		CREATE TABLE IF NOT EXISTS `seaport_task_1` (
			`id` int NOT NULL,
			`value` int(11) DEFAULT 0,
			PRIMARY KEY (`id`)
			) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
	]]
	mysqlproxy:query [[
		CREATE TABLE IF NOT EXISTS `seaport_task_2` (
			`id` int NOT NULL,
			`value` int(11) DEFAULT 0,
			PRIMARY KEY (`id`)
			) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
	]]
end

local function initAdvSeasonTable()
	--mysqlproxy:query [[
	--	CREATE TABLE IF NOT EXISTS `adv_season` (
	--		`key` varchar(45) NOT NULL,
	--		`value` int(11) DEFAULT NULL,
	--		PRIMARY KEY (`key`)
	--		) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
	--]]
	--local servId = tonumber(skynet.getenv("servId"))
	--if servId then
	--	local tpl = "INSERT INTO `adv_season`(`key`, `value`) values('%s', %d)"

	--	mysqlproxy:query(string.format(tpl, "idx", 0))
	--	mysqlproxy:query(string.format(tpl, "chapter", globalCsv.adv_endless_default_chapter))
	--	mysqlproxy:query(string.format(tpl, "overTime", 0))
	--end
end

local function checkRoleTables()
	local list = {"Role", "Daily", "Activity", "Diner", "Store", "Hero", "Rune", "Order", "Email", "Friend", "Spark"}
	for _, name in ipairs(list) do
		local obj = require("models."..name).new({key = "key"})
		print("check table [" .. name .. "] begin.")
		obj:checkTableSchema()
		print("check table [" .. name .. "] end.")
	end
end

local function createMysqlSp()
	mysqlproxy:query "DROP PROCEDURE IF EXISTS `add_friends`"
	mysqlproxy:query [[
		CREATE PROCEDURE `add_friends`(IN role_id bigint, IN friend_id bigint, IN add_time int)
		BEGIN
			DECLARE t_error INTEGER DEFAULT 0;    
			DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET t_error=1;    
    
			START TRANSACTION;    
				INSERT INTO `Friend`(`roleid`,`fid`,`addTime`) VALUES(role_id, friend_id, add_time);       
				INSERT INTO `Friend`(`roleid`,`fid`,`addTime`) VALUES(friend_id, role_id, add_time);       
    
			IF t_error = 1 THEN    
				ROLLBACK;    
			ELSE    
				COMMIT;    
			END IF;    
			select t_error; 
		END
	]]

	mysqlproxy:query "DROP PROCEDURE IF EXISTS `del_friends`"
	mysqlproxy:query [[
		CREATE PROCEDURE `del_friends`(IN role_id bigint, IN friend_id bigint)
		BEGIN
			DECLARE t_error INTEGER DEFAULT 0;    
			DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET t_error=1;    
    
			START TRANSACTION;    
				DELETE FROM `Friend` WHERE `roleid` = role_id AND `fid` = friend_id;       
				DELETE FROM `Friend` WHERE `roleid` = friend_id AND `fid` = role_id;       
    
			IF t_error = 1 THEN    
				ROLLBACK;    
			ELSE    
				COMMIT;    
			END IF;    
			select t_error; 
		END
	]]
end


local steps = {
	[1] = {
		handler = initServerDatabase,
		desc = "initialize server database "
	},
	[2] = {
		handler = initAutoIncreUidTable,
		desc = "initialize auto_increment_uid table "
	},
	[3] = {
		handler = initAdvSeasonTable,
		desc = "initialize adv_season table "
	},
	[4] = {
		handler = checkRoleTables,
		desc = "check role tables "
	},
	[5] = {
		handler = createMysqlSp,
		desc = "create mysql store procedure "
	},
	[6] = {
		handler = initSeaportTable,
		desc = "initialize seaport table "
	},
}

local function loadAllUserInfo()
	local maxId = 0
	local sql = "SELECT `id`, `uid`, `name` FROM `Role` WHERE `id` > %d ORDER BY `id` LIMIT 1000;"
	while true do
		local res = mysqlproxy:query(string.format(sql, maxId))
		if not next(res) then
			return
		else
			for _, info in ipairs(res) do
				if info["id"] > maxId then
					maxId = info["id"]
				end
				redisproxy:pipelining(function (red)
					local dbName = string.upper(info["name"])
					red:set(string.format("user:%s", dbName), info["id"])
					red:set(string.format("uid:%s", info["uid"]), dbName)
				end)
			end
		end
	end
end

local function selectDb()
	local work_count = tonumber(skynet.getenv("thread"))
	for i = 1, work_count do
		local mysqld = skynet.localname(".mysql" .. i)

		local servId = skynet.getenv("servId")
		local ok, result = pcall(skynet.call, mysqld, "lua", "query", string.format("use server_%s", servId))
		if not ok then
			skynet.error("select db error", "\n", debug.traceback(coroutine.running(), nil))
			return
		end
	end
end

local function doUpdateSql()
    local updates = {
        [1] = {
            "ALTER TABLE `Email` ADD INDEX roleid_createtime_id(`roleId`,`createtime`,`id`),DROP INDEX roleId_Index",
            "ALTER TABLE `Role` ADD INDEX uid_Index(`uid`)",
        },
        [2] = {
            "ALTER TABLE `Rune` MODIFY COLUMN `refer` bigint;"
        },
        [3] = {
            "ALTER TABLE `Email` MODIFY COLUMN `content` VARCHAR(2048);"
        },
    }
	-- 更新记录表
    local res = mysqlproxy:query("SELECT `value` FROM `autoincrement_set` WHERE `key` = 'db_ver';")
    local ver = res[1].value
    for k, sqls in ipairs(updates) do
        if ver < k then
            skynet.error("do update sql, version:"..k)
            for _, sql in ipairs(sqls) do
                local r = mysqlproxy:query(sql)
                if r["errno"] then
                    skynet.error(string.format("%s, err:%s",sql, r["err"]))
                    return
                end
            end
            mysqlproxy:query(string.format("UPDATE `autoincrement_set` SET `value` = %d WHERE `key` = 'db_ver';", k))
        end
    end
end

skynet.start(function ()
	--local new = redisproxy:hsetnx("autoincrement_set", "server_start", os.date("%Y%m%d", skynet.timex())) == 1
	--if not new then
	--	print("server has been initialized...")
	--	skynet.exit()
	--	return
	--end
	csvdb = require "shared.csvdata"
	globalCsv = csvdb["GlobalDefineCsv"]

	for _, action in ipairs(steps) do
		print(action.desc .. "start ...")
		action.handler()
		print(action.desc .. "finished ...")
	end
	initRedisDb()
	initSeaportTask()		-- 海港任务数据初始化
	loadAllUserInfo()
    doUpdateSql()       -- 执行更新sql
	selectDb()
	skynet.exit()
end)