Commit 38dd96b4e4afe0fa749c9c8ee3100be5aec0c73c

Authored by zhangqijia
1 parent 33ea26ab

定时器+网络数据 peer 在一条协程,避免锁

src/actions/AccountAction.go
@@ -21,7 +21,7 @@ func (h *AccountAction) Register(c *gin.Context) (int, interface{}){ @@ -21,7 +21,7 @@ func (h *AccountAction) Register(c *gin.Context) (int, interface{}){
21 21
22 account := models.NewAccount(register.Phone) 22 account := models.NewAccount(register.Phone)
23 if err := account.Load(); err == nil { 23 if err := account.Load(); err == nil {
24 - return -2 , "account exist: " + register.Phone 24 + return -2 , "account exists: " + register.Phone
25 } 25 }
26 26
27 account.Uid = conf.SnowFlack.NextValStr() 27 account.Uid = conf.SnowFlack.NextValStr()
src/common/common.go
@@ -18,4 +18,7 @@ const ( @@ -18,4 +18,7 @@ const (
18 //心跳 18 //心跳
19 HeartTimerInterval = 5 //s 19 HeartTimerInterval = 5 //s
20 HeartTimeoutCountMax = 20 //最大超时次数 20 HeartTimeoutCountMax = 20 //最大超时次数
  21 +
  22 + //保存数据时间剑客
  23 + SaveDataInterval = 5 //s
21 ) 24 )
src/components/db/schema.go
@@ -10,6 +10,7 @@ type Schema struct { @@ -10,6 +10,7 @@ type Schema struct {
10 reflectValue *reflect.Value 10 reflectValue *reflect.Value
11 reflectType reflect.Type 11 reflectType reflect.Type
12 12
  13 + cacheFields map[string]interface{}
13 14
14 pri interface{} 15 pri interface{}
15 schema interface{} 16 schema interface{}
@@ -23,6 +24,7 @@ func NewSchema(key string, schema interface{}) *Schema { @@ -23,6 +24,7 @@ func NewSchema(key string, schema interface{}) *Schema {
23 sch := &Schema{ 24 sch := &Schema{
24 reflectValue: &s, 25 reflectValue: &s,
25 reflectType: s.Type(), 26 reflectType: s.Type(),
  27 + cacheFields: make(map[string]interface{}),
26 schema: schema, 28 schema: schema,
27 } 29 }
28 sch.mgo = NewMongoColl(sch) 30 sch.mgo = NewMongoColl(sch)
@@ -66,14 +68,21 @@ func (s *Schema) Create() error { @@ -66,14 +68,21 @@ func (s *Schema) Create() error {
66 return err 68 return err
67 } 69 }
68 70
  71 +func (s *Schema) Update() {
  72 + if s.cacheFields != nil {
  73 + s.mgo.UpdateProperties(s.cacheFields)
  74 + s.cacheFields = make(map[string]interface{})
  75 + }
  76 +}
  77 +
69 func (s *Schema) UpdateProperty(key string, val interface{}) { 78 func (s *Schema) UpdateProperty(key string, val interface{}) {
70 s.reflectValue.FieldByName(key).Set(reflect.ValueOf(val)) 79 s.reflectValue.FieldByName(key).Set(reflect.ValueOf(val))
71 - s.mgo.UpdateProperty(key, val) 80 + s.cacheFields[key] = val
72 } 81 }
73 82
74 func (s *Schema) UpdateProperties(properties map[string]interface{}) { 83 func (s *Schema) UpdateProperties(properties map[string]interface{}) {
75 for key, val := range properties { 84 for key, val := range properties {
76 s.reflectValue.FieldByName(key).Set(reflect.ValueOf(val)) 85 s.reflectValue.FieldByName(key).Set(reflect.ValueOf(val))
  86 + s.cacheFields[key] = val
77 } 87 }
78 - s.mgo.UpdateProperties(properties)  
79 -} 88 +}
80 \ No newline at end of file 89 \ No newline at end of file
src/components/net/conn.go
@@ -7,6 +7,7 @@ import ( @@ -7,6 +7,7 @@ import (
7 "net" 7 "net"
8 "pro2d/src/common" 8 "pro2d/src/common"
9 "pro2d/src/components/logger" 9 "pro2d/src/components/logger"
  10 + "pro2d/src/models"
10 "pro2d/src/utils" 11 "pro2d/src/utils"
11 "sync/atomic" 12 "sync/atomic"
12 ) 13 )
@@ -31,6 +32,8 @@ type Connection struct { @@ -31,6 +32,8 @@ type Connection struct {
31 32
32 Quit chan *Connection 33 Quit chan *Connection
33 34
  35 + Role *models.RoleModel
  36 +
34 nextCheckTime int64 //下一次检查的时间 37 nextCheckTime int64 //下一次检查的时间
35 lastHeartCheckTime int64 //最后收消息时间 38 lastHeartCheckTime int64 //最后收消息时间
36 heartTimeoutCount int //超时次数 39 heartTimeoutCount int //超时次数
@@ -87,7 +90,9 @@ func (c *Connection) read() { @@ -87,7 +90,9 @@ func (c *Connection) read() {
87 //得到需要处理此条连接的workerID 90 //得到需要处理此条连接的workerID
88 workerID := c.Id % c.Server.SConf.WorkerPoolSize 91 workerID := c.Id % c.Server.SConf.WorkerPoolSize
89 //将请求消息发送给任务队列 92 //将请求消息发送给任务队列
90 - c.Server.TaskQueue[workerID] <- req 93 + c.Server.TaskQueue[workerID] <- func() {
  94 + c.Server.DoMsgHandler(req)
  95 + }
91 96
92 atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) 97 atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())
93 98
@@ -120,10 +125,18 @@ func (c *Connection) update() { @@ -120,10 +125,18 @@ func (c *Connection) update() {
120 nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) 125 nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
121 now := utils.Timex() 126 now := utils.Timex()
122 if now >= nextCheckTime { 127 if now >= nextCheckTime {
  128 + //检查心跳
123 c.checkHeartBeat(now) 129 c.checkHeartBeat(now)
124 nextCheckTime = now + common.HeartTimerInterval 130 nextCheckTime = now + common.HeartTimerInterval
125 atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) 131 atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
126 } 132 }
  133 +
  134 + if c.Role != nil {
  135 + c.Server.TaskQueue[c.Id % c.Server.SConf.WorkerPoolSize] <- func() {
  136 + //role 获取Recover
  137 + c.Role.OnRecoverTimer(now)
  138 + }
  139 + }
127 } 140 }
128 141
129 142
src/components/net/server.go
@@ -26,7 +26,7 @@ type Server struct { @@ -26,7 +26,7 @@ type Server struct {
26 Clients *sync.Map 26 Clients *sync.Map
27 EtcdClient *etcd.EtcdClient 27 EtcdClient *etcd.EtcdClient
28 28
29 - TaskQueue []chan*MsgPkg 29 + TaskQueue []chan func()
30 } 30 }
31 31
32 func NewServer(sConf *conf.SConf) *Server { 32 func NewServer(sConf *conf.SConf) *Server {
@@ -35,7 +35,7 @@ func NewServer(sConf *conf.SConf) *Server { @@ -35,7 +35,7 @@ func NewServer(sConf *conf.SConf) *Server {
35 Clients: new(sync.Map), 35 Clients: new(sync.Map),
36 EtcdClient: new(etcd.EtcdClient), 36 EtcdClient: new(etcd.EtcdClient),
37 37
38 - TaskQueue: make([]chan *MsgPkg, common.WorkerPoolSize), 38 + TaskQueue: make([]chan func(), common.WorkerPoolSize),
39 } 39 }
40 } 40 }
41 41
@@ -45,21 +45,22 @@ func (s *Server) StartWorkerPool() { @@ -45,21 +45,22 @@ func (s *Server) StartWorkerPool() {
45 for i := 0; i < s.SConf.WorkerPoolSize; i++ { 45 for i := 0; i < s.SConf.WorkerPoolSize; i++ {
46 //一个worker被启动 46 //一个worker被启动
47 //给当前worker对应的任务队列开辟空间 47 //给当前worker对应的任务队列开辟空间
48 - s.TaskQueue[i] = make(chan *MsgPkg, common.MaxTaskPerWorker) 48 + s.TaskQueue[i] = make(chan func(), common.MaxTaskPerWorker)
49 //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 49 //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
50 go s.StartOneWorker(i, s.TaskQueue[i]) 50 go s.StartOneWorker(i, s.TaskQueue[i])
51 } 51 }
52 } 52 }
53 53
54 //StartOneWorker 启动一个Worker工作流程 54 //StartOneWorker 启动一个Worker工作流程
55 -func (s *Server) StartOneWorker(workerID int, taskQueue chan *MsgPkg) { 55 +func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) {
56 //不断的等待队列中的消息 56 //不断的等待队列中的消息
57 for { 57 for {
58 select { 58 select {
59 //有消息则取出队列的Request,并执行绑定的业务方法 59 //有消息则取出队列的Request,并执行绑定的业务方法
60 - case request := <-taskQueue: 60 + case f:= <-taskQueue:
61 _ = workerID 61 _ = workerID
62 - s.DoMsgHandler(request) 62 + f()
  63 + //s.DoMsgHandler(request)
63 } 64 }
64 } 65 }
65 } 66 }
src/models/role.go
@@ -3,8 +3,10 @@ package models @@ -3,8 +3,10 @@ package models
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "pro2d/protos/pb" 5 "pro2d/protos/pb"
  6 + "pro2d/src/common"
6 "pro2d/src/components/db" 7 "pro2d/src/components/db"
7 "pro2d/src/components/logger" 8 "pro2d/src/components/logger"
  9 + "sync/atomic"
8 ) 10 )
9 11
10 type RoleModel struct { 12 type RoleModel struct {
@@ -14,6 +16,8 @@ type RoleModel struct { @@ -14,6 +16,8 @@ type RoleModel struct {
14 Teams *TeamModel 16 Teams *TeamModel
15 Equip *EquipModels 17 Equip *EquipModels
16 Prop *PropModels 18 Prop *PropModels
  19 +
  20 + lastSaveTs int64
17 } 21 }
18 22
19 func RoleExistByUid(uid string) *RoleModel { 23 func RoleExistByUid(uid string) *RoleModel {
@@ -102,3 +106,15 @@ func (m *RoleModel) GetAllHero() map[string]*pb.Hero { @@ -102,3 +106,15 @@ func (m *RoleModel) GetAllHero() map[string]*pb.Hero {
102 } 106 }
103 return h 107 return h
104 } 108 }
  109 +
  110 +func (m *RoleModel) OnRecoverTimer(now int64) {
  111 + m.saveRoleData(now)
  112 +}
  113 +
  114 +func (m *RoleModel) saveRoleData(now int64) {
  115 + if now - m.lastSaveTs < common.SaveDataInterval {
  116 + return
  117 + }
  118 + atomic.StoreInt64(&m.lastSaveTs, now)
  119 + m.Update()
  120 +}
105 \ No newline at end of file 121 \ No newline at end of file