diff --git a/cmd/gameserver/agent.go b/cmd/gameserver/agent.go index cac84ce..4d9cca6 100644 --- a/cmd/gameserver/agent.go +++ b/cmd/gameserver/agent.go @@ -8,6 +8,7 @@ import ( "pro2d/common/logger" "pro2d/models" "pro2d/utils" + "sync" "sync/atomic" ) @@ -16,25 +17,21 @@ type Agent struct { Server components.IServer Role *models.RoleModel - - - Quit chan *Agent - nextCheckTime int64 //下一次检查的时间 lastHeartCheckTime int64 heartTimeoutCount int //超时次数 } -func NewAgent(s components.IServer) *Agent { - return &Agent{ - Server: s, +var agentPool = sync.Pool{New: func() interface{} { return new(Agent)}} - Quit: make(chan *Agent), +func NewAgent(s components.IServer) *Agent { + a := agentPool.Get().(*Agent) + a.Server = s - nextCheckTime: 0, - lastHeartCheckTime: utils.Timex(), - heartTimeoutCount: 0, - } + a.nextCheckTime = 0 + a.lastHeartCheckTime = utils.Timex() + a.heartTimeoutCount= 0 + return a } func (c *Agent) OnConnection(conn components.IConnection) { @@ -45,16 +42,19 @@ func (c *Agent) OnMessage(msg components.IMessage) { atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) md := c.Server.GetAction(msg.GetHeader().GetMsgID()) if md == nil { - logger.Debug("cmd: %d, md is nil", msg.GetHeader().GetMsgID()) + logger.Debug("cmd: %d, handler is nil", msg.GetHeader().GetMsgID()) return } - - logger.Debug("protocode handler: %d", msg.GetHeader().GetMsgID()) - //fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg) + logger.Debug("protocolID: %d", msg.GetHeader().GetMsgID()) + //fmt.Printf("errCode: %d, protoMsg:%v\n", errCode, protoMsg) f := md.(func (msg components.IMessage) (int32, interface{})) - errCode, protomsg := f(msg) - rsp, err := proto.Marshal(protomsg.(proto.Message)) + errCode, protoMsg := f(msg) + if protoMsg == nil { + return + } + + rsp, err := proto.Marshal(protoMsg.(proto.Message)) if err != nil { conn := msg.GetSession() if conn != nil { @@ -65,9 +65,9 @@ func (c *Agent) OnMessage(msg components.IMessage) { conn := msg.GetSession() if conn != nil { conn.Send(errCode, msg.GetHeader().GetMsgID(), rsp) + return } - return - logger.Error("protocode not handler: %d", msg.GetHeader().GetMsgID()) + logger.Error("protocol not handler: %d", msg.GetHeader().GetMsgID()) } func (c *Agent) OnTimer() { @@ -91,6 +91,8 @@ func (c *Agent) OnClose() { } func (c *Agent) Close() { + agentPool.Put(c) + if c.Role == nil { return } @@ -100,7 +102,7 @@ func (c *Agent) Close() { func (c *Agent) checkHeartBeat(now int64) { lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime) - logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), lastHeartCheckTime, now) + //logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), lastHeartCheckTime, now) if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval { c.heartTimeoutCount++ if c.heartTimeoutCount >= common.HeartTimeoutCountMax { diff --git a/cmd/gameserver/main.go b/cmd/gameserver/main.go index 7339cfb..c21302c 100644 --- a/cmd/gameserver/main.go +++ b/cmd/gameserver/main.go @@ -20,7 +20,7 @@ func main() { s,err1 := NewGameServer(common.GlobalConf.GameConf) if err1 != nil { - fmt.Errorf(err1.Error()) + logger.Error(err1) return } go func() { diff --git a/cmd/gameserver/plugin/plugin.go b/cmd/gameserver/plugin/plugin.go index e0ad9cf..58c1bf1 100644 --- a/cmd/gameserver/plugin/plugin.go +++ b/cmd/gameserver/plugin/plugin.go @@ -15,7 +15,7 @@ func init() { func GetActionMap() map[interface{}]interface{} { logger.Debug("init protocode...") am := make(map[interface{}]interface{}) - am[uint32(pb.ProtoCode_LoginReq)] = "LoginRpc" + am[uint32(pb.ProtoCode_LoginReq)] = LoginRpc return am } diff --git a/common/components/conn.go b/common/components/conn.go index 5f2fff6..19f7a18 100644 --- a/common/components/conn.go +++ b/common/components/conn.go @@ -6,6 +6,7 @@ import ( "net" "pro2d/common" "pro2d/common/logger" + "sync" "sync/atomic" "time" ) @@ -14,7 +15,7 @@ type Connection struct { IConnection net.Conn Server IServer - Id int + Id uint32 scanner *bufio.Scanner writer *bufio.Writer @@ -31,30 +32,49 @@ type Connection struct { Status uint32 } +var connectionPool = &sync.Pool{ + New: func() interface{} { return new(Connection)}, +} + func NewConn(id int, conn net.Conn, s IServer) *Connection { - c := &Connection{ - Id: id, - Conn: conn, - Server: s, - - scanner: bufio.NewScanner(conn), - writer: bufio.NewWriter(conn), - WBuffer: make(chan []byte, common.MaxMsgChan), - Quit: make(chan *Connection), - readFunc: make(chan func(), 10), - timerFunc: make(chan func(), 10), - - Status: 0, + c := connectionPool.Get().(*Connection) + closed := atomic.LoadUint32(&c.Status) + if closed != 0 { + connectionPool.Put(c) + c = new(Connection) } - c.connectionCallback = c.defaultConnectionCallback - c.messageCallback = c.defaultMessageCallback - c.closeCallback = c.defaultCloseCallback - c.timerCallback = c.defaultTimerCallback + + atomic.StoreUint32(&c.Id, uint32(id)) + c.Conn = conn + c.Server = s + + c.scanner = bufio.NewScanner(conn) + c.writer = bufio.NewWriter(conn) + + c.reset() + return c } -func (c *Connection) GetID() int { - return c.Id +func (c *Connection) reset() { + c.WBuffer = make(chan []byte, common.MaxMsgChan) + c.Quit = make(chan *Connection) + + if c.readFunc == nil { + c.readFunc = make(chan func(), 10) + } + if c.timerFunc == nil { + c.timerFunc = make(chan func(), 10) + } + + //c.connectionCallback = c.defaultConnectionCallback + //c.messageCallback = c.defaultMessageCallback + //c.closeCallback = c.defaultCloseCallback + //c.timerCallback = c.defaultTimerCallback +} + +func (c *Connection) GetID() uint32 { + return atomic.LoadUint32(&c.Id) } func (c *Connection) SetConnectionCallback(cb ConnectionCallback) { @@ -78,12 +98,14 @@ func (c *Connection) Start() { go c.read() go c.listen() - c.Status = 1 + atomic.StoreUint32(&c.Status, 1) c.connectionCallback(c) c.handleTimeOut() } func (c *Connection) Stop() { + if atomic.LoadUint32(&c.Status) == 0 { return } + sendTimeout := time.NewTimer(5 * time.Millisecond) defer sendTimeout.Stop() // 发送超时 @@ -125,7 +147,10 @@ func (c *Connection) defaultTimerCallback(conn IConnection) { } func (c *Connection) write() { - defer c.quitting() + defer func() { + logger.Debug("write close") + c.Stop() + }() for msg := range c.WBuffer { n, err := c.writer.Write(msg) @@ -141,7 +166,11 @@ func (c *Connection) write() { } func (c *Connection) read() { - defer c.quitting() + defer func() { + logger.Debug("read close") + c.Stop() + }() + c.scanner.Split(c.Server.GetSplitter().ParseMsg) for c.scanner.Scan() { @@ -164,7 +193,10 @@ func (c *Connection) read() { //此设计目的是为了让网络数据与定时器处理都在一条协程里处理。不想加锁。。。 func (c *Connection) listen() { - defer c.quitting() + defer func() { + logger.Debug("listen close") + c.quitting() + }() for { select { @@ -179,6 +211,8 @@ func (c *Connection) listen() { } func (c *Connection) handleTimeOut() { + if atomic.LoadUint32(&c.Status) == 0 { return } + c.timerFunc <- func() { c.timerCallback(c) } @@ -186,16 +220,16 @@ func (c *Connection) handleTimeOut() { } func (c *Connection) quitting() { - closed := atomic.LoadUint32(&c.Status) - if closed == 0 { - return - } + if atomic.LoadUint32(&c.Status) == 0 { return } atomic.StoreUint32(&c.Status, 0) logger.Debug("ID: %d close", c.Id) close(c.WBuffer) close(c.Quit) - close(c.readFunc) + c.Conn.Close() c.closeCallback(c) + + //放回到对象池 + connectionPool.Put(c) } diff --git a/common/components/connmanage.go b/common/components/connmanage.go index c26c65a..d48564b 100644 --- a/common/components/connmanage.go +++ b/common/components/connmanage.go @@ -4,29 +4,29 @@ import "sync" type ConnManage struct { mu sync.RWMutex - conns map[int]IConnection + conns map[uint32]IConnection } func NewConnManage() *ConnManage { return &ConnManage{ mu: sync.RWMutex{}, - conns: make(map[int]IConnection), + conns: make(map[uint32]IConnection), } } -func (c *ConnManage) AddConn(id int, connection IConnection) { +func (c *ConnManage) AddConn(id uint32, connection IConnection) { c.mu.Lock() defer c.mu.Unlock() c.conns[id] = connection } -func (c *ConnManage) GetConn(id int) IConnection { +func (c *ConnManage) GetConn(id uint32) IConnection { c.mu.RLock() defer c.mu.RUnlock() return c.conns[id] } -func (c *ConnManage) DelConn(id int) IConnection { +func (c *ConnManage) DelConn(id uint32) IConnection { c.mu.Lock() defer c.mu.Unlock() conn := c.conns[id] @@ -35,14 +35,12 @@ func (c *ConnManage) DelConn(id int) IConnection { } func (c *ConnManage) Range(f func(key interface{}, value interface{}) bool) { + c.mu.Lock() + defer c.mu.Unlock() for k, v := range c.conns { - c.mu.Lock() if ok := f(k, v); !ok { - c.mu.Unlock() return } - c.mu.Unlock() - } } diff --git a/common/components/icompontents.go b/common/components/icompontents.go index 68ae9e6..00cf8e3 100644 --- a/common/components/icompontents.go +++ b/common/components/icompontents.go @@ -36,7 +36,7 @@ type ( TimerCallback func(IConnection) //链接 IConnection interface { - GetID() int + GetID() uint32 Start() Stop() Send(code int32, cmd uint32, b []byte) error @@ -48,9 +48,9 @@ type ( } //connManage IConnManage interface { - AddConn(id int, connection IConnection) - GetConn(id int) IConnection - DelConn(id int) IConnection + AddConn(id uint32, connection IConnection) + GetConn(id uint32) IConnection + DelConn(id uint32) IConnection Range(f func(key interface{}, value interface{}) bool) StopAllConns() } diff --git a/common/components/server.go b/common/components/server.go index 610fda9..70d74ea 100644 --- a/common/components/server.go +++ b/common/components/server.go @@ -137,7 +137,7 @@ func (s *Server) Start() error { func (s *Server) Stop() { StopTimer() - + s.connManage.StopAllConns() } func (s *Server) newConnection(conn IConnection) { @@ -146,7 +146,7 @@ func (s *Server) newConnection(conn IConnection) { conn.SetMessageCallback(s.messageCallback) conn.SetTimerCallback(s.timerCallback) - go conn.Start() + conn.Start() } func (s *Server) removeConnection(conn IConnection) { diff --git a/common/components/timerwheel.go b/common/components/timerwheel.go index 6286d23..5af8ef3 100644 --- a/common/components/timerwheel.go +++ b/common/components/timerwheel.go @@ -2,7 +2,6 @@ package components import ( "container/list" - "pro2d/common" "sync" "sync/atomic" "time" @@ -16,6 +15,10 @@ const ( TimeLevel = 1 << TimeLevelShift TimeNearMask = TimeNear - 1 TimeLevelMask = TimeLevel - 1 + + //协程池 大小 + WorkerPoolSize = 10 + MaxTaskPerWorker = 20 ) type bucket struct { @@ -74,14 +77,16 @@ type TimeWheel struct { WorkPool *WorkPool exit chan struct{} + exitFlag uint32 } func NewTimeWheel() *TimeWheel { tw := &TimeWheel{ tick: 10*time.Millisecond, time: 0, - WorkPool: NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker), + WorkPool: NewWorkPool(WorkerPoolSize, MaxTaskPerWorker), exit: make(chan struct{}), + exitFlag: 0, } for i :=0; i < TimeNear; i++ { tw.near[i] = newBucket() @@ -183,6 +188,12 @@ func (tw *TimeWheel) Start() { } func (tw *TimeWheel) Stop() { + flag := atomic.LoadUint32(&tw.exitFlag) + if flag != 0 { + return + } + + atomic.StoreUint32(&tw.exitFlag, 1) close(tw.exit) } diff --git a/common/const.go b/common/const.go index 1d9a08f..72dc3e3 100644 --- a/common/const.go +++ b/common/const.go @@ -1,21 +1,10 @@ package common const ( - //协程池 大小 - WorkerPoolSize = 10 - MaxTaskPerWorker = 100 - //最大包大 MaxPacketLength = 10 * 1024 * 1024 MaxMsgChan = 100 - //jwt - Pro2DTokenSignedString = "Pro2DSecret" - - //定时器 - TickMS = 10 - WheelSize = 3600 - //心跳 HeartTimerInterval = 5 //s HeartTimeoutCountMax = 20 //最大超时次数 diff --git a/doc/plugin.md b/doc/plugin.md index f16f57a..879dfe8 100644 --- a/doc/plugin.md +++ b/doc/plugin.md @@ -33,6 +33,7 @@ am[uint32(1)] = HotRpc 2. 增加函数`HotRpc` ``` func HotRpc(msg components.IMessage) (int32, interface{}) { + return 0, nil } ``` -- libgit2 0.21.2