server.go
3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package net
import (
"fmt"
"github.com/golang/protobuf/proto"
"net"
"plugin"
"pro2d/conf"
"pro2d/protos/pb"
"pro2d/src/common"
"pro2d/src/components/db"
"pro2d/src/components/etcd"
"pro2d/src/components/logger"
"pro2d/src/components/timewheel"
"pro2d/src/models"
"sync"
"time"
)
type ActionHandler func (msg *MsgPkg) (int32, proto.Message)
var ActionMap map[pb.ProtoCode]ActionHandler
var TimeWheel *timewheel.TimeWheel
type Server struct {
SConf *conf.SConf
Clients *sync.Map
EtcdClient *etcd.EtcdClient
TaskQueue []chan func()
}
func NewServer(sConf *conf.SConf) *Server {
return &Server{
SConf: sConf,
Clients: new(sync.Map),
EtcdClient: new(etcd.EtcdClient),
TaskQueue: make([]chan func(), common.WorkerPoolSize),
}
}
//StartWorkerPool 启动worker工作池
func (s *Server) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i := 0; i < s.SConf.WorkerPoolSize; i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
s.TaskQueue[i] = make(chan func(), common.MaxTaskPerWorker)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go s.StartOneWorker(i, s.TaskQueue[i])
}
}
//StartOneWorker 启动一个Worker工作流程
func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) {
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case f:= <-taskQueue:
_ = workerID
f()
//s.DoMsgHandler(request)
}
}
}
func (s *Server) DoMsgHandler(msg *MsgPkg) {
if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok {
logger.Debug("protocode handler: %d", msg.Head.Cmd)
errCode, protomsg := md(msg)
rsp, err := proto.Marshal(protomsg)
fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
if err != nil {
msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil)
return
}
msg.Conn.SendMsgByCode(errCode, msg.Head.Cmd, rsp)
return
}
logger.Error("protocode not handler: %d", msg.Head.Cmd)
}
func (s *Server) OnClose(conn *Connection) {
//conn.Stop()
s.Clients.Delete(conn.Id)
}
func (s *Server) LoadPlugin() {
//重新加载
_, err:=plugin.Open(conf.GlobalConf.GameConf.PluginPath)
if err != nil {
logger.Error("load plugin err: %v, %s", err, conf.GlobalConf.GameConf.PluginPath)
return
}
logger.Debug("load plugin success")
}
func (s *Server) handleTimeOut() {
s.Clients.Range(func(key, value interface{}) bool {
client := value.(*Connection)
client.update()
return true
})
timewheel.TimeOut(1*time.Second, s.handleTimeOut)
}
func (s *Server)Start() error {
//mongo 初始化
db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName)
models.InitGameServerModels()
//Etcd 初始化
var err error
s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
if err != nil {
return err
}
s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5)
//初始化plugin
//_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)
//if err != nil {
// return err
//}
port := fmt.Sprintf(":%d", s.SConf.Port)
l, err := net.Listen("tcp", port)
if err != nil {
return err
}
//启动协程池
s.StartWorkerPool()
//启动定时器
s.handleTimeOut()
//监听端口
logger.Debug("listen on %s\n", port)
id := 0
for {
conn, err := l.Accept()
if err != nil {
return err
}
id++
client := NewConn(id, conn, s)
s.Clients.Store(id, client)
go client.Start()
}
}
func (s *Server)Stop() {
TimeWheel.Stop()
}