server.go
3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package net
import (
"fmt"
"github.com/golang/protobuf/proto"
"net"
"plugin"
"pro2d/conf"
"pro2d/protos/pb"
"pro2d/src/common"
"pro2d/src/components/db"
"pro2d/src/components/etcd"
"pro2d/src/components/logger"
"pro2d/src/models"
"sync"
)
type ActionHandler func (msg *MsgPkg) (int32, proto.Message)
var ActionMap map[pb.ProtoCode]ActionHandler
type Server struct {
SConf *conf.SConf
Clients *sync.Map
EtcdClient *etcd.EtcdClient
TaskQueue []chan*MsgPkg
}
func NewServer(sConf *conf.SConf) *Server {
return &Server{
SConf: sConf,
Clients: new(sync.Map),
EtcdClient: new(etcd.EtcdClient),
TaskQueue: make([]chan *MsgPkg, common.WorkerPoolSize),
}
}
//StartWorkerPool 启动worker工作池
func (s *Server) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i := 0; i < s.SConf.WorkerPoolSize; i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
s.TaskQueue[i] = make(chan *MsgPkg, common.MaxTaskPerWorker)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go s.StartOneWorker(i, s.TaskQueue[i])
}
}
//StartOneWorker 启动一个Worker工作流程
func (s *Server) StartOneWorker(workerID int, taskQueue chan *MsgPkg) {
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
_ = workerID
s.DoMsgHandler(request)
}
}
}
func (s *Server) DoMsgHandler(msg *MsgPkg) {
logger.Debug("DoMsgHandler cmd: %d, data: %s", msg.Head.Cmd, msg.Body)
if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok {
logger.Debug("adfadfadfasdfadfadsf")
errCode, protomsg := md(msg)
rsp, err := proto.Marshal(protomsg)
if err != nil {
msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil)
return
}
msg.Conn.SendMsgByCode(errCode, msg.Head.Cmd, rsp)
return
}
logger.Error("protocode not handler: %d", msg.Head.Cmd)
}
func (s *Server) OnClose(conn *Connection) {
s.Clients.Delete(conn.Id)
}
func (s *Server) LoadPlugin() {
//重新加载
_, err:=plugin.Open(conf.GlobalConf.GameConf.PluginPath)
if err != nil {
logger.Error("load plugin err: %v, %s", err, conf.GlobalConf.GameConf.PluginPath)
return
}
logger.Debug("load plugin success")
}
func (s *Server)Start() error {
//mongo 初始化
db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName)
models.InitGameServerModels()
//Etcd 初始化
var err error
s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
if err != nil {
return err
}
s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5)
//初始化plugin
_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)
if err != nil {
return err
}
port := fmt.Sprintf(":%d", s.SConf.Port)
l, err := net.Listen("tcp", port)
if err != nil {
return err
}
s.StartWorkerPool()
logger.Debug("listen on %s\n", port)
id := 0
for {
conn, err := l.Accept()
if err != nil {
return err
}
id++
client := NewConn(id, conn, s)
s.Clients.Store(id, client)
go client.Start()
}
}
func (s *Server)Stop() {
}