From cd2f96ab70602424ec48f1ad23292ba7aa45c231 Mon Sep 17 00:00:00 2001 From: zqj <582132116@qq.com> Date: Mon, 21 Mar 2022 11:42:32 +0800 Subject: [PATCH] fix: 优化连接管理 --- cmd/gameserver/game.go | 35 +++++++++++++++-------------------- common/components/conn.go | 58 +++++++++++++++++++++++++++++----------------------------- common/components/icompontents.go | 12 +++++++++--- common/components/server.go | 46 ++++++++++++++++------------------------------ 4 files changed, 69 insertions(+), 82 deletions(-) diff --git a/cmd/gameserver/game.go b/cmd/gameserver/game.go index a573029..c032e9b 100644 --- a/cmd/gameserver/game.go +++ b/cmd/gameserver/game.go @@ -9,20 +9,15 @@ import ( "pro2d/models" "pro2d/common/etcd" - "sync" ) type GameServer struct { components.IServer EtcdClient *etcd.EtcdClient - - Agents *sync.Map } -func NewGameServer(sconf *common.SConf) (*GameServer, error) { - s := &GameServer{ - Agents: new(sync.Map), - } +func NewGameServer(sconf *common.SConf) (*GameServer, error) { + s := &GameServer{} options := []components.ServerOption{ components.WithPlugin(components.NewPlugin(sconf.PluginPath)), @@ -57,40 +52,40 @@ func (s *GameServer) Start() error { return s.IServer.Start() } -func (s *GameServer) Stop() { +func (s *GameServer) Stop() { s.IServer.Stop() db.CloseMongo() s.EtcdClient.Close() } -func (s *GameServer) OnConnection(conn components.IConnection) { +func (s *GameServer) OnConnection(conn components.IConnection) { agent := NewAgent(s) agent.OnConnection(conn) - s.Agents.Store(conn.GetID(),agent) + s.GetConnManage().AddConn(conn.GetID(), agent) } -func (s *GameServer) OnMessage(msg components.IMessage) { - agent, ok := s.Agents.Load(msg.GetSession().GetID()) - if !ok { +func (s *GameServer) OnMessage(msg components.IMessage) { + agent := s.GetConnManage().GetConn(msg.GetSession().GetID()) + if agent == nil { return } agent.(*Agent).OnMessage(msg) } -func (s *GameServer) OnTimer(conn components.IConnection) { - agent, ok := s.Agents.Load(conn.GetID()) - if !ok { +func (s *GameServer) OnTimer(conn components.IConnection) { + agent := s.GetConnManage().GetConn(conn.GetID()) + if agent == nil { return } agent.(*Agent).OnTimer() } -func (s *GameServer) OnClose(conn components.IConnection) { - agent, ok := s.Agents.Load(conn.GetID()) - if !ok { +func (s *GameServer) OnClose(conn components.IConnection) { + agent := s.GetConnManage().GetConn(conn.GetID()) + if agent == nil { return } agent.(*Agent).OnClose() - s.Agents.Delete(conn.GetID()) + s.GetConnManage().DelConn(conn.GetID()) } diff --git a/common/components/conn.go b/common/components/conn.go index a9e518c..5f2fff6 100644 --- a/common/components/conn.go +++ b/common/components/conn.go @@ -10,17 +10,16 @@ import ( "time" ) - type Connection struct { IConnection net.Conn Server IServer Id int - scanner *bufio.Scanner - writer *bufio.Writer - WBuffer chan []byte - Quit chan *Connection + scanner *bufio.Scanner + writer *bufio.Writer + WBuffer chan []byte + Quit chan *Connection readFunc chan func() timerFunc chan func() @@ -34,18 +33,18 @@ type Connection struct { func NewConn(id int, conn net.Conn, s IServer) *Connection { c := &Connection{ - Id: id, - Conn: conn, - Server: s, - - scanner: bufio.NewScanner(conn), - writer: bufio.NewWriter(conn), - WBuffer: make(chan []byte, common.MaxMsgChan), - Quit: make(chan *Connection), + Id: id, + Conn: conn, + Server: s, + + scanner: bufio.NewScanner(conn), + writer: bufio.NewWriter(conn), + WBuffer: make(chan []byte, common.MaxMsgChan), + Quit: make(chan *Connection), readFunc: make(chan func(), 10), timerFunc: make(chan func(), 10), - Status: 0, + Status: 0, } c.connectionCallback = c.defaultConnectionCallback c.messageCallback = c.defaultMessageCallback @@ -74,7 +73,7 @@ func (c *Connection) SetTimerCallback(cb TimerCallback) { c.timerCallback = cb } -func (c *Connection) Start() { +func (c *Connection) Start() { go c.write() go c.read() go c.listen() @@ -96,7 +95,7 @@ func (c *Connection) Stop() { } } -func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{ +func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error { buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0) if err != nil { return err @@ -113,25 +112,25 @@ func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{ } } -func (c *Connection) defaultConnectionCallback(conn IConnection) { +func (c *Connection) defaultConnectionCallback(conn IConnection) { } -func (c *Connection) defaultMessageCallback(msg IMessage) { +func (c *Connection) defaultMessageCallback(msg IMessage) { } -func (c *Connection) defaultCloseCallback(conn IConnection) { +func (c *Connection) defaultCloseCallback(conn IConnection) { } -func (c *Connection) defaultTimerCallback(conn IConnection) { +func (c *Connection) defaultTimerCallback(conn IConnection) { } -func (c *Connection) write() { +func (c *Connection) write() { defer c.quitting() for msg := range c.WBuffer { n, err := c.writer.Write(msg) - if err != nil{ - logger.Error("write fail err: " + err.Error(), "n: ", n) + if err != nil { + logger.Error("write fail err: "+err.Error(), "n: ", n) return } if err := c.writer.Flush(); err != nil { @@ -164,16 +163,16 @@ func (c *Connection) read() { } //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。 -func (c *Connection) listen(){ +func (c *Connection) listen() { defer c.quitting() - for { + for { select { - case timerFunc := <- c.timerFunc: + case timerFunc := <-c.timerFunc: timerFunc() - case readFunc := <- c.readFunc: + case readFunc := <-c.readFunc: readFunc() - case <- c.Quit: + case <-c.Quit: return } } @@ -186,7 +185,7 @@ func (c *Connection) handleTimeOut() { TimeOut(1*time.Second, c.handleTimeOut) } -func (c *Connection) quitting() { +func (c *Connection) quitting() { closed := atomic.LoadUint32(&c.Status) if closed == 0 { return @@ -196,6 +195,7 @@ func (c *Connection) quitting() { logger.Debug("ID: %d close", c.Id) close(c.WBuffer) close(c.Quit) + close(c.readFunc) c.Conn.Close() c.closeCallback(c) } diff --git a/common/components/icompontents.go b/common/components/icompontents.go index 2ba1830..4dd128f 100644 --- a/common/components/icompontents.go +++ b/common/components/icompontents.go @@ -46,15 +46,23 @@ type ( SetCloseCallback(CloseCallback) SetTimerCallback(TimerCallback) } + //connManage + IConnManage interface { + AddConn(id int, connection IConnection) + GetConn(id int) IConnection + DelConn(id int) IConnection + Range(f func(key interface{}, value interface{}) bool) + StopAllConns() + } //server IServer interface { Start() error Stop() GetSplitter() ISplitter - GetIConnection(id int) IConnection GetPlugin() IPlugin GetAction(uint32) interface{} + GetConnManage() IConnManage SetConnectionCallback(ConnectionCallback) SetMessageCallback(MessageCallback) @@ -75,8 +83,6 @@ type ( SetActions(map[interface{}]interface{}) GetAction(uint32) interface{} } - - ) //----------------- diff --git a/common/components/server.go b/common/components/server.go index 3833f8d..2f1d444 100644 --- a/common/components/server.go +++ b/common/components/server.go @@ -4,7 +4,6 @@ import ( "fmt" "net" "pro2d/common/logger" - "sync" ) type ServerOption func(*Server) @@ -45,25 +44,24 @@ func WithTimerCbk(cb TimerCallback) ServerOption { } } - type Server struct { - PluginPath string - plugins IPlugin - splitter ISplitter + PluginPath string + plugins IPlugin + splitter ISplitter - connectionCallback ConnectionCallback - messageCallback MessageCallback - closeCallback CloseCallback - timerCallback TimerCallback + connectionCallback ConnectionCallback + messageCallback MessageCallback + closeCallback CloseCallback + timerCallback TimerCallback - port int - Clients *sync.Map + port int + connManage *ConnManage } func NewServer(port int, options ...ServerOption) IServer { s := &Server{ - port: port, - Clients: new(sync.Map), + port: port, + connManage: NewConnManage(), } for _, option := range options { option(s) @@ -76,14 +74,6 @@ func (s *Server) GetSplitter() ISplitter { return s.splitter } -func (s *Server) GetIConnection(id int) IConnection { - c, ok := s.Clients.Load(id) - if !ok { - return nil - } - return c.(IConnection) -} - func (s *Server) GetPlugin() IPlugin { return s.plugins } @@ -92,6 +82,10 @@ func (s *Server) GetAction(cmd uint32) interface{} { return s.plugins.GetAction(cmd) } +func (s *Server) GetConnManage() IConnManage { + return s.connManage +} + func (s *Server) SetConnectionCallback(cb ConnectionCallback) { s.connectionCallback = cb } @@ -133,19 +127,12 @@ func (s *Server) Start() error { } } -func (s *Server)Stop() { +func (s *Server) Stop() { StopTimer() - s.Clients.Range(func(key, value interface{}) bool { - client := value.(IConnection) - client.Stop() - return true - }) } func (s *Server) newConnection(conn IConnection) { - s.Clients.Store(conn.GetID(), conn) - conn.SetConnectionCallback(s.connectionCallback) conn.SetCloseCallback(s.removeConnection) conn.SetMessageCallback(s.messageCallback) @@ -156,5 +143,4 @@ func (s *Server) newConnection(conn IConnection) { func (s *Server) removeConnection(conn IConnection) { s.closeCallback(conn) - s.Clients.Delete(conn.GetID()) } -- libgit2 0.21.2