server.go 3.58 KB
package net

import (
	"fmt"
	"github.com/golang/protobuf/proto"
	"net"
	"plugin"
	"pro2d/conf"
	"pro2d/protos/pb"
	"pro2d/src/common"
	"pro2d/src/components/db"
	"pro2d/src/components/etcd"
	"pro2d/src/components/logger"
	"pro2d/src/components/timewheel"
	"pro2d/src/models"
	"sync"
	"time"
)

type ActionHandler func (msg *MsgPkg)  (int32, proto.Message)
var ActionMap map[pb.ProtoCode]ActionHandler
var TimeWheel *timewheel.TimeWheel

type Server struct {
	SConf      *conf.SConf
	Clients *sync.Map
	EtcdClient *etcd.EtcdClient

	TaskQueue []chan func()
}

func NewServer(sConf *conf.SConf) *Server {
	return &Server{
		SConf:      sConf,
		Clients:    new(sync.Map),
		EtcdClient: new(etcd.EtcdClient),

		TaskQueue: make([]chan func(), common.WorkerPoolSize),
	}
}

//StartWorkerPool 启动worker工作池
func (s *Server) StartWorkerPool() {
	//遍历需要启动worker的数量,依此启动
	for i := 0; i < s.SConf.WorkerPoolSize; i++ {
		//一个worker被启动
		//给当前worker对应的任务队列开辟空间
		s.TaskQueue[i] = make(chan func(),  common.MaxTaskPerWorker)
		//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
		go s.StartOneWorker(i, s.TaskQueue[i])
	}
}

//StartOneWorker 启动一个Worker工作流程
func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) {
	//不断的等待队列中的消息
	for {
		select {
		//有消息则取出队列的Request,并执行绑定的业务方法
		case f:= <-taskQueue:
			_ = workerID
			f()
			//s.DoMsgHandler(request)
		}
	}
}

func (s *Server) DoMsgHandler(msg *MsgPkg) {
	if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok {
		logger.Debug("protocode handler: %d", msg.Head.Cmd)
		errCode, protomsg := md(msg)
		rsp, err := proto.Marshal(protomsg)
		fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
		if err != nil {
			msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil)
			return
		}
		msg.Conn.SendMsgByCode(errCode, msg.Head.Cmd, rsp)
		return
	}
	logger.Error("protocode not handler: %d", msg.Head.Cmd)
}

func (s *Server) OnClose(conn *Connection) {
	//conn.Stop()
	s.Clients.Delete(conn.Id)
}

func (s *Server) LoadPlugin() {
	//重新加载
	_, err:=plugin.Open(conf.GlobalConf.GameConf.PluginPath)
	if err != nil {
		logger.Error("load plugin err: %v, %s", err, conf.GlobalConf.GameConf.PluginPath)
		return
	}
	logger.Debug("load plugin success")
}

func (s *Server) handleTimeOut() {
	s.Clients.Range(func(key, value interface{}) bool {
		client := value.(*Connection)
		client.update()
		return true
	})

	timewheel.TimeOut(1*time.Second, s.handleTimeOut)
}

func (s *Server)Start() error {
	//mongo 初始化
	db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName)
	models.InitGameServerModels()

	//Etcd 初始化
	var err error
	s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
	if err != nil {
		return err
	}
	s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5)

	//初始化plugin
	//_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)
	//if err != nil {
	//	return err
	//}

	port := fmt.Sprintf(":%d", s.SConf.Port)
	l, err := net.Listen("tcp", port)
	if err != nil {
		return err
	}

	//启动协程池
	s.StartWorkerPool()

	//启动定时器
	s.handleTimeOut()

	//监听端口
	logger.Debug("listen on %s\n", port)
	id := 0
	for {
		conn, err := l.Accept()
		if err != nil {
			return err
		}

		id++
		client := NewConn(id, conn, s)
		s.Clients.Store(id, client)
		go client.Start()
	}
}

func (s *Server)Stop()  {
	TimeWheel.Stop()
}