package net import ( "fmt" "github.com/golang/protobuf/proto" "net" "plugin" "pro2d/conf" "pro2d/protos/pb" "pro2d/src/common" "pro2d/src/components/db" "pro2d/src/components/etcd" "pro2d/src/components/logger" "pro2d/src/models" "sync" ) type ActionHandler func (msg *MsgPkg) (int32, proto.Message) var ActionMap map[pb.ProtoCode]ActionHandler type Server struct { SConf *conf.SConf Clients *sync.Map EtcdClient *etcd.EtcdClient TaskQueue []chan*MsgPkg } func NewServer(sConf *conf.SConf) *Server { return &Server{ SConf: sConf, Clients: new(sync.Map), EtcdClient: new(etcd.EtcdClient), TaskQueue: make([]chan *MsgPkg, common.WorkerPoolSize), } } //StartWorkerPool 启动worker工作池 func (s *Server) StartWorkerPool() { //遍历需要启动worker的数量,依此启动 for i := 0; i < s.SConf.WorkerPoolSize; i++ { //一个worker被启动 //给当前worker对应的任务队列开辟空间 s.TaskQueue[i] = make(chan *MsgPkg, common.MaxTaskPerWorker) //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 go s.StartOneWorker(i, s.TaskQueue[i]) } } //StartOneWorker 启动一个Worker工作流程 func (s *Server) StartOneWorker(workerID int, taskQueue chan *MsgPkg) { //不断的等待队列中的消息 for { select { //有消息则取出队列的Request,并执行绑定的业务方法 case request := <-taskQueue: _ = workerID s.DoMsgHandler(request) } } } func (s *Server) DoMsgHandler(msg *MsgPkg) { logger.Debug("DoMsgHandler cmd: %d, data: %s", msg.Head.Cmd, msg.Body) if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok { logger.Debug("adfadfadfasdfadfadsf") errCode, protomsg := md(msg) rsp, err := proto.Marshal(protomsg) if err != nil { msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil) return } msg.Conn.SendMsgByCode(errCode, msg.Head.Cmd, rsp) return } logger.Error("protocode not handler: %d", msg.Head.Cmd) } func (s *Server) OnClose(conn *Connection) { s.Clients.Delete(conn.Id) } func (s *Server) LoadPlugin() { //重新加载 _, err:=plugin.Open(conf.GlobalConf.GameConf.PluginPath) if err != nil { logger.Error("load plugin err: %v, %s", err, conf.GlobalConf.GameConf.PluginPath) return } logger.Debug("load plugin success") } func (s *Server)Start() error { //mongo 初始化 db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName) models.InitGameServerModels() //Etcd 初始化 var err error s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd) if err != nil { return err } s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5) //初始化plugin _, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath) if err != nil { return err } port := fmt.Sprintf(":%d", s.SConf.Port) l, err := net.Listen("tcp", port) if err != nil { return err } s.StartWorkerPool() logger.Debug("listen on %s\n", port) id := 0 for { conn, err := l.Accept() if err != nil { return err } id++ client := NewConn(id, conn, s) s.Clients.Store(id, client) go client.Start() } } func (s *Server)Stop() { }