diff --git a/src/actions/AccountAction.go b/src/actions/AccountAction.go index 5e5bfcd..adf5312 100644 --- a/src/actions/AccountAction.go +++ b/src/actions/AccountAction.go @@ -21,7 +21,7 @@ func (h *AccountAction) Register(c *gin.Context) (int, interface{}){ account := models.NewAccount(register.Phone) if err := account.Load(); err == nil { - return -2 , "account exist: " + register.Phone + return -2 , "account exists: " + register.Phone } account.Uid = conf.SnowFlack.NextValStr() diff --git a/src/common/common.go b/src/common/common.go index 1d27e7b..33ca56e 100644 --- a/src/common/common.go +++ b/src/common/common.go @@ -18,4 +18,7 @@ const ( //心跳 HeartTimerInterval = 5 //s HeartTimeoutCountMax = 20 //最大超时次数 + + //保存数据时间剑客 + SaveDataInterval = 5 //s ) diff --git a/src/components/db/schema.go b/src/components/db/schema.go index 9a67d6b..25d82d1 100644 --- a/src/components/db/schema.go +++ b/src/components/db/schema.go @@ -10,6 +10,7 @@ type Schema struct { reflectValue *reflect.Value reflectType reflect.Type + cacheFields map[string]interface{} pri interface{} schema interface{} @@ -23,6 +24,7 @@ func NewSchema(key string, schema interface{}) *Schema { sch := &Schema{ reflectValue: &s, reflectType: s.Type(), + cacheFields: make(map[string]interface{}), schema: schema, } sch.mgo = NewMongoColl(sch) @@ -66,14 +68,21 @@ func (s *Schema) Create() error { return err } +func (s *Schema) Update() { + if s.cacheFields != nil { + s.mgo.UpdateProperties(s.cacheFields) + s.cacheFields = make(map[string]interface{}) + } +} + func (s *Schema) UpdateProperty(key string, val interface{}) { s.reflectValue.FieldByName(key).Set(reflect.ValueOf(val)) - s.mgo.UpdateProperty(key, val) + s.cacheFields[key] = val } func (s *Schema) UpdateProperties(properties map[string]interface{}) { for key, val := range properties { s.reflectValue.FieldByName(key).Set(reflect.ValueOf(val)) + s.cacheFields[key] = val } - s.mgo.UpdateProperties(properties) -} +} \ No newline at end of file diff --git a/src/components/net/conn.go b/src/components/net/conn.go index 80fbfaf..c13f161 100644 --- a/src/components/net/conn.go +++ b/src/components/net/conn.go @@ -7,6 +7,7 @@ import ( "net" "pro2d/src/common" "pro2d/src/components/logger" + "pro2d/src/models" "pro2d/src/utils" "sync/atomic" ) @@ -31,6 +32,8 @@ type Connection struct { Quit chan *Connection + Role *models.RoleModel + nextCheckTime int64 //下一次检查的时间 lastHeartCheckTime int64 //最后收消息时间 heartTimeoutCount int //超时次数 @@ -87,7 +90,9 @@ func (c *Connection) read() { //得到需要处理此条连接的workerID workerID := c.Id % c.Server.SConf.WorkerPoolSize //将请求消息发送给任务队列 - c.Server.TaskQueue[workerID] <- req + c.Server.TaskQueue[workerID] <- func() { + c.Server.DoMsgHandler(req) + } atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) @@ -120,10 +125,18 @@ func (c *Connection) update() { nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) now := utils.Timex() if now >= nextCheckTime { + //检查心跳 c.checkHeartBeat(now) nextCheckTime = now + common.HeartTimerInterval atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) } + + if c.Role != nil { + c.Server.TaskQueue[c.Id % c.Server.SConf.WorkerPoolSize] <- func() { + //role 获取Recover + c.Role.OnRecoverTimer(now) + } + } } diff --git a/src/components/net/server.go b/src/components/net/server.go index 7634f76..cbc4acd 100644 --- a/src/components/net/server.go +++ b/src/components/net/server.go @@ -26,7 +26,7 @@ type Server struct { Clients *sync.Map EtcdClient *etcd.EtcdClient - TaskQueue []chan*MsgPkg + TaskQueue []chan func() } func NewServer(sConf *conf.SConf) *Server { @@ -35,7 +35,7 @@ func NewServer(sConf *conf.SConf) *Server { Clients: new(sync.Map), EtcdClient: new(etcd.EtcdClient), - TaskQueue: make([]chan *MsgPkg, common.WorkerPoolSize), + TaskQueue: make([]chan func(), common.WorkerPoolSize), } } @@ -45,21 +45,22 @@ func (s *Server) StartWorkerPool() { for i := 0; i < s.SConf.WorkerPoolSize; i++ { //一个worker被启动 //给当前worker对应的任务队列开辟空间 - s.TaskQueue[i] = make(chan *MsgPkg, common.MaxTaskPerWorker) + s.TaskQueue[i] = make(chan func(), common.MaxTaskPerWorker) //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 go s.StartOneWorker(i, s.TaskQueue[i]) } } //StartOneWorker 启动一个Worker工作流程 -func (s *Server) StartOneWorker(workerID int, taskQueue chan *MsgPkg) { +func (s *Server) StartOneWorker(workerID int, taskQueue chan func()) { //不断的等待队列中的消息 for { select { //有消息则取出队列的Request,并执行绑定的业务方法 - case request := <-taskQueue: + case f:= <-taskQueue: _ = workerID - s.DoMsgHandler(request) + f() + //s.DoMsgHandler(request) } } } diff --git a/src/models/role.go b/src/models/role.go index 9d6b4e8..d99df84 100644 --- a/src/models/role.go +++ b/src/models/role.go @@ -3,8 +3,10 @@ package models import ( "fmt" "pro2d/protos/pb" + "pro2d/src/common" "pro2d/src/components/db" "pro2d/src/components/logger" + "sync/atomic" ) type RoleModel struct { @@ -14,6 +16,8 @@ type RoleModel struct { Teams *TeamModel Equip *EquipModels Prop *PropModels + + lastSaveTs int64 } func RoleExistByUid(uid string) *RoleModel { @@ -102,3 +106,15 @@ func (m *RoleModel) GetAllHero() map[string]*pb.Hero { } return h } + +func (m *RoleModel) OnRecoverTimer(now int64) { + m.saveRoleData(now) +} + +func (m *RoleModel) saveRoleData(now int64) { + if now - m.lastSaveTs < common.SaveDataInterval { + return + } + atomic.StoreInt64(&m.lastSaveTs, now) + m.Update() +} \ No newline at end of file -- libgit2 0.21.2