package net import ( "fmt" "github.com/golang/protobuf/proto" "net" "plugin" "pro2d/conf" "pro2d/protos/pb" "pro2d/src/common" "pro2d/src/components/db" "pro2d/src/components/etcd" "pro2d/src/components/logger" "pro2d/src/components/timewheel" "pro2d/src/models" "sync" "time" ) type ActionHandler func (msg *MsgPkg) (int32, proto.Message) var ActionMap map[pb.ProtoCode]ActionHandler var TimeWheel *timewheel.TimeWheel type Server struct { SConf *conf.SConf Clients *sync.Map EtcdClient *etcd.EtcdClient TaskQueue []chan func() } func NewServer(sConf *conf.SConf) *Server { return &Server{ SConf: sConf, Clients: new(sync.Map), EtcdClient: new(etcd.EtcdClient), TaskQueue: make([]chan func(), common.WorkerPoolSize), } } //StartWorkerPool 启动worker工作池 func (s *Server) StartWorkerPool() { //遍历需要启动worker的数量,依此启动 for i := 0; i < s.SConf.WorkerPoolSize; i++ { //一个worker被启动 //给当前worker对应的任务队列开辟空间 s.TaskQueue[i] = make(chan func(), common.MaxTaskPerWorker) //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 go s.StartOneWorker(i, s.TaskQueue[i]) } } //StartOneWorker 启动一个Worker工作流程 func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) { //不断的等待队列中的消息 for { select { //有消息则取出队列的Request,并执行绑定的业务方法 case f:= <-taskQueue: _ = workerID f() //s.DoMsgHandler(request) } } } func (s *Server) DoMsgHandler(msg *MsgPkg) { if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok { logger.Debug("protocode handler: %d", msg.Head.Cmd) errCode, protomsg := md(msg) rsp, err := proto.Marshal(protomsg) fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg) if err != nil { msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil) return } msg.Conn.SendMsgByCode(errCode, msg.Head.Cmd, rsp) return } logger.Error("protocode not handler: %d", msg.Head.Cmd) } func (s *Server) OnClose(conn *Connection) { //conn.Stop() s.Clients.Delete(conn.Id) } func (s *Server) LoadPlugin() { //重新加载 _, err:=plugin.Open(conf.GlobalConf.GameConf.PluginPath) if err != nil { logger.Error("load plugin err: %v, %s", err, conf.GlobalConf.GameConf.PluginPath) return } logger.Debug("load plugin success") } func (s *Server) handleTimeOut() { s.Clients.Range(func(key, value interface{}) bool { client := value.(*Connection) client.update() return true }) timewheel.TimeOut(1*time.Second, s.handleTimeOut) } func (s *Server)Start() error { //mongo 初始化 db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName) models.InitGameServerModels() //Etcd 初始化 var err error s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd) if err != nil { return err } s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5) //初始化plugin //_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath) //if err != nil { // return err //} port := fmt.Sprintf(":%d", s.SConf.Port) l, err := net.Listen("tcp", port) if err != nil { return err } //启动协程池 s.StartWorkerPool() //启动定时器 s.handleTimeOut() //监听端口 logger.Debug("listen on %s\n", port) id := 0 for { conn, err := l.Accept() if err != nil { return err } id++ client := NewConn(id, conn, s) s.Clients.Store(id, client) go client.Start() } } func (s *Server)Stop() { TimeWheel.Stop() }