Commit 0e5d52de758452c895c98fcab944663a9dc8f6bf

Authored by zhangqijia
1 parent 7f269318

reactor: 重构底层框架1.0

Showing 46 changed files with 830 additions and 537 deletions   Show diff stats
Makefile
... ... @@ -7,12 +7,12 @@ gen:
7 7 protoc-go-inject-tag -input=./pb/*.pb.go
8 8  
9 9 test:
10   - go run test/client.go
  10 + go run cmd/test/client.go
11 11 http:
12   - go run -race cmd/http.go
  12 + go run -race cmd/httpserver/*.go
13 13  
14 14 game:
15   - go run -race cmd/game.go
  15 + go run -race cmd/gameserver/*.go
16 16 build:
17 17 go build -race -o bin/account cmd/http.go
18 18 go build -race -o bin/game cmd/game.go
... ...
cmd/game.go deleted
... ... @@ -1,41 +0,0 @@
1   -package main
2   -
3   -import (
4   - _ "net/http/pprof"
5   - "os"
6   - "os/signal"
7   - "pro2d/conf"
8   - "pro2d/src/components/logger"
9   - "pro2d/src/components/net"
10   - _ "pro2d/src/plugin"
11   - "syscall"
12   -)
13   -
14   -func main() {
15   - err := make(chan error)
16   - stopChan := make(chan os.Signal)
17   - signal.Notify(stopChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
18   -
19   - userChan := make(chan os.Signal)
20   - signal.Notify(userChan, syscall.SIGUSR1, syscall.SIGUSR2)
21   -
22   - s := net.NewServer(conf.GlobalConf.GameConf)
23   - go func() {
24   - err <- s.Start()
25   - }()
26   -
27   - for {
28   - select {
29   - case e := <- err:
30   - logger.Error("game server error: %v", e)
31   - return
32   - case <-stopChan:
33   - s.Stop()
34   - logger.Debug("game stop...")
35   - return
36   - case u := <-userChan:
37   - logger.Debug("userChan .. %v",u.String())
38   - s.LoadPlugin()
39   - }
40   - }
41   -}
cmd/gameserver/agent.go 0 → 100644
... ... @@ -0,0 +1,132 @@
  1 +package main
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/golang/protobuf/proto"
  6 + "math"
  7 + "pro2d/common"
  8 + "pro2d/common/components"
  9 + "pro2d/models"
  10 + "pro2d/pb"
  11 + "pro2d/utils"
  12 + "pro2d/utils/logger"
  13 + "sync/atomic"
  14 +)
  15 +
  16 +type Agent struct {
  17 + components.IConnection
  18 + Server components.IServer
  19 +
  20 + Role *models.RoleModel
  21 +
  22 + readFunc chan func()
  23 + timerFunc chan func()
  24 + Quit chan *Agent
  25 +
  26 + nextCheckTime int64 //下一次检查的时间
  27 + lastHeartCheckTime int64
  28 + heartTimeoutCount int //超时次数
  29 +}
  30 +
  31 +func NewAgent(s components.IServer) *Agent {
  32 + return &Agent{
  33 + Server: s,
  34 + readFunc: make(chan func(), 10),
  35 + timerFunc: make(chan func(), 10),
  36 + Quit: make(chan *Agent),
  37 +
  38 + nextCheckTime: 0,
  39 + lastHeartCheckTime: utils.Timex(),
  40 + heartTimeoutCount: 0,
  41 + }
  42 +}
  43 +
  44 +func (c *Agent) listen() {
  45 + defer c.Close()
  46 + for {
  47 + select {
  48 + case timerFunc := <- c.timerFunc:
  49 + timerFunc()
  50 + case readFunc := <- c.readFunc:
  51 + readFunc()
  52 + case <- c.Quit:
  53 + return
  54 + }
  55 + }
  56 +}
  57 +
  58 +func (c *Agent) OnConnection(conn components.IConnection) {
  59 + c.IConnection = conn
  60 + go c.listen()
  61 +}
  62 +
  63 +func (c *Agent) OnMessage(msg components.IMessage) {
  64 + f := func() {
  65 + atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())
  66 + if md, ok := components.ActionMap[pb.ProtoCode(msg.GetHeader().GetMsgID())]; ok {
  67 + logger.Debug("protocode handler: %d", msg.GetHeader().GetMsgID())
  68 + errCode, protomsg := md(msg)
  69 + rsp, err := proto.Marshal(protomsg)
  70 + fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
  71 + if err != nil {
  72 + conn := c.Server.GetIConnection(msg.GetSessId())
  73 + if conn != nil {
  74 + conn.Send(-100, msg.GetHeader().GetMsgID(), nil)
  75 + }
  76 + return
  77 + }
  78 + conn := c.Server.GetIConnection(msg.GetSessId())
  79 + if conn != nil {
  80 + conn.Send(errCode, msg.GetHeader().GetMsgID(), rsp)
  81 + }
  82 + return
  83 + }
  84 + logger.Error("protocode not handler: %d", msg.GetHeader().GetMsgID())
  85 + }
  86 + c.readFunc <- f
  87 +}
  88 +
  89 +func (c *Agent) OnClose() {
  90 + c.Quit <- c
  91 +}
  92 +
  93 +func (c *Agent) Close() {
  94 + if c.Role == nil {
  95 + return
  96 + }
  97 +
  98 + c.Role.OnOfflineEvent()
  99 +}
  100 +
  101 +func (c *Agent) checkHeartBeat(now int64) {
  102 + lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
  103 + logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.GetID(), lastHeartCheckTime, now)
  104 + if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval {
  105 + c.heartTimeoutCount++
  106 + if c.heartTimeoutCount >= common.HeartTimeoutCountMax {
  107 + c.Stop()
  108 + return
  109 + }
  110 + logger.Debug("timeout count: %d", c.heartTimeoutCount)
  111 + }else {
  112 + c.heartTimeoutCount = 0
  113 + }
  114 +}
  115 +
  116 +func (c *Agent) update() {
  117 + nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
  118 + now := utils.Timex()
  119 + if now >= nextCheckTime {
  120 + //检查心跳
  121 + c.checkHeartBeat(now)
  122 + nextCheckTime = now + common.HeartTimerInterval
  123 + atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
  124 + }
  125 +
  126 + c.timerFunc <- func() {
  127 + if c.Role != nil {
  128 + //role 恢复数据
  129 + c.Role.OnRecoverTimer(now)
  130 + }
  131 + }
  132 +}
... ...
cmd/gameserver/game.go 0 → 100644
... ... @@ -0,0 +1,125 @@
  1 +package main
  2 +
  3 +import (
  4 + "context"
  5 + "fmt"
  6 + _ "net/http/pprof"
  7 + "os"
  8 + "os/signal"
  9 + "pro2d/common/components"
  10 + "pro2d/conf"
  11 + "pro2d/models"
  12 + _ "pro2d/plugin"
  13 + "pro2d/utils/db"
  14 + "pro2d/utils/etcd"
  15 + "pro2d/utils/logger"
  16 + "sync"
  17 + "syscall"
  18 + "time"
  19 +)
  20 +
  21 +type GameServer struct {
  22 + components.IServer
  23 + EtcdClient *etcd.EtcdClient
  24 +
  25 + Agents *sync.Map
  26 +}
  27 +
  28 +func NewGameServer(sconf *conf.SConf) (*GameServer, error) {
  29 + s := &GameServer{
  30 + IServer: components.NewServer(sconf.Port, conf.GlobalConf.GameConf.PluginPath, components.NewPBSplitter()),
  31 + Agents: new(sync.Map),
  32 + }
  33 + s.SetConnectionCallback(s.OnConnection)
  34 + s.SetMessageCallback(s.OnMessage)
  35 + s.SetCloseCallback(s.OnClose)
  36 +
  37 + //mongo 初始化
  38 + db.MongoDatabase = db.MongoClient.Database(sconf.DBName)
  39 + models.InitGameServerModels()
  40 +
  41 + //Etcd 初始化
  42 + var err error
  43 + s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
  44 + if err != nil {
  45 + return nil, err
  46 + }
  47 + s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5)
  48 +
  49 + go s.handleTimeOut()
  50 + return s, nil
  51 +}
  52 +
  53 +func (s *GameServer) OnConnection(conn components.IConnection) {
  54 + agent := NewAgent(s)
  55 + agent.OnConnection(conn)
  56 + s.Agents.Store(conn.GetID(),agent)
  57 +}
  58 +
  59 +func (s *GameServer) OnMessage(msg components.IMessage) {
  60 + agent, ok := s.Agents.Load(msg.GetSessId())
  61 + if !ok {
  62 + return
  63 + }
  64 + agent.(*Agent).OnMessage(msg)
  65 +}
  66 +
  67 +func (s *GameServer) OnClose(conn components.IConnection) {
  68 + agent, ok := s.Agents.Load(conn.GetID())
  69 + if !ok {
  70 + return
  71 + }
  72 + agent.(*Agent).OnClose()
  73 + s.Agents.Delete(conn.GetID())
  74 +}
  75 +
  76 +func (s *GameServer) Stop() {
  77 + s.IServer.Stop()
  78 +
  79 + db.MongoClient.Disconnect(context.TODO())
  80 +}
  81 +
  82 +
  83 +func (s *GameServer) handleTimeOut() {
  84 + s.Agents.Range(func(key, value interface{}) bool {
  85 + agent := value.(*Agent)
  86 + agent.update()
  87 + return true
  88 + })
  89 +
  90 + components.TimeOut(1*time.Second, s.handleTimeOut)
  91 +}
  92 +
  93 +func main() {
  94 + err := make(chan error)
  95 + stopChan := make(chan os.Signal)
  96 + signal.Notify(stopChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
  97 +
  98 + userChan := make(chan os.Signal)
  99 + signal.Notify(userChan, syscall.SIGUSR1, syscall.SIGUSR2)
  100 +
  101 + s,err1 := NewGameServer(conf.GlobalConf.GameConf)
  102 + if err1 != nil {
  103 + fmt.Errorf(err1.Error())
  104 + return
  105 + }
  106 +
  107 + go func() {
  108 + err <- s.Start()
  109 + }()
  110 +
  111 + for {
  112 + select {
  113 + case e := <- err:
  114 + logger.Error("game server error: %v", e)
  115 + return
  116 + case <-stopChan:
  117 + s.Stop()
  118 + logger.Debug("game stop...")
  119 + return
  120 + case u := <-userChan:
  121 + logger.Debug("userChan .. %v",u.String())
  122 + //s.LoadPlugin()
  123 + }
  124 + }
  125 +}
... ...
src/actions/AccountAction.go renamed to cmd/httpserver/AccountAction.go
1   -package actions
  1 +package main
2 2  
3 3 import (
4 4 "github.com/gin-gonic/gin"
5 5 "pro2d/conf"
  6 + "pro2d/models"
6 7 "pro2d/pb"
7   - "pro2d/src/components/net"
8   - "pro2d/src/models"
9   - "pro2d/src/utils"
  8 + "pro2d/utils"
10 9 )
11 10  
12 11 type AccountAction struct {
13   - HttpServer *net.HttpServer
  12 + HttpServer *AccountServer
14 13 }
15 14  
16 15 func (h *AccountAction) Register(c *gin.Context) (int, interface{}){
... ...
cmd/http.go renamed to cmd/httpserver/http.go
1 1 package main
2 2  
3 3 import (
  4 + "fmt"
4 5 "os"
5 6 "os/signal"
  7 + "pro2d/common/components"
  8 + "pro2d/conf"
6 9 _ "pro2d/conf"
7   - "pro2d/src/actions"
8   - "pro2d/src/components/logger"
9   - "pro2d/src/components/net"
  10 + "pro2d/models"
  11 + "pro2d/utils/db"
  12 + "pro2d/utils/etcd"
  13 + "pro2d/utils/logger"
10 14 "syscall"
11 15 )
12 16  
  17 +type AccountServer struct {
  18 + components.IHttp
  19 + EtcdClient *etcd.EtcdClient
  20 +}
  21 +
  22 +func NewAccountServer(version string, port ...string) *AccountServer {
  23 + return &AccountServer{IHttp: components.NewHttpServer(version, port...)}
  24 +}
  25 +
  26 +func (s *AccountServer) Start() error {
  27 + //mongo 初始化
  28 + db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.AccountConf.DBName)
  29 + models.InitAccountServerModels()
  30 +
  31 + //Etcd 初始化
  32 + var err error
  33 + s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
  34 + if err != nil {
  35 + return err
  36 + }
  37 + s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.AccountConf.Name, conf.GlobalConf.AccountConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.AccountConf.IP, conf.GlobalConf.AccountConf.Port), 5)
  38 + return nil
  39 +}
  40 +
13 41 func main() {
14 42 err := make(chan error)
15 43 stopChan := make(chan os.Signal)
16 44 signal.Notify(stopChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
17 45  
18   - web := net.NewHttpServer("v1")
19   - web.BindHandler(&actions.AccountAction{HttpServer: web})
  46 + web := NewAccountServer("v1")
  47 + web.BindHandler(&AccountAction{HttpServer: web})
20 48 go func() {
21 49 err <- web.Start()
22 50 }()
... ... @@ -26,5 +54,6 @@ func main() {
26 54 logger.Error("game server error: %v", e)
27 55 case <-stopChan:
28 56 logger.Debug("game stop")
  57 + web.Stop()
29 58 }
30 59 }
31 60 \ No newline at end of file
... ...
src/components/net/http_test.go renamed to cmd/httpserver/http_test.go
1   -package net
  1 +package main
2 2  
3 3 import (
4 4 "github.com/gin-gonic/gin"
  5 + "pro2d/common/components"
5 6 "testing"
6 7 )
7 8  
... ... @@ -16,7 +17,7 @@ func (h *HttpAction) PrintB(c *gin.Context) (int, interface{}) {
16 17 }
17 18  
18 19 func TestHttpServer_Start(t *testing.T) {
19   - web := NewHttpServer("v1")
  20 + web := components.NewHttpServer("v1")
20 21 web.BindHandler(&HttpAction{})
21 22 web.Start()
22 23 }
23 24 \ No newline at end of file
... ...
test/client.go renamed to cmd/test/client.go
... ... @@ -6,17 +6,17 @@ import (
6 6 "encoding/binary"
7 7 "github.com/golang/protobuf/proto"
8 8 "net"
  9 + "pro2d/common/components"
9 10 "pro2d/pb"
10   - "pro2d/src/components/logger"
11   - net2 "pro2d/src/components/net"
  11 + "pro2d/utils/logger"
12 12 "time"
13 13 )
14 14  
15 15 func main() {
16 16  
17   - head := &net2.Head{
  17 + head := &components.PBHead{
18 18 Length: 0,
19   - Cmd: int32(pb.ProtoCode_LoginReq),
  19 + Cmd: uint32(pb.ProtoCode_LoginReq),
20 20 ErrCode: 0,
21 21 PreField: 0,
22 22 }
... ... @@ -27,11 +27,11 @@ func main() {
27 27 }
28 28 l, _ :=proto.Marshal(loginReq)
29 29  
30   - b := net2.MsgPkg{
  30 + b := components.PBMessage{
31 31 Head: head,
32 32 Body: l,
33 33 }
34   - head.Length = int32(16 + len(b.Body))
  34 + head.Length = uint32(16 + len(b.Body))
35 35 buf := &bytes.Buffer{}
36 36 err := binary.Write(buf, binary.BigEndian, head)
37 37 if err != nil {
... ... @@ -46,7 +46,7 @@ func main() {
46 46 return
47 47 }
48 48  
49   - client, err := net.Dial("tcp", "localhost:8849")
  49 + client, err := net.Dial("tcp", "localhost:8850")
50 50 if err != nil {
51 51 logger.Error(err)
52 52 return
... ...
test/client_test.go renamed to cmd/test/client_test.go
common/components/conn.go 0 → 100644
... ... @@ -0,0 +1,122 @@
  1 +package components
  2 +
  3 +import (
  4 + "bufio"
  5 + "errors"
  6 + "net"
  7 + "pro2d/common"
  8 + "pro2d/utils/logger"
  9 + "time"
  10 +)
  11 +
  12 +
  13 +type Connection struct {
  14 + IConnection
  15 + net.Conn
  16 + Server IServer
  17 + Id int
  18 +
  19 + scanner *bufio.Scanner
  20 + writer *bufio.Writer
  21 + WBuffer chan []byte
  22 + Quit chan *Connection
  23 +
  24 + messageCallback MessageCallback
  25 + connectionCallback ConnectionCallback
  26 + closeCallback CloseCallback
  27 + timerCallback TimerCallback
  28 +}
  29 +
  30 +func NewConn(id int, conn net.Conn, s IServer) *Connection {
  31 + return &Connection{
  32 + Id: id,
  33 + Conn: conn,
  34 + Server: s,
  35 +
  36 + scanner: bufio.NewScanner(conn),
  37 + writer: bufio.NewWriter(conn),
  38 + WBuffer: make(chan []byte, common.MaxMsgChan),
  39 + Quit: make(chan *Connection),
  40 + }
  41 +}
  42 +
  43 +func (c *Connection) GetID() int {
  44 + return c.Id
  45 +}
  46 +
  47 +func (c *Connection) SetConnectionCallback(cb ConnectionCallback) {
  48 + c.connectionCallback = cb
  49 +}
  50 +
  51 +func (c *Connection) SetMessageCallback(cb MessageCallback) {
  52 + c.messageCallback = cb
  53 +}
  54 +
  55 +func (c *Connection) SetCloseCallback(cb CloseCallback) {
  56 + c.closeCallback = cb
  57 +}
  58 +
  59 +func (c *Connection) write() {
  60 + defer c.Stop()
  61 +
  62 + for msg := range c.WBuffer {
  63 + n, err := c.writer.Write(msg)
  64 + if err != nil{
  65 + logger.Error("write fail err: " + err.Error(), "n: ", n)
  66 + return
  67 + }
  68 + if err := c.writer.Flush(); err != nil {
  69 + logger.Error("write Flush fail err: " + err.Error())
  70 + return
  71 + }
  72 + }
  73 +}
  74 +
  75 +func (c *Connection) read() {
  76 + defer c.Stop()
  77 + c.scanner.Split(c.Server.GetSplitter().ParseMsg)
  78 +
  79 + for c.scanner.Scan() {
  80 + req, err := c.Server.GetSplitter().UnPack(c.scanner.Bytes())
  81 + if err != nil {
  82 + return
  83 + }
  84 +
  85 + req.SetSessId(c.Id)
  86 + c.messageCallback(req)
  87 + }
  88 +
  89 + if err := c.scanner.Err(); err != nil {
  90 + logger.Error("scanner.err: %s", err.Error())
  91 + return
  92 + }
  93 +}
  94 +
  95 +func (c *Connection) Start() {
  96 + c.connectionCallback(c)
  97 + go c.write()
  98 + go c.read()
  99 +}
  100 +
  101 +func (c *Connection) Stop() {
  102 + logger.Debug("ID: %d close", c.Id)
  103 + c.Conn.Close()
  104 + c.closeCallback(c)
  105 +}
  106 +
  107 +func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error{
  108 + buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0)
  109 + if err != nil {
  110 + return err
  111 + }
  112 +
  113 + sendTimeout := time.NewTimer(5 * time.Millisecond)
  114 + defer sendTimeout.Stop()
  115 + // 发送超时
  116 + select {
  117 + case <-sendTimeout.C:
  118 + return errors.New("send buff msg timeout")
  119 + case c.WBuffer <- buf:
  120 + return nil
  121 + }
  122 +}
... ...
src/components/net/http.go renamed to common/components/http.go
1   -package net
  1 +package components
2 2  
3 3 import (
4   - "fmt"
5 4 "github.com/gin-gonic/gin"
6 5 "net/http"
7   - "pro2d/conf"
8   - "pro2d/src/components/db"
9   - "pro2d/src/components/etcd"
10   - "pro2d/src/models"
11 6 "reflect"
12 7 "strings"
13 8 )
14 9  
15 10 type HttpServer struct {
  11 + IHttp
16 12 version string
17 13 port []string
18   - EtcdClient *etcd.EtcdClient
19   -
20 14 Handler interface{}
21 15 }
22 16  
... ... @@ -30,6 +24,10 @@ func NewHttpServer(version string, port ...string) *HttpServer {
30 24 return &HttpServer{version: version, port: port}
31 25 }
32 26  
  27 +func GetRoutePath(objName, objFunc string) string {
  28 + return strings.ToLower(objName + "/" + objFunc)
  29 +}
  30 +
33 31 func (h *HttpServer)HandlerFuncObj(tvl, obj reflect.Value) gin.HandlerFunc {
34 32 return func(c *gin.Context) {
35 33 v := tvl.Call([]reflect.Value{obj, reflect.ValueOf(c)})
... ... @@ -41,27 +39,11 @@ func (h *HttpServer)HandlerFuncObj(tvl, obj reflect.Value) gin.HandlerFunc {
41 39 }
42 40 }
43 41  
44   -func GetRoutePath(objName, objFunc string) string {
45   - return strings.ToLower(objName + "/" + objFunc)
46   -}
47   -
48 42 func (h *HttpServer) BindHandler(handler interface{}) {
49 43 h.Handler = handler
50 44 }
51 45  
52 46 func (h *HttpServer) Start() error {
53   - //mongo 初始化
54   - db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.AccountConf.DBName)
55   - models.InitAccountServerModels()
56   -
57   - //Etcd 初始化
58   - var err error
59   - h.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
60   - if err != nil {
61   - return err
62   - }
63   - h.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.AccountConf.Name, conf.GlobalConf.AccountConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.AccountConf.IP, conf.GlobalConf.AccountConf.Port), 5)
64   -
65 47 //gin初始化
66 48 r := gin.Default()
67 49 r.GET("/ping", Pong)
... ...
common/components/icompontents.go 0 → 100644
... ... @@ -0,0 +1,68 @@
  1 +package components
  2 +
  3 +//网络包头
  4 +type IHead interface {
  5 + GetDataLen() uint32 //获取消息数据段长度
  6 + GetMsgID() uint32 //获取消息ID
  7 + GetErrCode() int32 //获取消息错误码
  8 + GetPreserve() uint32 //获取预留数据
  9 +}
  10 +
  11 +//网络包
  12 +type IMessage interface {
  13 + IHead
  14 + GetHeader() IHead //获取消息头
  15 + SetHeader(header IHead) //设置消息头
  16 +
  17 + GetData() []byte //获取消息内容
  18 + SetData([]byte) //设置消息内容
  19 +
  20 + SetSessId(int) //设置连接id
  21 + GetSessId() int //获取连接id
  22 +}
  23 +
  24 +//网络拆包解包器
  25 +type ISplitter interface {
  26 + UnPack([]byte) (IMessage, error)
  27 + Pack(cmd uint32, data []byte, errcode int32, preserve uint32) ([]byte, error)
  28 + ParseMsg (data []byte, atEOF bool) (advance int, token []byte, err error)
  29 + GetHeadLen() uint32
  30 +}
  31 +
  32 +//链接
  33 +type IConnection interface {
  34 + GetID() int
  35 + Start()
  36 + Stop()
  37 + Send(code int32, cmd uint32, b []byte) error
  38 +
  39 + SetConnectionCallback(ConnectionCallback)
  40 + SetMessageCallback(MessageCallback)
  41 + SetCloseCallback(CloseCallback)
  42 +}
  43 +
  44 +type ConnectionCallback func(IConnection)
  45 +type CloseCallback func(IConnection)
  46 +type MessageCallback func(IMessage)
  47 +type TimerCallback func(IConnection)
  48 +
  49 +//server
  50 +type IServer interface {
  51 + Start() error
  52 + Stop()
  53 +
  54 + GetSplitter() ISplitter
  55 + GetIConnection(id int) IConnection
  56 +
  57 + SetConnectionCallback(ConnectionCallback)
  58 + SetMessageCallback(MessageCallback)
  59 + SetCloseCallback(CloseCallback)
  60 + SetTimerCallback(TimerCallback)
  61 +}
  62 +
  63 +//httpserver
  64 +type IHttp interface {
  65 + Start() error
  66 + Stop()
  67 + BindHandler(interface{})
  68 +}
0 69 \ No newline at end of file
... ...
common/components/pbsplitter.go 0 → 100644
... ... @@ -0,0 +1,134 @@
  1 +package components
  2 +
  3 +import (
  4 + "bytes"
  5 + "encoding/binary"
  6 + "fmt"
  7 + "pro2d/common"
  8 +)
  9 +
  10 +type PBHead struct {
  11 + Length uint32
  12 + Cmd uint32
  13 + ErrCode int32
  14 + PreField uint32
  15 +}
  16 +
  17 +func (h *PBHead) GetDataLen() uint32 {
  18 + return h.Length
  19 +}
  20 +
  21 +func (h *PBHead) GetMsgID() uint32 {
  22 + return h.Cmd
  23 +}
  24 +
  25 +func (h *PBHead) GetErrCode() int32 {
  26 + return h.ErrCode
  27 +}
  28 +
  29 +func (h *PBHead) GetPreserve() uint32 {
  30 + return h.PreField
  31 +}
  32 +
  33 +type PBMessage struct {
  34 + IMessage
  35 + Head IHead
  36 + Body []byte
  37 +
  38 + SessionID int
  39 +}
  40 +
  41 +
  42 +func (m *PBMessage) GetHeader() IHead {
  43 + return m.Head
  44 +}
  45 +
  46 +func (m *PBMessage) SetHeader(header IHead) {
  47 + m.Head = header
  48 +}
  49 +func (m *PBMessage) GetData() []byte {
  50 + return m.Body
  51 +}
  52 +
  53 +func (m *PBMessage) SetData(b []byte) {
  54 + m.Body = b
  55 +}
  56 +
  57 +func (m *PBMessage) SetSessId(id int) {
  58 + m.SessionID = id
  59 +}
  60 +
  61 +func (m *PBMessage) GetSessId() int {
  62 + return m.SessionID
  63 +}
  64 +
  65 +
  66 +type PBSplitter struct {}
  67 +
  68 +func NewPBSplitter() *PBSplitter {
  69 + return &PBSplitter{}
  70 +}
  71 +
  72 +func (m *PBSplitter) GetHeadLen() uint32 {
  73 + return uint32(binary.Size(PBHead{}))
  74 +}
  75 +
  76 +func (m *PBSplitter) UnPack(data []byte) (IMessage,error) {
  77 + h := &PBHead{}
  78 + err := binary.Read(bytes.NewReader(data), binary.BigEndian, h)
  79 + if err != nil {
  80 + return nil, err
  81 + }
  82 +
  83 + return &PBMessage{
  84 + Head: h,
  85 + Body: data[m.GetHeadLen():],
  86 + },nil
  87 +}
  88 +
  89 +func (m *PBSplitter) ParseMsg (data []byte, atEOF bool) (advance int, token []byte, err error) {
  90 + // 表示我们已经扫描到结尾了
  91 + if atEOF && len(data) == 0 {
  92 + return 0, nil, nil
  93 + }
  94 + if !atEOF && len(data) >= int(m.GetHeadLen()) { //4字节数据包长度 4字节指令
  95 + length := int32(0)
  96 + binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
  97 + if length <= 0 {
  98 + return 0, nil, fmt.Errorf("length is 0")
  99 + }
  100 +
  101 + if length > common.MaxPacketLength {
  102 + return 0, nil, fmt.Errorf("length exceeds maximum length")
  103 + }
  104 + if int(length) <= len(data) {
  105 + return int(length) , data[:int(length)], nil
  106 + }
  107 + return 0 , nil, nil
  108 + }
  109 + if atEOF {
  110 + return len(data), data, nil
  111 + }
  112 + return 0, nil, nil
  113 +}
  114 +
  115 +func (m *PBSplitter) Pack(cmd uint32, data []byte, errcode int32, preserve uint32) ([]byte, error) {
  116 + buf := &bytes.Buffer{}
  117 + h := &PBHead{
  118 + Length: m.GetHeadLen()+ uint32(len(data)),
  119 + Cmd: cmd,
  120 + ErrCode: errcode,
  121 + PreField: preserve,
  122 + }
  123 + err := binary.Write(buf, binary.BigEndian, h)
  124 + if err != nil {
  125 + return nil, err
  126 + }
  127 +
  128 + err = binary.Write(buf, binary.BigEndian, data)
  129 + if err != nil {
  130 + return nil, err
  131 + }
  132 +
  133 + return buf.Bytes(), nil
  134 +}
0 135 \ No newline at end of file
... ...
common/components/server.go 0 → 100644
... ... @@ -0,0 +1,126 @@
  1 +package components
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/golang/protobuf/proto"
  6 + "net"
  7 + "plugin"
  8 + "pro2d/pb"
  9 + "pro2d/utils/logger"
  10 + "sync"
  11 +)
  12 +
  13 +type ActionHandler func (msg IMessage) (int32, proto.Message)
  14 +var ActionMap map[pb.ProtoCode]ActionHandler
  15 +
  16 +type Server struct {
  17 + IServer
  18 +
  19 + connectionCallback ConnectionCallback
  20 + messageCallback MessageCallback
  21 + closeCallback CloseCallback
  22 +
  23 + splitter ISplitter
  24 +
  25 + port int
  26 + PluginPath string
  27 + Clients *sync.Map
  28 +}
  29 +
  30 +func NewServer(port int, pluginPath string, splitter ISplitter) *Server {
  31 + s := &Server{
  32 + splitter: splitter,
  33 + port: port,
  34 + PluginPath: pluginPath,
  35 + Clients: new(sync.Map),
  36 + }
  37 + return s
  38 +}
  39 +
  40 +func (s *Server) GetSplitter() ISplitter {
  41 + return s.splitter
  42 +}
  43 +
  44 +func (s *Server) GetIConnection(id int) IConnection {
  45 + c, ok := s.Clients.Load(id)
  46 + if !ok {
  47 + return nil
  48 + }
  49 + return c.(IConnection)
  50 +}
  51 +
  52 +func (s *Server) SetConnectionCallback(cb ConnectionCallback) {
  53 + s.connectionCallback = cb
  54 +}
  55 +
  56 +func (s *Server) SetMessageCallback(cb MessageCallback) {
  57 + s.messageCallback = cb
  58 +}
  59 +
  60 +func (s *Server) SetCloseCallback(cb CloseCallback) {
  61 + s.closeCallback = cb
  62 +}
  63 +
  64 +func (s *Server) SetTimerCallback(cb TimerCallback) {
  65 +}
  66 +
  67 +func (s *Server) newConnection(conn IConnection) {
  68 + s.Clients.Store(conn.GetID(), conn)
  69 +
  70 + conn.SetConnectionCallback(s.connectionCallback)
  71 + conn.SetCloseCallback(s.removeConnection)
  72 + conn.SetMessageCallback(s.messageCallback)
  73 +
  74 + go conn.Start()
  75 +}
  76 +
  77 +func (s *Server) removeConnection(conn IConnection) {
  78 + s.closeCallback(conn)
  79 + s.Clients.Delete(conn.GetID())
  80 +}
  81 +
  82 +func (s *Server) LoadPlugin() {
  83 + //重新加载
  84 + _, err:=plugin.Open(s.PluginPath)
  85 + if err != nil {
  86 + logger.Error("load plugin err: %v, %s", err, s.PluginPath)
  87 + return
  88 + }
  89 + logger.Debug("load plugin success")
  90 +}
  91 +
  92 +
  93 +func (s *Server) Start() error {
  94 + //初始化plugin
  95 + //_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)
  96 + //if err != nil {
  97 + // return err
  98 + //}
  99 + port := fmt.Sprintf(":%d", s.port)
  100 + l, err := net.Listen("tcp", port)
  101 + if err != nil {
  102 + return err
  103 + }
  104 + //监听端口
  105 + logger.Debug("listen on %s\n", port)
  106 + id := 0
  107 + for {
  108 + conn, err := l.Accept()
  109 + if err != nil {
  110 + return err
  111 + }
  112 +
  113 + id++
  114 + client := NewConn(id, conn, s)
  115 + s.newConnection(client)
  116 + }
  117 +}
  118 +
  119 +func (s *Server)Stop() {
  120 + StopTimer()
  121 + s.Clients.Range(func(key, value interface{}) bool {
  122 + client := value.(IConnection)
  123 + client.Stop()
  124 + return true
  125 + })
  126 +}
... ...
src/components/timewheel/timerwheel.go renamed to common/components/timerwheel.go
1   -package timewheel
  1 +package components
2 2  
3 3 import (
4 4 "container/list"
5   - "pro2d/src/common"
6   - "pro2d/src/components/workpool"
  5 + "pro2d/common"
7 6 "sync"
8 7 "sync/atomic"
9 8 "time"
... ... @@ -73,16 +72,16 @@ type TimeWheel struct {
73 72 t [4][TimeLevel]*bucket
74 73 time uint32
75 74  
76   - WorkPool *workpool.WorkPool
  75 + WorkPool *WorkPool
77 76 exit chan struct{}
78 77 }
79 78  
80 79 func NewTimeWheel() *TimeWheel {
81 80 tw := &TimeWheel{
82   - tick: 10*time.Millisecond,
83   - time: 0,
84   - WorkPool: workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker),
85   - exit: make(chan struct{}),
  81 + tick: 10*time.Millisecond,
  82 + time: 0,
  83 + WorkPool: NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker),
  84 + exit: make(chan struct{}),
86 85 }
87 86 for i :=0; i < TimeNear; i++ {
88 87 tw.near[i] = newBucket()
... ...
src/components/timewheel/timewheel_test.go renamed to common/components/timewheel_test.go
1   -package timewheel
  1 +package components
2 2  
3 3 import (
4 4 "fmt"
... ...
src/components/workpool/workpool.go renamed to common/components/workpool.go
1   -package workpool
  1 +package components
2 2  
3 3 type Job func()
4 4  
... ...
src/common/common.go renamed to common/const.go
... ... @@ -5,8 +5,9 @@ const (
5 5 WorkerPoolSize = 10
6 6 MaxTaskPerWorker = 100
7 7  
8   - //包头
9   - HEADLEN = 16
  8 + //最大包大
  9 + MaxPacketLength = 10 * 1024 * 1024
  10 + MaxMsgChan = 100
10 11  
11 12 //jwt
12 13 Pro2DTokenSignedString = "Pro2DSecret"
... ... @@ -19,6 +20,6 @@ const (
19 20 HeartTimerInterval = 5 //s
20 21 HeartTimeoutCountMax = 20 //最大超时次数
21 22  
22   - //保存数据时间剑客
  23 + //保存数据时间
23 24 SaveDataInterval = 5 //s
24 25 )
... ...
conf/conf.go
... ... @@ -5,9 +5,9 @@ import (
5 5 "fmt"
6 6 "gopkg.in/yaml.v3"
7 7 "io/ioutil"
8   - "pro2d/src/components/db"
9   - "pro2d/src/components/logger"
10   - "pro2d/src/utils"
  8 + "pro2d/utils"
  9 + "pro2d/utils/db"
  10 + "pro2d/utils/logger"
11 11 "strings"
12 12 )
13 13  
... ...
conf/conf.yaml
... ... @@ -28,7 +28,7 @@ server_game:
28 28 id: "1"
29 29 name: "game"
30 30 ip: "192.168.0.206"
31   - port: 8849
  31 + port: 8850
32 32 dbname: "game"
33 33 pool_size: 1
34 34 plugin_path: "./bin/plugin.so"
... ...
src/models/account.go renamed to models/account.go
... ... @@ -2,7 +2,7 @@ package models
2 2  
3 3 import (
4 4 "pro2d/pb"
5   - "pro2d/src/components/db"
  5 + "pro2d/utils/db"
6 6 )
7 7  
8 8 type AccountModel struct {
... ... @@ -23,7 +23,7 @@ func NewAccount(phone string) *AccountModel {
23 23 Phone: phone,
24 24 }
25 25 account := &AccountModel{
26   - Schema: db.NewSchema(phone, ac),
  26 + Schema: db.NewSchema(phone, ac),
27 27 Account: ac,
28 28 }
29 29  
... ...
src/models/equip.go renamed to models/equip.go
... ... @@ -2,7 +2,7 @@ package models
2 2  
3 3 import (
4 4 "pro2d/pb"
5   - "pro2d/src/components/db"
  5 + "pro2d/utils/db"
6 6 )
7 7  
8 8 type EquipModels struct {
... ... @@ -16,7 +16,7 @@ func NewEquip(id string) *EquipModels {
16 16 }
17 17 m := &EquipModels{
18 18 Schema: db.NewSchema(id, data),
19   - Equip: data,
  19 + Equip: data,
20 20 }
21 21  
22 22 return m
... ...
src/models/hero.go renamed to models/hero.go
... ... @@ -2,7 +2,7 @@ package models
2 2  
3 3 import (
4 4 "pro2d/pb"
5   - "pro2d/src/components/db"
  5 + "pro2d/utils/db"
6 6 )
7 7  
8 8 type HeroModel struct {
... ... @@ -25,7 +25,7 @@ func NewHero(id string) *HeroModel {
25 25 }
26 26 m := &HeroModel{
27 27 Schema: db.NewSchema(id, h),
28   - Hero: h,
  28 + Hero: h,
29 29 }
30 30 return m
31 31 }
... ...
src/models/init.go renamed to models/init.go
... ... @@ -2,9 +2,9 @@ package models
2 2  
3 3 import (
4 4 "pro2d/pb"
5   - "pro2d/src/components/db"
6   - "pro2d/src/components/logger"
7   - "pro2d/src/utils"
  5 + "pro2d/utils"
  6 + "pro2d/utils/db"
  7 + "pro2d/utils/logger"
8 8 )
9 9  
10 10 func InitDoc(schema ...interface{}) {
... ...
src/models/init_test.go renamed to models/init_test.go
... ... @@ -3,7 +3,7 @@ package models
3 3 import (
4 4 "context"
5 5 _ "pro2d/conf"
6   - "pro2d/src/components/db"
  6 + "pro2d/utils/db"
7 7 "testing"
8 8 )
9 9  
... ...
src/models/prop.go renamed to models/prop.go
... ... @@ -2,7 +2,7 @@ package models
2 2  
3 3 import (
4 4 "pro2d/pb"
5   - "pro2d/src/components/db"
  5 + "pro2d/utils/db"
6 6 )
7 7  
8 8 type PropModels struct {
... ... @@ -16,7 +16,7 @@ func NewProp(id string) *PropModels {
16 16 }
17 17 m := &PropModels{
18 18 Schema: db.NewSchema(id, data),
19   - Prop: data,
  19 + Prop: data,
20 20 }
21 21  
22 22 return m
... ...
src/models/role.go renamed to models/role.go
... ... @@ -2,11 +2,11 @@ package models
2 2  
3 3 import (
4 4 "fmt"
  5 + "pro2d/common"
5 6 "pro2d/pb"
6   - "pro2d/src/common"
7   - "pro2d/src/components/db"
8   - "pro2d/src/components/logger"
9   - "pro2d/src/utils"
  7 + "pro2d/utils"
  8 + "pro2d/utils/db"
  9 + "pro2d/utils/logger"
10 10 "sync/atomic"
11 11 )
12 12  
... ... @@ -32,11 +32,11 @@ func RoleExistByUid(uid string) *RoleModel {
32 32  
33 33 r := &RoleModel{
34 34 Schema: db.NewSchema(data.Id, data),
35   - Role: data,
36   - Heros: make(HeroMap),
37   - Teams: new(TeamModel),
38   - Equip: new(EquipModels),
39   - Prop: new(PropModels),
  35 + Role: data,
  36 + Heros: make(HeroMap),
  37 + Teams: new(TeamModel),
  38 + Equip: new(EquipModels),
  39 + Prop: new(PropModels),
40 40 }
41 41 r.LoadAll()
42 42 return r
... ... @@ -46,11 +46,11 @@ func NewRole(id string) *RoleModel {
46 46 data := &pb.Role{Id: id}
47 47 m := &RoleModel{
48 48 Schema: db.NewSchema(id, data),
49   - Role: data,
50   - Heros: make(HeroMap),
51   - Teams: new(TeamModel),
52   - Equip: new(EquipModels),
53   - Prop: new(PropModels),
  49 + Role: data,
  50 + Heros: make(HeroMap),
  51 + Teams: new(TeamModel),
  52 + Equip: new(EquipModels),
  53 + Prop: new(PropModels),
54 54 }
55 55 return m
56 56 }
... ...
src/models/role_test.go renamed to models/role_test.go
... ... @@ -4,9 +4,9 @@ import (
4 4 "fmt"
5 5 _ "pro2d/conf"
6 6 "pro2d/pb"
7   - "pro2d/src/components/db"
8   - "pro2d/src/components/logger"
9   - "pro2d/src/utils"
  7 + "pro2d/utils"
  8 + "pro2d/utils/db"
  9 + "pro2d/utils/logger"
10 10 "testing"
11 11 )
12 12  
... ...
src/models/team.go renamed to models/team.go
... ... @@ -2,7 +2,7 @@ package models
2 2  
3 3 import (
4 4 "pro2d/pb"
5   - "pro2d/src/components/db"
  5 + "pro2d/utils/db"
6 6 )
7 7  
8 8 type TeamModel struct {
... ... @@ -16,7 +16,7 @@ func NewTeam(id string) *TeamModel {
16 16 }
17 17 m := &TeamModel{
18 18 Schema: db.NewSchema(id, data),
19   - Team: data,
  19 + Team: data,
20 20 }
21 21  
22 22 return m
... ...
src/plugin/RolePlugin.go renamed to plugin/RolePlugin.go
... ... @@ -2,21 +2,21 @@ package plugin
2 2  
3 3 import (
4 4 "github.com/golang/protobuf/proto"
  5 + "pro2d/common/components"
5 6 "pro2d/conf"
  7 + "pro2d/models"
6 8 "pro2d/pb"
7   - "pro2d/src/components/logger"
8   - "pro2d/src/components/net"
9   - "pro2d/src/models"
  9 + "pro2d/utils/logger"
10 10 )
11 11  
12   -func HeartRpc(msg *net.MsgPkg) (int32, proto.Message) {
  12 +func HeartRpc(msg components.IMessage) (int32, proto.Message) {
13 13 //msg.Conn.SetLastHeartCheckTime()
14 14 return 0, nil
15 15 }
16 16  
17   -func CreateRpc(msg *net.MsgPkg) (int32, proto.Message) {
  17 +func CreateRpc(msg components.IMessage) (int32, proto.Message) {
18 18 req := pb.CreateReq{}
19   - if err := proto.Unmarshal(msg.Body, &req); err != nil {
  19 + if err := proto.Unmarshal(msg.GetData(), &req); err != nil {
20 20 logger.Error("CreateRpc err: %v", err)
21 21 return 1, nil
22 22 }
... ... @@ -34,10 +34,10 @@ func CreateRpc(msg *net.MsgPkg) (int32, proto.Message) {
34 34 return 0, nil
35 35 }
36 36  
37   -func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) {
38   - //logger.Debug("cmd: %v, msg: %s", msg.Head.Cmd, msg.Body)
  37 +func LoginRpc(msg components.IMessage) (int32, proto.Message) {
  38 + //logger.Debug("cmd: %v, msg: %s", msg.PBHead.Cmd, msg.Body)
39 39 req := pb.LoginReq{}
40   - if err := proto.Unmarshal(msg.Body, &req); err != nil {
  40 + if err := proto.Unmarshal(msg.GetData(), &req); err != nil {
41 41 logger.Error("loginRpc err: %v", err)
42 42 return 1, nil
43 43 }
... ... @@ -49,7 +49,6 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) {
49 49 role.SetProperty("Device", req.Device)
50 50  
51 51  
52   - msg.Conn.Role = role
53 52 return 0, &pb.RoleRsp{
54 53 Role: role.Role,
55 54 Hero: role.GetAllHero(),
... ...
src/plugin/protocode.go renamed to plugin/protocode.go
1 1 package plugin
2 2  
3 3 import (
  4 + "pro2d/common/components"
4 5 "pro2d/pb"
5   - "pro2d/src/components/logger"
6   - "pro2d/src/components/net"
  6 + "pro2d/utils/logger"
7 7 )
8 8  
9 9 func init() {
10 10 logger.Debug("init protocode...")
11   - net.ActionMap = make(map[pb.ProtoCode]net.ActionHandler)
  11 + components.ActionMap = make(map[pb.ProtoCode]components.ActionHandler)
12 12  
13   - net.ActionMap[pb.ProtoCode_HeartReq] = HeartRpc
14   - net.ActionMap[pb.ProtoCode_LoginReq] = LoginRpc
15   - net.ActionMap[pb.ProtoCode_CreateReq] = CreateRpc
  13 + components.ActionMap[pb.ProtoCode_HeartReq] = HeartRpc
  14 + components.ActionMap[pb.ProtoCode_LoginReq] = LoginRpc
  15 + components.ActionMap[pb.ProtoCode_CreateReq] = CreateRpc
16 16  
17 17 }
... ...
src/components/net/conn.go deleted
... ... @@ -1,195 +0,0 @@
1   -package net
2   -
3   -import (
4   - "bufio"
5   - "fmt"
6   - "math"
7   - "net"
8   - "pro2d/src/common"
9   - "pro2d/src/components/logger"
10   - "pro2d/src/models"
11   - "pro2d/src/utils"
12   - "sync/atomic"
13   -)
14   -
15   -type Head struct {
16   - Length int32
17   - Cmd int32
18   - ErrCode int32
19   - PreField int32
20   -}
21   -
22   -
23   -type Connection struct {
24   - net.Conn
25   - Id int
26   - Server *Server
27   -
28   - scanner *bufio.Scanner
29   - writer *bufio.Writer
30   -
31   - WBuffer chan []byte
32   -
33   - updateFunc chan func()
34   - readFunc chan func()
35   -
36   - Quit chan *Connection
37   -
38   - Role *models.RoleModel
39   -
40   - nextCheckTime int64 //下一次检查的时间
41   - lastHeartCheckTime int64 //最后收消息时间
42   - heartTimeoutCount int //超时次数
43   -}
44   -
45   -type MsgPkg struct {
46   - Head *Head
47   - Body []byte
48   - Conn *Connection
49   -}
50   -
51   -func NewConn(id int, conn net.Conn, s *Server) *Connection {
52   - return &Connection{
53   - Id: id,
54   - Conn: conn,
55   - Server: s,
56   -
57   - scanner: bufio.NewScanner(conn),
58   - writer: bufio.NewWriter(conn),
59   - WBuffer: make(chan []byte),
60   - updateFunc: make(chan func()),
61   - readFunc: make(chan func()),
62   - Quit: make(chan *Connection),
63   - lastHeartCheckTime: utils.Timex(),
64   - heartTimeoutCount: 0,
65   - }
66   -}
67   -
68   -func (c *Connection) write() {
69   - defer c.Quiting()
70   -
71   - for msg := range c.WBuffer {
72   - n, err := c.writer.Write(msg)
73   - if err != nil{
74   - logger.Error("write fail err: " + err.Error(), "n: ", n)
75   - return
76   - }
77   - if err := c.writer.Flush(); err != nil {
78   - logger.Error("write Flush fail err: " + err.Error())
79   - return
80   - }
81   - }
82   -}
83   -
84   -func (c *Connection) read() {
85   - defer c.Quiting()
86   - c.scanner.Split(ParseMsg)
87   -
88   - for c.scanner.Scan() {
89   - req, err := DecodeMsg(c.scanner.Bytes())
90   - if err != nil {
91   - return
92   - }
93   -
94   - req.Conn = c
95   - c.readFunc <- func() {
96   - c.Server.DoMsgHandler(req)
97   - }
98   -
99   - atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())
100   -
101   - //备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。
102   - //c.Server.OnRecv(req)
103   - }
104   -
105   - if err := c.scanner.Err(); err != nil {
106   - fmt.Printf("scanner.err: %s\n", err.Error())
107   - return
108   - }
109   -}
110   -
111   -func (c *Connection) checkHeartBeat(now int64) {
112   - lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
113   - logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.Id, lastHeartCheckTime, now)
114   - if math.Abs(float64(lastHeartCheckTime - now)) > common.HeartTimerInterval {
115   - c.heartTimeoutCount++
116   - if c.heartTimeoutCount >= common.HeartTimeoutCountMax {
117   - c.Quiting()
118   - return
119   - }
120   - logger.Debug("timeout count: %d", c.heartTimeoutCount)
121   - }else {
122   - c.heartTimeoutCount = 0
123   - }
124   -}
125   -
126   -func (c *Connection) update() {
127   - nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
128   - now := utils.Timex()
129   - if now >= nextCheckTime {
130   - //检查心跳
131   - c.checkHeartBeat(now)
132   - nextCheckTime = now + common.HeartTimerInterval
133   - atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
134   - }
135   -
136   - c.updateFunc <- func() {
137   - if c.Role != nil {
138   - //role 恢复数据
139   - c.Role.OnRecoverTimer(now)
140   - }
141   - }
142   -}
143   -
144   -func (c *Connection) flush() {
145   - defer c.Stop()
146   - for {
147   - select {
148   - case rf := <- c.readFunc:
149   - rf()
150   - case uf := <- c.updateFunc:
151   - uf()
152   - case <- c.Quit:
153   - return
154   - }
155   - }
156   -}
157   -
158   -func (c *Connection) Start() {
159   - go c.write()
160   - go c.read()
161   - c.flush()
162   -}
163   -
164   -func (c *Connection) Stop() {
165   - logger.Debug("ID: %d close", c.Id)
166   - c.Conn.Close()
167   - if c.Role != nil {
168   - c.Role.OnOfflineEvent()
169   - }
170   -
171   - c.Server.OnClose(c)
172   -}
173   -
174   -func (c *Connection) Quiting() {
175   - c.Quit <- c
176   -}
177   -
178   -func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){
179   - h := &Head{
180   - Length: int32(common.HEADLEN + len(data)),
181   - Cmd: cmd,
182   - ErrCode: errCode,
183   - PreField: 0,
184   - }
185   - pkg := &MsgPkg{
186   - Head: h,
187   - Body: data,
188   - }
189   - buf, err := EncodeMsg(pkg)
190   - if err != nil {
191   - logger.Error("SendMsg error: %v", err)
192   - return
193   - }
194   - c.WBuffer <- buf
195   -}
196 0 \ No newline at end of file
src/components/net/msg.go deleted
... ... @@ -1,57 +0,0 @@
1   -package net
2   -
3   -import (
4   - "bytes"
5   - "encoding/binary"
6   - "fmt"
7   - "pro2d/src/common"
8   -)
9   -
10   -func ParseMsg (data []byte, atEOF bool) (advance int, token []byte, err error) {
11   - // 表示我们已经扫描到结尾了
12   - if atEOF && len(data) == 0 {
13   - return 0, nil, nil
14   - }
15   - if !atEOF && len(data) >= common.HEADLEN { //4字节数据包长度 4字节指令
16   - length := int32(0)
17   - binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
18   - if length <= 0 {
19   - return 0, nil, fmt.Errorf("length is 0")
20   - }
21   - if int(length) <= len(data) {
22   - return int(length) , data[:int(length)], nil
23   - }
24   - return 0 , nil, nil
25   - }
26   - if atEOF {
27   - return len(data), data, nil
28   - }
29   - return 0, nil, nil
30   -}
31   -
32   -
33   -func DecodeMsg(data []byte) (*MsgPkg, error) {
34   - h := &Head{}
35   - err := binary.Read(bytes.NewReader(data), binary.BigEndian, h)
36   - if err != nil {
37   - return nil, err
38   - }
39   - return &MsgPkg{
40   - Head: h,
41   - Body: data[common.HEADLEN:],
42   - },nil
43   -}
44   -
45   -func EncodeMsg(pkg *MsgPkg) ([]byte, error) {
46   - buf := &bytes.Buffer{}
47   - err := binary.Write(buf, binary.BigEndian, pkg.Head)
48   - if err != nil {
49   - return nil, err
50   - }
51   -
52   - err = binary.Write(buf, binary.BigEndian, pkg.Body)
53   - if err != nil {
54   - return nil, err
55   - }
56   - return buf.Bytes(), nil
57   -}
58 0 \ No newline at end of file
src/components/net/server.go deleted
... ... @@ -1,131 +0,0 @@
1   -package net
2   -
3   -import (
4   - "context"
5   - "fmt"
6   - "github.com/golang/protobuf/proto"
7   - "net"
8   - "plugin"
9   - "pro2d/conf"
10   - "pro2d/pb"
11   - "pro2d/src/components/db"
12   - "pro2d/src/components/etcd"
13   - "pro2d/src/components/logger"
14   - "pro2d/src/components/timewheel"
15   - "pro2d/src/models"
16   - "sync"
17   - "time"
18   -)
19   -
20   -type ActionHandler func (msg *MsgPkg) (int32, proto.Message)
21   -var ActionMap map[pb.ProtoCode]ActionHandler
22   -
23   -type Server struct {
24   - SConf *conf.SConf
25   - Clients *sync.Map
26   - EtcdClient *etcd.EtcdClient
27   -}
28   -
29   -func NewServer(sConf *conf.SConf) *Server {
30   - return &Server{
31   - SConf: sConf,
32   - Clients: new(sync.Map),
33   - EtcdClient: new(etcd.EtcdClient),
34   - }
35   -}
36   -
37   -func (s *Server) DoMsgHandler(msg *MsgPkg) {
38   - if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok {
39   - logger.Debug("protocode handler: %d", msg.Head.Cmd)
40   - errCode, protomsg := md(msg)
41   - rsp, err := proto.Marshal(protomsg)
42   - fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
43   - if err != nil {
44   - msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil)
45   - return
46   - }
47   - msg.Conn.SendMsgByCode(errCode, msg.Head.Cmd, rsp)
48   - return
49   - }
50   - logger.Error("protocode not handler: %d", msg.Head.Cmd)
51   -}
52   -
53   -func (s *Server) OnClose(conn *Connection) {
54   - s.Clients.Delete(conn.Id)
55   -}
56   -
57   -func (s *Server) LoadPlugin() {
58   - //重新加载
59   - _, err:=plugin.Open(conf.GlobalConf.GameConf.PluginPath)
60   - if err != nil {
61   - logger.Error("load plugin err: %v, %s", err, conf.GlobalConf.GameConf.PluginPath)
62   - return
63   - }
64   - logger.Debug("load plugin success")
65   -}
66   -
67   -func (s *Server) handleTimeOut() {
68   - s.Clients.Range(func(key, value interface{}) bool {
69   - client := value.(*Connection)
70   - client.update()
71   - return true
72   - })
73   -
74   - timewheel.TimeOut(1*time.Second, s.handleTimeOut)
75   -}
76   -
77   -func (s *Server)Start() error {
78   - //mongo 初始化
79   - db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName)
80   - models.InitGameServerModels()
81   -
82   - //Etcd 初始化
83   - var err error
84   - s.EtcdClient, err = etcd.NewEtcdClient(conf.GlobalConf.Etcd)
85   - if err != nil {
86   - return err
87   - }
88   - s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5)
89   -
90   - //初始化plugin
91   - //_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)
92   - //if err != nil {
93   - // return err
94   - //}
95   -
96   - port := fmt.Sprintf(":%d", s.SConf.Port)
97   - l, err := net.Listen("tcp", port)
98   - if err != nil {
99   - return err
100   - }
101   -
102   - //启动定时器
103   - s.handleTimeOut()
104   -
105   - //监听端口
106   - logger.Debug("listen on %s\n", port)
107   - id := 0
108   - for {
109   - conn, err := l.Accept()
110   - if err != nil {
111   - return err
112   - }
113   -
114   - id++
115   - client := NewConn(id, conn, s)
116   - s.Clients.Store(id, client)
117   - go client.Start()
118   - }
119   -}
120   -
121   -func (s *Server)Stop() {
122   - timewheel.StopTimer()
123   -
124   - s.Clients.Range(func(key, value interface{}) bool {
125   - client := value.(*Connection)
126   - client.Quiting()
127   - return true
128   - })
129   -
130   - db.MongoClient.Disconnect(context.TODO())
131   -}
132 0 \ No newline at end of file
src/components/db/mongo.go renamed to utils/db/mongo.go
... ... @@ -8,7 +8,7 @@ import (
8 8 "go.mongodb.org/mongo-driver/mongo/options"
9 9 "go.mongodb.org/mongo-driver/mongo/readpref"
10 10 "go.mongodb.org/mongo-driver/x/bsonx"
11   - "pro2d/src/utils"
  11 + "pro2d/utils"
12 12 "sort"
13 13 "strconv"
14 14 "strings"
... ...
src/components/db/redis.go renamed to utils/db/redis.go
src/components/db/schema.go renamed to utils/db/schema.go
src/components/etcd/etcd.go renamed to utils/etcd/etcd.go
... ... @@ -5,7 +5,7 @@ import (
5 5 "fmt"
6 6 clientv3 "go.etcd.io/etcd/client/v3"
7 7 "pro2d/conf"
8   - "pro2d/src/components/logger"
  8 + "pro2d/utils/logger"
9 9 "time"
10 10 )
11 11  
... ...
src/components/etcd/etcd_test.go renamed to utils/etcd/etcd_test.go
... ... @@ -6,7 +6,7 @@ import (
6 6 "go.etcd.io/etcd/api/v3/mvccpb"
7 7 clientv3 "go.etcd.io/etcd/client/v3"
8 8 "pro2d/conf"
9   - "pro2d/src/components/logger"
  9 + "pro2d/utils/logger"
10 10 "testing"
11 11 )
12 12  
... ...
src/components/logger/README.md renamed to utils/logger/README.md
src/components/logger/conn.go renamed to utils/logger/conn.go
src/components/logger/console.go renamed to utils/logger/console.go
src/components/logger/file.go renamed to utils/logger/file.go
src/components/logger/log.go renamed to utils/logger/log.go
src/utils/snowflake.go renamed to utils/snowflake.go
src/utils/utils.go renamed to utils/utils.go