Commit 2e0aa29811785c6f64b79661328b8adee619c8eb

Authored by zhangqijia
1 parent 38dd96b4

update 每条连接新增一条协程,用于接受处理网络数据的逻辑和定时器的逻辑, 防止数据竞争

1 -Subproject commit bb9c916232c605571321f523802c17c469c8ab1c 1 +Subproject commit 608828ec759ed7e958cf63ae557e592c816e45cc
src/components/net/conn.go
@@ -30,6 +30,9 @@ type Connection struct { @@ -30,6 +30,9 @@ type Connection struct {
30 30
31 WBuffer chan []byte 31 WBuffer chan []byte
32 32
  33 + updateFunc chan func()
  34 + readFunc chan func()
  35 +
33 Quit chan *Connection 36 Quit chan *Connection
34 37
35 Role *models.RoleModel 38 Role *models.RoleModel
@@ -54,9 +57,11 @@ func NewConn(id int, conn net.Conn, s *Server) *Connection { @@ -54,9 +57,11 @@ func NewConn(id int, conn net.Conn, s *Server) *Connection {
54 scanner: bufio.NewScanner(conn), 57 scanner: bufio.NewScanner(conn),
55 writer: bufio.NewWriter(conn), 58 writer: bufio.NewWriter(conn),
56 WBuffer: make(chan []byte), 59 WBuffer: make(chan []byte),
  60 + updateFunc: make(chan func()),
  61 + readFunc: make(chan func()),
57 Quit: make(chan *Connection), 62 Quit: make(chan *Connection),
58 lastHeartCheckTime: utils.Timex(), 63 lastHeartCheckTime: utils.Timex(),
59 - heartTimeoutCount: 0, 64 + heartTimeoutCount: 0,
60 } 65 }
61 } 66 }
62 67
@@ -87,10 +92,7 @@ func (c *Connection) read() { @@ -87,10 +92,7 @@ func (c *Connection) read() {
87 } 92 }
88 93
89 req.Conn = c 94 req.Conn = c
90 - //得到需要处理此条连接的workerID  
91 - workerID := c.Id % c.Server.SConf.WorkerPoolSize  
92 - //将请求消息发送给任务队列  
93 - c.Server.TaskQueue[workerID] <- func() { 95 + c.readFunc <- func() {
94 c.Server.DoMsgHandler(req) 96 c.Server.DoMsgHandler(req)
95 } 97 }
96 98
@@ -132,26 +134,41 @@ func (c *Connection) update() { @@ -132,26 +134,41 @@ func (c *Connection) update() {
132 } 134 }
133 135
134 if c.Role != nil { 136 if c.Role != nil {
135 - c.Server.TaskQueue[c.Id % c.Server.SConf.WorkerPoolSize] <- func() {  
136 - //role 获取Recover 137 + c.updateFunc <- func() {
  138 + //role 恢复数据
137 c.Role.OnRecoverTimer(now) 139 c.Role.OnRecoverTimer(now)
138 } 140 }
139 } 141 }
140 } 142 }
141 143
  144 +func (c *Connection) flush() {
  145 + defer c.Stop()
  146 + for {
  147 + select {
  148 + case rf := <- c.readFunc:
  149 + rf()
  150 + case uf := <- c.updateFunc:
  151 + uf()
  152 + case <- c.Quit:
  153 + return
  154 + }
  155 + }
  156 +}
142 157
143 func (c *Connection) Start() { 158 func (c *Connection) Start() {
144 go c.write() 159 go c.write()
145 go c.read() 160 go c.read()
  161 + c.flush()
146 } 162 }
147 163
148 func (c *Connection) Stop() { 164 func (c *Connection) Stop() {
  165 + logger.Debug("ID: %d close", c.Id)
149 c.Conn.Close() 166 c.Conn.Close()
  167 + c.Server.OnClose(c)
150 } 168 }
151 169
152 func (c *Connection) Quiting() { 170 func (c *Connection) Quiting() {
153 - logger.Debug("ID: %d close", c.Id)  
154 - c.Server.OnClose(c) 171 + c.Quit <- c
155 } 172 }
156 173
157 func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){ 174 func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){
src/components/net/msg.go
@@ -54,4 +54,4 @@ func EncodeMsg(pkg *MsgPkg) ([]byte, error) { @@ -54,4 +54,4 @@ func EncodeMsg(pkg *MsgPkg) ([]byte, error) {
54 return nil, err 54 return nil, err
55 } 55 }
56 return buf.Bytes(), nil 56 return buf.Bytes(), nil
57 -} 57 -}
  58 +}
