diff --git a/Makefile b/Makefile index be4b6d1..bb091e1 100644 --- a/Makefile +++ b/Makefile @@ -9,14 +9,14 @@ gen: test: go run cmd/test/client.go http: - go run -race cmd/httpserver/*.go + go run -race cmd/httpserver/http.go cmd/httpserver/AccountAction.go game: go run -race cmd/gameserver/*.go build: go build -race -o bin/account cmd/http.go - go build -race -o bin/game cmd/game.go - go build -race -o bin/test test/client.go + go build -race -o bin/game cmd/gameserver/*.go + go build -race -o bin/test cmd/test/client.go regame:plugin lsof -i:8849 | grep "game" | grep -v grep | awk '{print $$2}' | xargs -I {} kill -USR1 {} diff --git a/cmd/gameserver/agent.go b/cmd/gameserver/agent.go index f64a38b..919be39 100644 --- a/cmd/gameserver/agent.go +++ b/cmd/gameserver/agent.go @@ -19,8 +19,7 @@ type Agent struct { Role *models.RoleModel - readFunc chan func() - timerFunc chan func() + Quit chan *Agent nextCheckTime int64 //下一次检查的时间 @@ -31,8 +30,7 @@ type Agent struct { func NewAgent(s components.IServer) *Agent { return &Agent{ Server: s, - readFunc: make(chan func(), 10), - timerFunc: make(chan func(), 10), + Quit: make(chan *Agent), nextCheckTime: 0, @@ -41,53 +39,51 @@ func NewAgent(s components.IServer) *Agent { } } -func (c *Agent) listen() { - defer c.Close() - for { - select { - case timerFunc := <- c.timerFunc: - timerFunc() - case readFunc := <- c.readFunc: - readFunc() - case <- c.Quit: - return - } - } -} - func (c *Agent) OnConnection(conn components.IConnection) { c.IConnection = conn - go c.listen() } func (c *Agent) OnMessage(msg components.IMessage) { - f := func() { - atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) - if md, ok := components.ActionMap[pb.ProtoCode(msg.GetHeader().GetMsgID())]; ok { - logger.Debug("protocode handler: %d", msg.GetHeader().GetMsgID()) - errCode, protomsg := md(msg) - rsp, err := proto.Marshal(protomsg) - fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg) - if err != nil { - conn := c.Server.GetIConnection(msg.GetSessId()) - if conn != nil { - conn.Send(-100, msg.GetHeader().GetMsgID(), nil) - } - return - } + atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) + if md, ok := components.ActionMap[pb.ProtoCode(msg.GetHeader().GetMsgID())]; ok { + logger.Debug("protocode handler: %d", msg.GetHeader().GetMsgID()) + errCode, protomsg := md(msg) + rsp, err := proto.Marshal(protomsg) + fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg) + if err != nil { conn := c.Server.GetIConnection(msg.GetSessId()) if conn != nil { - conn.Send(errCode, msg.GetHeader().GetMsgID(), rsp) + conn.Send(-100, msg.GetHeader().GetMsgID(), nil) } return } - logger.Error("protocode not handler: %d", msg.GetHeader().GetMsgID()) + conn := c.Server.GetIConnection(msg.GetSessId()) + if conn != nil { + conn.Send(errCode, msg.GetHeader().GetMsgID(), rsp) + } + return + } + logger.Error("protocode not handler: %d", msg.GetHeader().GetMsgID()) +} + +func (c *Agent) OnTimer() { + nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) + now := utils.Timex() + if now >= nextCheckTime { + //检查心跳 + c.checkHeartBeat(now) + nextCheckTime = now + common.HeartTimerInterval + atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) + } + + if c.Role != nil { + //role 恢复数据 + c.Role.OnRecoverTimer(now) } - c.readFunc <- f } func (c *Agent) OnClose() { - c.Quit <- c + c.Close() } func (c *Agent) Close() { @@ -112,21 +108,3 @@ func (c *Agent) checkHeartBeat(now int64) { c.heartTimeoutCount = 0 } } - -func (c *Agent) update() { - nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) - now := utils.Timex() - if now >= nextCheckTime { - //检查心跳 - c.checkHeartBeat(now) - nextCheckTime = now + common.HeartTimerInterval - atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) - } - - c.timerFunc <- func() { - if c.Role != nil { - //role 恢复数据 - c.Role.OnRecoverTimer(now) - } - } -} diff --git a/cmd/gameserver/game.go b/cmd/gameserver/game.go index 1c93c73..5d26a48 100644 --- a/cmd/gameserver/game.go +++ b/cmd/gameserver/game.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net/http" _ "net/http/pprof" "os" "os/signal" @@ -15,7 +16,6 @@ import ( "pro2d/utils/logger" "sync" "syscall" - "time" ) type GameServer struct { @@ -33,6 +33,7 @@ func NewGameServer(sconf *conf.SConf) (*GameServer, error) { s.SetConnectionCallback(s.OnConnection) s.SetMessageCallback(s.OnMessage) s.SetCloseCallback(s.OnClose) + s.SetTimerCallback(s.OnTimer) //mongo 初始化 db.MongoDatabase = db.MongoClient.Database(sconf.DBName) @@ -46,7 +47,6 @@ func NewGameServer(sconf *conf.SConf) (*GameServer, error) { } s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5) - go s.handleTimeOut() return s, nil } @@ -64,6 +64,14 @@ func (s *GameServer) OnMessage(msg components.IMessage) { agent.(*Agent).OnMessage(msg) } +func (s *GameServer) OnTimer(conn components.IConnection) { + agent, ok := s.Agents.Load(conn.GetID()) + if !ok { + return + } + agent.(*Agent).OnTimer() +} + func (s *GameServer) OnClose(conn components.IConnection) { agent, ok := s.Agents.Load(conn.GetID()) if !ok { @@ -80,16 +88,6 @@ func (s *GameServer) Stop() { } -func (s *GameServer) handleTimeOut() { - s.Agents.Range(func(key, value interface{}) bool { - agent := value.(*Agent) - agent.update() - return true - }) - - components.TimeOut(1*time.Second, s.handleTimeOut) -} - func main() { err := make(chan error) stopChan := make(chan os.Signal) @@ -103,6 +101,9 @@ func main() { fmt.Errorf(err1.Error()) return } + go func() { + err <- http.ListenAndServe("localhost:6061", nil) + }() go func() { err <- s.Start() diff --git a/cmd/httpserver/http.go b/cmd/httpserver/http.go index b4df3cf..8db21bd 100644 --- a/cmd/httpserver/http.go +++ b/cmd/httpserver/http.go @@ -35,7 +35,8 @@ func (s *AccountServer) Start() error { return err } s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.AccountConf.Name, conf.GlobalConf.AccountConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.AccountConf.IP, conf.GlobalConf.AccountConf.Port), 5) - return nil + + return s.IHttp.Start() } func main() { @@ -43,7 +44,7 @@ func main() { stopChan := make(chan os.Signal) signal.Notify(stopChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) - web := NewAccountServer("v1") + web := NewAccountServer("v1", fmt.Sprintf(":%d", conf.GlobalConf.AccountConf.Port)) web.BindHandler(&AccountAction{HttpServer: web}) go func() { err <- web.Start() diff --git a/common/components/conn.go b/common/components/conn.go index c636289..29a2b75 100644 --- a/common/components/conn.go +++ b/common/components/conn.go @@ -2,10 +2,11 @@ package components import ( "bufio" - "errors" + "fmt" "net" "pro2d/common" "pro2d/utils/logger" + "sync/atomic" "time" ) @@ -20,24 +21,37 @@ type Connection struct { writer *bufio.Writer WBuffer chan []byte Quit chan *Connection + readFunc chan func() + timerFunc chan func() messageCallback MessageCallback connectionCallback ConnectionCallback closeCallback CloseCallback timerCallback TimerCallback + + Status uint32 } func NewConn(id int, conn net.Conn, s IServer) *Connection { - return &Connection{ - Id: id, - Conn: conn, - Server: s, - - scanner: bufio.NewScanner(conn), - writer: bufio.NewWriter(conn), - WBuffer: make(chan []byte, common.MaxMsgChan), - Quit: make(chan *Connection), + c := &Connection{ + Id: id, + Conn: conn, + Server: s, + + scanner: bufio.NewScanner(conn), + writer: bufio.NewWriter(conn), + WBuffer: make(chan []byte, common.MaxMsgChan), + Quit: make(chan *Connection), + readFunc: make(chan func(), 10), + timerFunc: make(chan func(), 10), + + Status: 0, } + c.connectionCallback = c.defaultConnectionCallback + c.messageCallback = c.defaultMessageCallback + c.closeCallback = c.defaultCloseCallback + c.timerCallback = c.defaultTimerCallback + return c } func (c *Connection) GetID() int { @@ -56,6 +70,22 @@ func (c *Connection) SetCloseCallback(cb CloseCallback) { c.closeCallback = cb } +func (c *Connection) SetTimerCallback(cb TimerCallback) { + c.timerCallback = cb +} + +func (c *Connection) defaultConnectionCallback(conn IConnection) { +} + +func (c *Connection) defaultMessageCallback(msg IMessage) { +} + +func (c *Connection) defaultCloseCallback(conn IConnection) { +} + +func (c *Connection) defaultTimerCallback(conn IConnection) { +} + func (c *Connection) write() { defer c.Stop() @@ -73,7 +103,7 @@ func (c *Connection) write() { } func (c *Connection) read() { - defer c.Stop() + defer c.Quitting() c.scanner.Split(c.Server.GetSplitter().ParseMsg) for c.scanner.Scan() { @@ -83,7 +113,9 @@ func (c *Connection) read() { } req.SetSessId(c.Id) - c.messageCallback(req) + c.readFunc <- func() { + c.messageCallback(req) + } } if err := c.scanner.Err(); err != nil { @@ -92,14 +124,53 @@ func (c *Connection) read() { } } +func (c *Connection) listen(){ + defer c.Stop() + + for { + select { + case timerFunc := <- c.timerFunc: + timerFunc() + case readFunc := <- c.readFunc: + readFunc() + case <- c.Quit: + return + } + } +} + + +func (c *Connection) handleTimeOut() { + c.timerFunc <- func() { + c.timerCallback(c) + } + TimeOut(1*time.Second, c.handleTimeOut) +} + +func (c *Connection) Quitting() { + c.Quit <- c +} + func (c *Connection) Start() { - c.connectionCallback(c) go c.write() go c.read() + go c.listen() + + c.Status = 1 + c.connectionCallback(c) + c.handleTimeOut() } func (c *Connection) Stop() { logger.Debug("ID: %d close", c.Id) + closed := atomic.LoadUint32(&c.Status) + if closed == 0 { + return + } + atomic.StoreUint32(&c.Status, 0) + + close(c.WBuffer) + close(c.Quit) c.Conn.Close() c.closeCallback(c) } @@ -115,8 +186,9 @@ func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{ // 发送超时 select { case <-sendTimeout.C: - return errors.New("send buff msg timeout") + return fmt.Errorf("send buff msg timeout") case c.WBuffer <- buf: return nil } } + diff --git a/common/components/icompontents.go b/common/components/icompontents.go index c66dcc3..2b0d701 100644 --- a/common/components/icompontents.go +++ b/common/components/icompontents.go @@ -29,6 +29,11 @@ type ISplitter interface { GetHeadLen() uint32 } +type ConnectionCallback func(IConnection) +type CloseCallback func(IConnection) +type MessageCallback func(IMessage) +type TimerCallback func(IConnection) + //链接 type IConnection interface { GetID() int @@ -39,13 +44,9 @@ type IConnection interface { SetConnectionCallback(ConnectionCallback) SetMessageCallback(MessageCallback) SetCloseCallback(CloseCallback) + SetTimerCallback(TimerCallback) } -type ConnectionCallback func(IConnection) -type CloseCallback func(IConnection) -type MessageCallback func(IMessage) -type TimerCallback func(IConnection) - //server type IServer interface { Start() error diff --git a/common/components/server.go b/common/components/server.go index 8cdb3d5..033cd8d 100644 --- a/common/components/server.go +++ b/common/components/server.go @@ -16,9 +16,10 @@ var ActionMap map[pb.ProtoCode]ActionHandler type Server struct { IServer - connectionCallback ConnectionCallback - messageCallback MessageCallback - closeCallback CloseCallback + connectionCallback ConnectionCallback + messageCallback MessageCallback + closeCallback CloseCallback + timerCallback TimerCallback splitter ISplitter @@ -62,6 +63,7 @@ func (s *Server) SetCloseCallback(cb CloseCallback) { } func (s *Server) SetTimerCallback(cb TimerCallback) { + s.timerCallback = cb } func (s *Server) newConnection(conn IConnection) { @@ -70,6 +72,7 @@ func (s *Server) newConnection(conn IConnection) { conn.SetConnectionCallback(s.connectionCallback) conn.SetCloseCallback(s.removeConnection) conn.SetMessageCallback(s.messageCallback) + conn.SetTimerCallback(s.timerCallback) go conn.Start() } diff --git a/conf/conf.yaml b/conf/conf.yaml index 5f55514..d1277b8 100644 --- a/conf/conf.yaml +++ b/conf/conf.yaml @@ -20,7 +20,7 @@ server_account: id: "1" name: "account" ip: "192.168.0.206" - port: 8848 + port: 8858 dbname: "account" pool_size: 1 diff --git a/go.mod b/go.mod index f4cb55b..d5746d8 100644 --- a/go.mod +++ b/go.mod @@ -5,16 +5,13 @@ go 1.17 require ( github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 - github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/garyburd/redigo v1.6.3 github.com/gin-gonic/gin v1.7.7 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.5.2 - github.com/ouqiang/timewheel v1.0.1 go.etcd.io/etcd/api/v3 v3.5.2 go.etcd.io/etcd/client/v3 v3.5.2 go.mongodb.org/mongo-driver v1.8.3 - google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.27.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -37,7 +34,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/robfig/cron v1.2.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.0.2 // indirect @@ -53,7 +49,7 @@ require ( golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect golang.org/x/text v0.3.5 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect - gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + google.golang.org/grpc v1.38.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 8619ac7..22e8779 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,5 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= @@ -27,8 +26,6 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -130,8 +127,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/ouqiang/timewheel v1.0.1 h1:XxhrYwqhJ3z8nthEnhZcHyZ/dcE29ACJEJR3Ika0W2g= -github.com/ouqiang/timewheel v1.0.1/go.mod h1:896mz+8zvRU6i0PLVR0qaNuU5roxC874OB4TxUvUewY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -153,8 +148,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -317,8 +310,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= -gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/heap.out b/heap.out new file mode 100644 index 0000000..a14af69 Binary files /dev/null and b/heap.out differ -- libgit2 0.21.2