From 2e0aa29811785c6f64b79661328b8adee619c8eb Mon Sep 17 00:00:00 2001 From: zqj <582132116@qq.com> Date: Tue, 15 Mar 2022 16:27:49 +0800 Subject: [PATCH] update 每条连接新增一条协程,用于接受处理网络数据的逻辑和定时器的逻辑, 防止数据竞争 --- protos | 2 +- src/components/net/conn.go | 35 ++++++++++++++++++++++++++--------- src/components/net/msg.go | 2 +- src/components/net/server.go | 46 ++++++++++------------------------------------ src/components/timewheel/timerwheel.go | 6 +++++- src/models/role.go | 8 ++++---- src/plugin/RolePlugin.go | 1 + test/client.go | 2 +- 8 files changed, 49 insertions(+), 53 deletions(-) diff --git a/protos b/protos index bb9c916..608828e 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit bb9c916232c605571321f523802c17c469c8ab1c +Subproject commit 608828ec759ed7e958cf63ae557e592c816e45cc diff --git a/src/components/net/conn.go b/src/components/net/conn.go index c13f161..05b5fe6 100644 --- a/src/components/net/conn.go +++ b/src/components/net/conn.go @@ -30,6 +30,9 @@ type Connection struct { WBuffer chan []byte + updateFunc chan func() + readFunc chan func() + Quit chan *Connection Role *models.RoleModel @@ -54,9 +57,11 @@ func NewConn(id int, conn net.Conn, s *Server) *Connection { scanner: bufio.NewScanner(conn), writer: bufio.NewWriter(conn), WBuffer: make(chan []byte), + updateFunc: make(chan func()), + readFunc: make(chan func()), Quit: make(chan *Connection), lastHeartCheckTime: utils.Timex(), - heartTimeoutCount: 0, + heartTimeoutCount: 0, } } @@ -87,10 +92,7 @@ func (c *Connection) read() { } req.Conn = c - //得到需要处理此条连接的workerID - workerID := c.Id % c.Server.SConf.WorkerPoolSize - //将请求消息发送给任务队列 - c.Server.TaskQueue[workerID] <- func() { + c.readFunc <- func() { c.Server.DoMsgHandler(req) } @@ -132,26 +134,41 @@ func (c *Connection) update() { } if c.Role != nil { - c.Server.TaskQueue[c.Id % c.Server.SConf.WorkerPoolSize] <- func() { - //role 获取Recover + c.updateFunc <- func() { + //role 恢复数据 c.Role.OnRecoverTimer(now) } } } +func (c *Connection) flush() { + defer c.Stop() + for { + select { + case rf := <- c.readFunc: + rf() + case uf := <- c.updateFunc: + uf() + case <- c.Quit: + return + } + } +} func (c *Connection) Start() { go c.write() go c.read() + c.flush() } func (c *Connection) Stop() { + logger.Debug("ID: %d close", c.Id) c.Conn.Close() + c.Server.OnClose(c) } func (c *Connection) Quiting() { - logger.Debug("ID: %d close", c.Id) - c.Server.OnClose(c) + c.Quit <- c } func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){ diff --git a/src/components/net/msg.go b/src/components/net/msg.go index 2c5d330..beb331e 100644 --- a/src/components/net/msg.go +++ b/src/components/net/msg.go @@ -54,4 +54,4 @@ func EncodeMsg(pkg *MsgPkg) ([]byte, error) { return nil, err } return buf.Bytes(), nil -} +} \ No newline at end of file diff --git a/src/components/net/server.go b/src/components/net/server.go index cbc4acd..3aab660 100644 --- a/src/components/net/server.go +++ b/src/components/net/server.go @@ -1,13 +1,13 @@ package net import ( + "context" "fmt" "github.com/golang/protobuf/proto" "net" "plugin" "pro2d/conf" "pro2d/protos/pb" - "pro2d/src/common" "pro2d/src/components/db" "pro2d/src/components/etcd" "pro2d/src/components/logger" @@ -19,14 +19,11 @@ import ( type ActionHandler func (msg *MsgPkg) (int32, proto.Message) var ActionMap map[pb.ProtoCode]ActionHandler -var TimeWheel *timewheel.TimeWheel type Server struct { SConf *conf.SConf Clients *sync.Map EtcdClient *etcd.EtcdClient - - TaskQueue []chan func() } func NewServer(sConf *conf.SConf) *Server { @@ -34,34 +31,6 @@ func NewServer(sConf *conf.SConf) *Server { SConf: sConf, Clients: new(sync.Map), EtcdClient: new(etcd.EtcdClient), - - TaskQueue: make([]chan func(), common.WorkerPoolSize), - } -} - -//StartWorkerPool 启动worker工作池 -func (s *Server) StartWorkerPool() { - //遍历需要启动worker的数量,依此启动 - for i := 0; i < s.SConf.WorkerPoolSize; i++ { - //一个worker被启动 - //给当前worker对应的任务队列开辟空间 - s.TaskQueue[i] = make(chan func(), common.MaxTaskPerWorker) - //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 - go s.StartOneWorker(i, s.TaskQueue[i]) - } -} - -//StartOneWorker 启动一个Worker工作流程 -func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) { - //不断的等待队列中的消息 - for { - select { - //有消息则取出队列的Request,并执行绑定的业务方法 - case f:= <-taskQueue: - _ = workerID - f() - //s.DoMsgHandler(request) - } } } @@ -131,9 +100,6 @@ func (s *Server)Start() error { return err } - //启动协程池 - s.StartWorkerPool() - //启动定时器 s.handleTimeOut() @@ -154,5 +120,13 @@ func (s *Server)Start() error { } func (s *Server)Stop() { - TimeWheel.Stop() + timewheel.StopTimer() + + s.Clients.Range(func(key, value interface{}) bool { + client := value.(*Connection) + client.Stop() + return true + }) + + db.MongoClient.Disconnect(context.TODO()) } \ No newline at end of file diff --git a/src/components/timewheel/timerwheel.go b/src/components/timewheel/timerwheel.go index b6fd1b9..e6afe63 100644 --- a/src/components/timewheel/timerwheel.go +++ b/src/components/timewheel/timerwheel.go @@ -83,7 +83,7 @@ func NewTimeWheel() *TimeWheel { tick: 10*time.Millisecond, time: 0, WorkPool: workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker), - exit: nil, + exit: make(chan struct{}), } for i :=0; i < TimeNear; i++ { tw.near[i] = newBucket() @@ -198,4 +198,8 @@ func (tw *TimeWheel) afterFunc(expiration time.Duration, f func()) { func TimeOut(expire time.Duration, f func()) { TimingWheel.afterFunc(expire, f) +} + +func StopTimer() { + TimingWheel.Stop() } \ No newline at end of file diff --git a/src/models/role.go b/src/models/role.go index d99df84..e19d8ee 100644 --- a/src/models/role.go +++ b/src/models/role.go @@ -99,10 +99,10 @@ func (m *RoleModel) AddHero(hero *pb.Hero) { m.Heros[fmt.Sprintf("%s%s", m.Role.Id, h.Hero.Id)] = h } -func (m *RoleModel) GetAllHero() map[string]*pb.Hero { - h := make(map[string]*pb.Hero) - for k, hero := range m.Heros { - h[k] = hero.Hero +func (m *RoleModel) GetAllHero() []*pb.Hero { + var h []*pb.Hero + for _, hero := range m.Heros { + h = append(h, hero.Hero) } return h } diff --git a/src/plugin/RolePlugin.go b/src/plugin/RolePlugin.go index b45cc97..cf3bbd3 100644 --- a/src/plugin/RolePlugin.go +++ b/src/plugin/RolePlugin.go @@ -54,6 +54,7 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) { // Team: nil, // Equips: nil, //} + msg.Conn.Role = role return 0, &pb.RoleRsp{ Role: role.Role, Hero: role.GetAllHero(), diff --git a/test/client.go b/test/client.go index da4b75f..7066ce5 100644 --- a/test/client.go +++ b/test/client.go @@ -69,7 +69,7 @@ func main() { return } logger.Debug("recv: %s, n: %d\n", b1, n) - time.Sleep(5*time.Second) + time.Sleep(1*time.Second) } } \ No newline at end of file -- libgit2 0.21.2