58 \ No newline at end of file 59 \ No newline at end of file
src/components/net/server.go
1 package net 1 package net
2 2
3 import ( 3 import (
  4 + "context"
4 "fmt" 5 "fmt"
5 "github.com/golang/protobuf/proto" 6 "github.com/golang/protobuf/proto"
6 "net" 7 "net"
7 "plugin" 8 "plugin"
8 "pro2d/conf" 9 "pro2d/conf"
9 "pro2d/protos/pb" 10 "pro2d/protos/pb"
10 - "pro2d/src/common"  
11 "pro2d/src/components/db" 11 "pro2d/src/components/db"
12 "pro2d/src/components/etcd" 12 "pro2d/src/components/etcd"
13 "pro2d/src/components/logger" 13 "pro2d/src/components/logger"
@@ -19,14 +19,11 @@ import ( @@ -19,14 +19,11 @@ import (
19 19
20 type ActionHandler func (msg *MsgPkg) (int32, proto.Message) 20 type ActionHandler func (msg *MsgPkg) (int32, proto.Message)
21 var ActionMap map[pb.ProtoCode]ActionHandler 21 var ActionMap map[pb.ProtoCode]ActionHandler
22 -var TimeWheel *timewheel.TimeWheel  
23 22
24 type Server struct { 23 type Server struct {
25 SConf *conf.SConf 24 SConf *conf.SConf
26 Clients *sync.Map 25 Clients *sync.Map
27 EtcdClient *etcd.EtcdClient 26 EtcdClient *etcd.EtcdClient
28 -  
29 - TaskQueue []chan func()  
30 } 27 }
31 28
32 func NewServer(sConf *conf.SConf) *Server { 29 func NewServer(sConf *conf.SConf) *Server {
@@ -34,34 +31,6 @@ func NewServer(sConf *conf.SConf) *Server { @@ -34,34 +31,6 @@ func NewServer(sConf *conf.SConf) *Server {
34 SConf: sConf, 31 SConf: sConf,
35 Clients: new(sync.Map), 32 Clients: new(sync.Map),
36 EtcdClient: new(etcd.EtcdClient), 33 EtcdClient: new(etcd.EtcdClient),
37 -  
38 - TaskQueue: make([]chan func(), common.WorkerPoolSize),  
39 - }  
40 -}  
41 -  
42 -//StartWorkerPool 启动worker工作池  
43 -func (s *Server) StartWorkerPool() {  
44 - //遍历需要启动worker的数量,依此启动  
45 - for i := 0; i < s.SConf.WorkerPoolSize; i++ {  
46 - //一个worker被启动  
47 - //给当前worker对应的任务队列开辟空间  
48 - s.TaskQueue[i] = make(chan func(), common.MaxTaskPerWorker)  
49 - //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来  
50 - go s.StartOneWorker(i, s.TaskQueue[i])  
51 - }  
52 -}  
53 -  
54 -//StartOneWorker 启动一个Worker工作流程  
55 -func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) {  
56 - //不断的等待队列中的消息  
57 - for {  
58 - select {  
59 - //有消息则取出队列的Request,并执行绑定的业务方法  
60 - case f:= <-taskQueue:  
61 - _ = workerID  
62 - f()  
63 - //s.DoMsgHandler(request)  
64 - }  
65 } 34 }
66 } 35 }
67 36
@@ -131,9 +100,6 @@ func (s *Server)Start() error { @@ -131,9 +100,6 @@ func (s *Server)Start() error {
131 return err 100 return err
132 } 101 }
133 102
134 - //启动协程池  
135 - s.StartWorkerPool()  
136 -  
137 //启动定时器 103 //启动定时器
138 s.handleTimeOut() 104 s.handleTimeOut()
139 105
@@ -154,5 +120,13 @@ func (s *Server)Start() error { @@ -154,5 +120,13 @@ func (s *Server)Start() error {
154 } 120 }
155 121
156 func (s *Server)Stop() { 122 func (s *Server)Stop() {
157 - TimeWheel.Stop() 123 + timewheel.StopTimer()
  124 +
  125 + s.Clients.Range(func(key, value interface{}) bool {
  126 + client := value.(*Connection)
  127 + client.Stop()
  128 + return true
  129 + })
  130 +
  131 + db.MongoClient.Disconnect(context.TODO())
158 } 132 }
159 \ No newline at end of file 133 \ No newline at end of file
src/components/timewheel/timerwheel.go
@@ -83,7 +83,7 @@ func NewTimeWheel() *TimeWheel { @@ -83,7 +83,7 @@ func NewTimeWheel() *TimeWheel {
83 tick: 10*time.Millisecond, 83 tick: 10*time.Millisecond,
84 time: 0, 84 time: 0,
85 WorkPool: workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker), 85 WorkPool: workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker),
86 - exit: nil, 86 + exit: make(chan struct{}),
87 } 87 }
88 for i :=0; i < TimeNear; i++ { 88 for i :=0; i < TimeNear; i++ {
89 tw.near[i] = newBucket() 89 tw.near[i] = newBucket()
@@ -198,4 +198,8 @@ func (tw *TimeWheel) afterFunc(expiration time.Duration, f func()) { @@ -198,4 +198,8 @@ func (tw *TimeWheel) afterFunc(expiration time.Duration, f func()) {
198 198
199 func TimeOut(expire time.Duration, f func()) { 199 func TimeOut(expire time.Duration, f func()) {
200 TimingWheel.afterFunc(expire, f) 200 TimingWheel.afterFunc(expire, f)
  201 +}
  202 +
  203 +func StopTimer() {
  204 + TimingWheel.Stop()
201 } 205 }
202 \ No newline at end of file 206 \ No newline at end of file
src/models/role.go
@@ -99,10 +99,10 @@ func (m *RoleModel) AddHero(hero *pb.Hero) { @@ -99,10 +99,10 @@ func (m *RoleModel) AddHero(hero *pb.Hero) {
99 m.Heros[fmt.Sprintf("%s%s", m.Role.Id, h.Hero.Id)] = h 99 m.Heros[fmt.Sprintf("%s%s", m.Role.Id, h.Hero.Id)] = h
100 } 100 }
101 101
102 -func (m *RoleModel) GetAllHero() map[string]*pb.Hero {  
103 - h := make(map[string]*pb.Hero)  
104 - for k, hero := range m.Heros {  
105 - h[k] = hero.Hero 102 +func (m *RoleModel) GetAllHero() []*pb.Hero {
  103 + var h []*pb.Hero
  104 + for _, hero := range m.Heros {
  105 + h = append(h, hero.Hero)
106 } 106 }
107 return h 107 return h
108 } 108 }
src/plugin/RolePlugin.go
@@ -54,6 +54,7 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) { @@ -54,6 +54,7 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) {
54 // Team: nil, 54 // Team: nil,
55 // Equips: nil, 55 // Equips: nil,
56 //} 56 //}
  57 + msg.Conn.Role = role
57 return 0, &pb.RoleRsp{ 58 return 0, &pb.RoleRsp{
58 Role: role.Role, 59 Role: role.Role,
59 Hero: role.GetAllHero(), 60 Hero: role.GetAllHero(),
@@ -69,7 +69,7 @@ func main() { @@ -69,7 +69,7 @@ func main() {
69 return 69 return
70 } 70 }
71 logger.Debug("recv: %s, n: %d\n", b1, n) 71 logger.Debug("recv: %s, n: %d\n", b1, n)
72 - time.Sleep(5*time.Second) 72 + time.Sleep(1*time.Second)
73 } 73 }
74 74
75 } 75 }
76 \ No newline at end of file 76 \ No newline at end of file