Commit cd2f96ab70602424ec48f1ad23292ba7aa45c231

Authored by zhangqijia
1 parent 364620cd

fix: 优化连接管理

只使用一份连接管理,把数据放在底层,操作放在应用层
cmd/gameserver/game.go
@@ -9,20 +9,15 @@ import ( @@ -9,20 +9,15 @@ import (
9 "pro2d/models" 9 "pro2d/models"
10 10
11 "pro2d/common/etcd" 11 "pro2d/common/etcd"
12 - "sync"  
13 ) 12 )
14 13
15 type GameServer struct { 14 type GameServer struct {
16 components.IServer 15 components.IServer
17 EtcdClient *etcd.EtcdClient 16 EtcdClient *etcd.EtcdClient
18 -  
19 - Agents *sync.Map  
20 } 17 }
21 18
22 -func NewGameServer(sconf *common.SConf) (*GameServer, error) {  
23 - s := &GameServer{  
24 - Agents: new(sync.Map),  
25 - } 19 +func NewGameServer(sconf *common.SConf) (*GameServer, error) {
  20 + s := &GameServer{}
26 21
27 options := []components.ServerOption{ 22 options := []components.ServerOption{
28 components.WithPlugin(components.NewPlugin(sconf.PluginPath)), 23 components.WithPlugin(components.NewPlugin(sconf.PluginPath)),
@@ -57,40 +52,40 @@ func (s *GameServer) Start() error { @@ -57,40 +52,40 @@ func (s *GameServer) Start() error {
57 return s.IServer.Start() 52 return s.IServer.Start()
58 } 53 }
59 54
60 -func (s *GameServer) Stop() { 55 +func (s *GameServer) Stop() {
61 s.IServer.Stop() 56 s.IServer.Stop()
62 57
63 db.CloseMongo() 58 db.CloseMongo()
64 s.EtcdClient.Close() 59 s.EtcdClient.Close()
65 } 60 }
66 61
67 -func (s *GameServer) OnConnection(conn components.IConnection) { 62 +func (s *GameServer) OnConnection(conn components.IConnection) {
68 agent := NewAgent(s) 63 agent := NewAgent(s)
69 agent.OnConnection(conn) 64 agent.OnConnection(conn)
70 - s.Agents.Store(conn.GetID(),agent) 65 + s.GetConnManage().AddConn(conn.GetID(), agent)
71 } 66 }
72 67
73 -func (s *GameServer) OnMessage(msg components.IMessage) {  
74 - agent, ok := s.Agents.Load(msg.GetSession().GetID())  
75 - if !ok { 68 +func (s *GameServer) OnMessage(msg components.IMessage) {
  69 + agent := s.GetConnManage().GetConn(msg.GetSession().GetID())
  70 + if agent == nil {
76 return 71 return
77 } 72 }
78 agent.(*Agent).OnMessage(msg) 73 agent.(*Agent).OnMessage(msg)
79 } 74 }
80 75
81 -func (s *GameServer) OnTimer(conn components.IConnection) {  
82 - agent, ok := s.Agents.Load(conn.GetID())  
83 - if !ok { 76 +func (s *GameServer) OnTimer(conn components.IConnection) {
  77 + agent := s.GetConnManage().GetConn(conn.GetID())
  78 + if agent == nil {
84 return 79 return
85 } 80 }
86 agent.(*Agent).OnTimer() 81 agent.(*Agent).OnTimer()
87 } 82 }
88 83
89 -func (s *GameServer) OnClose(conn components.IConnection) {  
90 - agent, ok := s.Agents.Load(conn.GetID())  
91 - if !ok { 84 +func (s *GameServer) OnClose(conn components.IConnection) {
  85 + agent := s.GetConnManage().GetConn(conn.GetID())
  86 + if agent == nil {
92 return 87 return
93 } 88 }
94 agent.(*Agent).OnClose() 89 agent.(*Agent).OnClose()
95 - s.Agents.Delete(conn.GetID()) 90 + s.GetConnManage().DelConn(conn.GetID())
96 } 91 }
common/components/conn.go
@@ -10,17 +10,16 @@ import ( @@ -10,17 +10,16 @@ import (
10 "time" 10 "time"
11 ) 11 )
12 12
13 -  
14 type Connection struct { 13 type Connection struct {
15 IConnection 14 IConnection
16 net.Conn 15 net.Conn
17 Server IServer 16 Server IServer
18 Id int 17 Id int
19 18
20 - scanner *bufio.Scanner  
21 - writer *bufio.Writer  
22 - WBuffer chan []byte  
23 - Quit chan *Connection 19 + scanner *bufio.Scanner
  20 + writer *bufio.Writer
  21 + WBuffer chan []byte
  22 + Quit chan *Connection
24 readFunc chan func() 23 readFunc chan func()
25 timerFunc chan func() 24 timerFunc chan func()
26 25
@@ -34,18 +33,18 @@ type Connection struct { @@ -34,18 +33,18 @@ type Connection struct {
34 33
35 func NewConn(id int, conn net.Conn, s IServer) *Connection { 34 func NewConn(id int, conn net.Conn, s IServer) *Connection {
36 c := &Connection{ 35 c := &Connection{
37 - Id: id,  
38 - Conn: conn,  
39 - Server: s,  
40 -  
41 - scanner: bufio.NewScanner(conn),  
42 - writer: bufio.NewWriter(conn),  
43 - WBuffer: make(chan []byte, common.MaxMsgChan),  
44 - Quit: make(chan *Connection), 36 + Id: id,
  37 + Conn: conn,
  38 + Server: s,
  39 +
  40 + scanner: bufio.NewScanner(conn),
  41 + writer: bufio.NewWriter(conn),
  42 + WBuffer: make(chan []byte, common.MaxMsgChan),
  43 + Quit: make(chan *Connection),
45 readFunc: make(chan func(), 10), 44 readFunc: make(chan func(), 10),
46 timerFunc: make(chan func(), 10), 45 timerFunc: make(chan func(), 10),
47 46
48 - Status: 0, 47 + Status: 0,
49 } 48 }
50 c.connectionCallback = c.defaultConnectionCallback 49 c.connectionCallback = c.defaultConnectionCallback
51 c.messageCallback = c.defaultMessageCallback 50 c.messageCallback = c.defaultMessageCallback
@@ -74,7 +73,7 @@ func (c *Connection) SetTimerCallback(cb TimerCallback) { @@ -74,7 +73,7 @@ func (c *Connection) SetTimerCallback(cb TimerCallback) {
74 c.timerCallback = cb 73 c.timerCallback = cb
75 } 74 }
76 75
77 -func (c *Connection) Start() { 76 +func (c *Connection) Start() {
78 go c.write() 77 go c.write()
79 go c.read() 78 go c.read()
80 go c.listen() 79 go c.listen()
@@ -96,7 +95,7 @@ func (c *Connection) Stop() { @@ -96,7 +95,7 @@ func (c *Connection) Stop() {
96 } 95 }
97 } 96 }
98 97
99 -func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{ 98 +func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
100 buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0) 99 buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0)
101 if err != nil { 100 if err != nil {
102 return err 101 return err
@@ -113,25 +112,25 @@ func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{ @@ -113,25 +112,25 @@ func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{
113 } 112 }
114 } 113 }
115 114
116 -func (c *Connection) defaultConnectionCallback(conn IConnection) { 115 +func (c *Connection) defaultConnectionCallback(conn IConnection) {
117 } 116 }
118 117
119 -func (c *Connection) defaultMessageCallback(msg IMessage) { 118 +func (c *Connection) defaultMessageCallback(msg IMessage) {
120 } 119 }
121 120
122 -func (c *Connection) defaultCloseCallback(conn IConnection) { 121 +func (c *Connection) defaultCloseCallback(conn IConnection) {
123 } 122 }
124 123
125 -func (c *Connection) defaultTimerCallback(conn IConnection) { 124 +func (c *Connection) defaultTimerCallback(conn IConnection) {
126 } 125 }
127 126
128 -func (c *Connection) write() { 127 +func (c *Connection) write() {
129 defer c.quitting() 128 defer c.quitting()
130 129
131 for msg := range c.WBuffer { 130 for msg := range c.WBuffer {
132 n, err := c.writer.Write(msg) 131 n, err := c.writer.Write(msg)
133 - if err != nil{  
134 - logger.Error("write fail err: " + err.Error(), "n: ", n) 132 + if err != nil {
  133 + logger.Error("write fail err: "+err.Error(), "n: ", n)
135 return 134 return
136 } 135 }
137 if err := c.writer.Flush(); err != nil { 136 if err := c.writer.Flush(); err != nil {
@@ -164,16 +163,16 @@ func (c *Connection) read() { @@ -164,16 +163,16 @@ func (c *Connection) read() {
164 } 163 }
165 164
166 //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。 165 //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。
167 -func (c *Connection) listen(){ 166 +func (c *Connection) listen() {
168 defer c.quitting() 167 defer c.quitting()
169 168
170 - for { 169 + for {
171 select { 170 select {
172 - case timerFunc := <- c.timerFunc: 171 + case timerFunc := <-c.timerFunc:
173 timerFunc() 172 timerFunc()
174 - case readFunc := <- c.readFunc: 173 + case readFunc := <-c.readFunc:
175 readFunc() 174 readFunc()
176 - case <- c.Quit: 175 + case <-c.Quit:
177 return 176 return
178 } 177 }
179 } 178 }
@@ -186,7 +185,7 @@ func (c *Connection) handleTimeOut() { @@ -186,7 +185,7 @@ func (c *Connection) handleTimeOut() {
186 TimeOut(1*time.Second, c.handleTimeOut) 185 TimeOut(1*time.Second, c.handleTimeOut)
187 } 186 }
188 187
189 -func (c *Connection) quitting() { 188 +func (c *Connection) quitting() {
190 closed := atomic.LoadUint32(&c.Status) 189 closed := atomic.LoadUint32(&c.Status)
191 if closed == 0 { 190 if closed == 0 {
192 return 191 return
@@ -196,6 +195,7 @@ func (c *Connection) quitting() { @@ -196,6 +195,7 @@ func (c *Connection) quitting() {
196 logger.Debug("ID: %d close", c.Id) 195 logger.Debug("ID: %d close", c.Id)
197 close(c.WBuffer) 196 close(c.WBuffer)
198 close(c.Quit) 197 close(c.Quit)
  198 + close(c.readFunc)
199 c.Conn.Close() 199 c.Conn.Close()
200 c.closeCallback(c) 200 c.closeCallback(c)
201 } 201 }
common/components/icompontents.go
@@ -46,15 +46,23 @@ type ( @@ -46,15 +46,23 @@ type (
46 SetCloseCallback(CloseCallback) 46 SetCloseCallback(CloseCallback)
47 SetTimerCallback(TimerCallback) 47 SetTimerCallback(TimerCallback)
48 } 48 }
  49 + //connManage
  50 + IConnManage interface {
  51 + AddConn(id int, connection IConnection)
  52 + GetConn(id int) IConnection
  53 + DelConn(id int) IConnection
  54 + Range(f func(key interface{}, value interface{}) bool)
  55 + StopAllConns()
  56 + }
49 //server 57 //server
50 IServer interface { 58 IServer interface {
51 Start() error 59 Start() error
52 Stop() 60 Stop()
53 61
54 GetSplitter() ISplitter 62 GetSplitter() ISplitter
55 - GetIConnection(id int) IConnection  
56 GetPlugin() IPlugin 63 GetPlugin() IPlugin
57 GetAction(uint32) interface{} 64 GetAction(uint32) interface{}
  65 + GetConnManage() IConnManage
58 66
59 SetConnectionCallback(ConnectionCallback) 67 SetConnectionCallback(ConnectionCallback)
60 SetMessageCallback(MessageCallback) 68 SetMessageCallback(MessageCallback)
@@ -75,8 +83,6 @@ type ( @@ -75,8 +83,6 @@ type (
75 SetActions(map[interface{}]interface{}) 83 SetActions(map[interface{}]interface{})
76 GetAction(uint32) interface{} 84 GetAction(uint32) interface{}
77 } 85 }
78 -  
79 -  
80 ) 86 )
81 87
82 //----------------- 88 //-----------------
common/components/server.go
@@ -4,7 +4,6 @@ import ( @@ -4,7 +4,6 @@ import (
4 "fmt" 4 "fmt"
5 "net" 5 "net"
6 "pro2d/common/logger" 6 "pro2d/common/logger"
7 - "sync"  
8 ) 7 )
9 8
10 type ServerOption func(*Server) 9 type ServerOption func(*Server)
@@ -45,25 +44,24 @@ func WithTimerCbk(cb TimerCallback) ServerOption { @@ -45,25 +44,24 @@ func WithTimerCbk(cb TimerCallback) ServerOption {
45 } 44 }
46 } 45 }
47 46
48 -  
49 type Server struct { 47 type Server struct {
50 - PluginPath string  
51 - plugins IPlugin  
52 - splitter ISplitter 48 + PluginPath string
  49 + plugins IPlugin
  50 + splitter ISplitter
53 51
54 - connectionCallback ConnectionCallback  
55 - messageCallback MessageCallback  
56 - closeCallback CloseCallback  
57 - timerCallback TimerCallback 52 + connectionCallback ConnectionCallback
  53 + messageCallback MessageCallback
  54 + closeCallback CloseCallback
  55 + timerCallback TimerCallback
58 56
59 - port int  
60 - Clients *sync.Map 57 + port int
  58 + connManage *ConnManage
61 } 59 }
62 60
63 func NewServer(port int, options ...ServerOption) IServer { 61 func NewServer(port int, options ...ServerOption) IServer {
64 s := &Server{ 62 s := &Server{
65 - port: port,  
66 - Clients: new(sync.Map), 63 + port: port,
  64 + connManage: NewConnManage(),
67 } 65 }
68 for _, option := range options { 66 for _, option := range options {
69 option(s) 67 option(s)
@@ -76,14 +74,6 @@ func (s *Server) GetSplitter() ISplitter { @@ -76,14 +74,6 @@ func (s *Server) GetSplitter() ISplitter {
76 return s.splitter 74 return s.splitter
77 } 75 }
78 76
79 -func (s *Server) GetIConnection(id int) IConnection {  
80 - c, ok := s.Clients.Load(id)  
81 - if !ok {  
82 - return nil  
83 - }  
84 - return c.(IConnection)  
85 -}  
86 -  
87 func (s *Server) GetPlugin() IPlugin { 77 func (s *Server) GetPlugin() IPlugin {
88 return s.plugins 78 return s.plugins
89 } 79 }
@@ -92,6 +82,10 @@ func (s *Server) GetAction(cmd uint32) interface{} { @@ -92,6 +82,10 @@ func (s *Server) GetAction(cmd uint32) interface{} {
92 return s.plugins.GetAction(cmd) 82 return s.plugins.GetAction(cmd)
93 } 83 }
94 84
  85 +func (s *Server) GetConnManage() IConnManage {
  86 + return s.connManage
  87 +}
  88 +
95 func (s *Server) SetConnectionCallback(cb ConnectionCallback) { 89 func (s *Server) SetConnectionCallback(cb ConnectionCallback) {
96 s.connectionCallback = cb 90 s.connectionCallback = cb
97 } 91 }
@@ -133,19 +127,12 @@ func (s *Server) Start() error { @@ -133,19 +127,12 @@ func (s *Server) Start() error {
133 } 127 }
134 } 128 }
135 129
136 -func (s *Server)Stop() { 130 +func (s *Server) Stop() {
137 StopTimer() 131 StopTimer()
138 132
139 - s.Clients.Range(func(key, value interface{}) bool {  
140 - client := value.(IConnection)  
141 - client.Stop()  
142 - return true  
143 - })  
144 } 133 }
145 134
146 func (s *Server) newConnection(conn IConnection) { 135 func (s *Server) newConnection(conn IConnection) {
147 - s.Clients.Store(conn.GetID(), conn)  
148 -  
149 conn.SetConnectionCallback(s.connectionCallback) 136 conn.SetConnectionCallback(s.connectionCallback)
150 conn.SetCloseCallback(s.removeConnection) 137 conn.SetCloseCallback(s.removeConnection)
151 conn.SetMessageCallback(s.messageCallback) 138 conn.SetMessageCallback(s.messageCallback)
@@ -156,5 +143,4 @@ func (s *Server) newConnection(conn IConnection) { @@ -156,5 +143,4 @@ func (s *Server) newConnection(conn IConnection) {
156 143
157 func (s *Server) removeConnection(conn IConnection) { 144 func (s *Server) removeConnection(conn IConnection) {
158 s.closeCallback(conn) 145 s.closeCallback(conn)
159 - s.Clients.Delete(conn.GetID())  
160 } 146 }