From 6f0d72bd0a4b53e74afe665f5fe4034ac689dfd3 Mon Sep 17 00:00:00 2001 From: zqj <582132116@qq.com> Date: Mon, 14 Mar 2022 10:16:37 +0800 Subject: [PATCH] 定时器功能完善优化 --- Makefile | 2 +- protos | 2 +- src/components/net/server.go | 4 +--- src/components/timewheel/bucket.go | 85 ------------------------------------------------------------------------------------- src/components/timewheel/timerwheel.go | 202 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/components/timewheel/timewheel.go | 165 --------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/components/timewheel/timewheel_test.go | 26 +++++++------------------- src/models/role.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ src/plugin/RolePlugin.go | 8 +++++--- 9 files changed, 273 insertions(+), 283 deletions(-) delete mode 100644 src/components/timewheel/bucket.go create mode 100644 src/components/timewheel/timerwheel.go delete mode 100644 src/components/timewheel/timewheel.go diff --git a/Makefile b/Makefile index 6b35c0d..39371f3 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ test: http: go run cmd/http.go -game:plugin +game: go run cmd/game.go build: go build -o bin/account cmd/http.go diff --git a/protos b/protos index 3aa4793..bb9c916 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit 3aa479358834e2ea24c1c1b75c739b0524de1c4f +Subproject commit bb9c916232c605571321f523802c17c469c8ab1c diff --git a/src/components/net/server.go b/src/components/net/server.go index 1635ac8..7634f76 100644 --- a/src/components/net/server.go +++ b/src/components/net/server.go @@ -102,7 +102,7 @@ func (s *Server) handleTimeOut() { return true }) - TimeWheel.AfterFunc(1*time.Second, s.handleTimeOut) + timewheel.TimeOut(1*time.Second, s.handleTimeOut) } func (s *Server)Start() error { @@ -134,8 +134,6 @@ func (s *Server)Start() error { s.StartWorkerPool() //启动定时器 - TimeWheel = timewheel.NewTimeWheel(common.TickMS * time.Millisecond, common.WheelSize) - TimeWheel.Start() s.handleTimeOut() //监听端口 diff --git a/src/components/timewheel/bucket.go b/src/components/timewheel/bucket.go deleted file mode 100644 index 0c54b51..0000000 --- a/src/components/timewheel/bucket.go +++ /dev/null @@ -1,85 +0,0 @@ -package timewheel - -import ( - "container/list" - "sync" - "sync/atomic" - "unsafe" -) -type bucket struct { - //过期时间 - expiration int64 - - mu sync.Mutex - //相同过期时间的任务队列 - timers *list.List -} - -func newBucket() *bucket { - return &bucket{ - expiration: -1, - mu: sync.Mutex{}, - timers: list.New(), - } -} - -func (b *bucket) SetExpiration(expiration int64) { - atomic.AddInt64(&b.expiration, expiration) -} - -func (b *bucket) Add(t *Timer) { - b.mu.Lock() - defer b.mu.Unlock() - - e := b.timers.PushBack(t) - t.setBucket(b) - t.element = e -} - -func (b *bucket) Flush(reinsert func(*Timer)) { - b.mu.Lock() - defer b.mu.Unlock() - - for e := b.timers.Front(); e != nil; { - next := e.Next() - t := e.Value.(*Timer) - b.remove(t) - - reinsert(t) - e = next - } -} - -func (b *bucket) remove(t *Timer) bool { - if t.getBucket() != b { - return false - } - b.timers.Remove(t.element) - t.setBucket(nil) - t.element = nil - return true -} - -func (b *bucket) Remove(t *Timer) bool { - b.mu.Lock() - defer b.mu.Unlock() - - return b.remove(t) -} - -type Timer struct { - expiration int64 - //要被执行的任务 - task func() - - b unsafe.Pointer - element *list.Element -} - -func (t *Timer) setBucket(b *bucket) { - atomic.StorePointer(&t.b, unsafe.Pointer(b)) -} - -func (t *Timer) getBucket() *bucket { - return (*bucket)(atomic.LoadPointer(&t.b)) -} \ No newline at end of file diff --git a/src/components/timewheel/timerwheel.go b/src/components/timewheel/timerwheel.go new file mode 100644 index 0000000..7339f3c --- /dev/null +++ b/src/components/timewheel/timerwheel.go @@ -0,0 +1,202 @@ +package timewheel + +import ( + "container/list" + "pro2d/src/common" + "pro2d/src/components/workpool" + "sync" + "sync/atomic" + "time" +) + +//skynet的时间轮 + 协程池 +const ( + TimeNearShift = 8 + TimeNear = 1 << TimeNearShift + TimeLevelShift = 6 + TimeLevel = 1 << TimeLevelShift + TimeNearMask = TimeNear - 1 + TimeLevelMask = TimeLevel - 1 +) + +type bucket struct { + expiration int32 + timers *list.List + + mu sync.Mutex +} + +func newBucket() *bucket { + return &bucket{ + expiration: -1, + timers: list.New(), + mu: sync.Mutex{}, + } +} + +func (b*bucket) Add(t *timer) { + b.mu.Lock() + defer b.mu.Unlock() + + b.timers.PushBack(t) + //t.setBucket(b) + //t.element = e +} + +func (b*bucket) Flush(reinsert func(t *timer)) { + b.mu.Lock() + defer b.mu.Unlock() + + for e := b.timers.Front(); e != nil; { + next := e.Next() + reinsert(e.Value.(*timer)) + + b.timers.Remove(e) + e = next + } +} + +type timer struct { + expiration uint32 + f func() +} + +var TimingWheel *TimeWheel + +func init() { + TimingWheel = NewTimeWheel() + TimingWheel.Start() +} +type TimeWheel struct { + tick time.Duration + ticker *time.Ticker + near [TimeNear]*bucket + t [4][TimeLevel]*bucket + time uint32 + + WorkPool *workpool.WorkPool + exit chan struct{} +} + +func NewTimeWheel() *TimeWheel { + tw := &TimeWheel{ + tick: 10*time.Millisecond, + time: 0, + WorkPool: workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker), + exit: nil, + } + for i :=0; i < TimeNear; i++ { + tw.near[i] = newBucket() + } + + for i :=0; i < 4; i++ { + for j :=0; j < TimeLevel; j++ { + tw.t[i][j] = newBucket() + } + } + return tw +} + +func (tw *TimeWheel) add(t *timer) bool { + time := t.expiration + currentTime := tw.time + if time <= currentTime { + return false + } + + if (time | TimeNearMask) == (currentTime | TimeNearMask) { + tw.near[time&TimeNearMask].Add(t) + }else { + i := 0 + mask := TimeNear << TimeNearShift + for i=0; i < 3; i ++ { + if (time | uint32(mask - 1)) == (currentTime | uint32(mask - 1)) { + break + } + mask <<= TimeLevelShift + } + + tw.t[i][((time>>(TimeNearShift + i*TimeLevelShift)) & TimeLevelMask)].Add(t) + } + return true +} + +func (tw *TimeWheel) addOrRun(t *timer) { + if !tw.add(t) { + workerID := int64(t.expiration) % tw.WorkPool.WorkerPoolSize + //将请求消息发送给任务队列 + tw.WorkPool.TaskQueue[workerID] <- t.f + } +} + +func (tw *TimeWheel) moveList(level, idx int) { + current := tw.t[level][idx] + current.Flush(tw.addOrRun) +} + +func (tw *TimeWheel) shift() { + mask := TimeNear + tw.time++ + ct := tw.time + if ct == 0 { + tw.moveList(3, 0) + }else { + time := ct >> TimeNearShift + + i := 0 + for (ct & uint32(mask-1)) == 0{ + idx := time & TimeLevelMask + if idx != 0 { + tw.moveList(i, int(idx)) + break + } + + mask <<= TimeLevelShift + time >>= TimeLevelShift + i++ + } + } +} + +func (tw *TimeWheel) execute() { + idx := tw.time & TimeNearMask + tw.near[idx].Flush(tw.addOrRun) +} + +func (tw *TimeWheel) update() { + tw.execute() + tw.shift() + tw.execute() +} + +func (tw *TimeWheel) Start() { + tw.ticker = time.NewTicker(tw.tick) + tw.WorkPool.StartWorkerPool() + + go func() { + for { + select { + case <- tw.ticker.C: + tw.update() + case <- tw.exit: + return + } + } + }() +} + +func (tw *TimeWheel) Stop() { + close(tw.exit) +} + +func (tw *TimeWheel) afterFunc(expiration time.Duration, f func()) { + time := atomic.LoadUint32(&tw.time) + tw.addOrRun(&timer{ + expiration: uint32(expiration / tw.tick) + time, + f: f, + }) +} + +func TimeOut(expire time.Duration, f func()) { + TimingWheel.afterFunc(expire, f) +} \ No newline at end of file diff --git a/src/components/timewheel/timewheel.go b/src/components/timewheel/timewheel.go deleted file mode 100644 index f17a9e1..0000000 --- a/src/components/timewheel/timewheel.go +++ /dev/null @@ -1,165 +0,0 @@ -package timewheel - -import ( - "pro2d/src/common" - "pro2d/src/components/workpool" - "sync/atomic" - "time" - "unsafe" -) - -type TimeWheel struct { - ticker *time.Ticker - tickMs int64 //一滴答的时间 1ms 可以自定义 我们这里选择使用1ms - wheelSize int64 - startMs int64 //开始时间 in millisecond - endMs int64 - wheelTime int64 //跑完一圈所需时间 - level int64 //层级 - - //时间刻度 列表 - bucket []*bucket - currentTime int64 //当前时间 in millisecond - prevflowWheel unsafe.Pointer // type: *TimingWheel - overflowWheel unsafe.Pointer // type: *TimingWheel - exitC chan struct{} - - WorkPool *workpool.WorkPool -} - -func NewTimeWheel(tick time.Duration, wheelSize int64) *TimeWheel { - //转化为毫秒 - tickMs := int64(tick / time.Millisecond) - //如果小于零 - if tickMs <=0 { - panic("tick must be greater than or equal to 1 ms") - } - - startMs := time.Now().UnixMilli() //ms - - workpool := workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker) - return newTimingWheel(tickMs, wheelSize, startMs, 0, nil, workpool) -} - -func newTimingWheel(tick, wheelSize int64, start, level int64, prev *TimeWheel, pool *workpool.WorkPool) *TimeWheel { - buckets := make([]*bucket, wheelSize) - for i := range buckets { - buckets[i] = newBucket() - } - - return &TimeWheel{ - tickMs: tick, - wheelSize: wheelSize, - startMs: start, - endMs: wheelSize * tick + start, - wheelTime: wheelSize * tick, - bucket: buckets, - currentTime: truncate(start, tick), - exitC: make(chan struct{}), - WorkPool: pool, - - prevflowWheel: unsafe.Pointer(prev), - level: level, - } -} - -func truncate(dst, m int64) int64 { - return dst - dst%m -} - -func (tw *TimeWheel) add(t *Timer) bool { - currentTime := atomic.LoadInt64(&tw.currentTime) - if t.expiration < currentTime + tw.tickMs { - return false - }else if t.expiration < currentTime + tw.wheelTime { - virtualID := t.expiration / tw.tickMs //需要多少滴答数 - b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize - b.Add(t) - - b.SetExpiration(virtualID * tw.tickMs) - }else { - overflowWheel := atomic.LoadPointer(&tw.overflowWheel) - if overflowWheel == nil { - level := atomic.LoadInt64(&tw.level) + 1 - atomic.CompareAndSwapPointer( - &tw.overflowWheel, - nil, - unsafe.Pointer(newTimingWheel(tw.wheelTime, tw.wheelSize, currentTime, level, tw , tw.WorkPool)), - ) - overflowWheel = atomic.LoadPointer(&tw.overflowWheel) - } - //递归添加到下一级定时器中 - (*TimeWheel)(overflowWheel).add(t) - } - - return true -} - -func (tw *TimeWheel) addOrRun(t *Timer) { - if !tw.add(t) { - workerID := t.expiration % tw.WorkPool.WorkerPoolSize - //将请求消息发送给任务队列 - tw.WorkPool.TaskQueue[workerID] <- t.task - } -} - -//拨动时钟 -func (tw *TimeWheel) advanceClock(expiration int64) { - level := atomic.LoadInt64(&tw.level) - currentTime := truncate(expiration, tw.tickMs) - atomic.StoreInt64(&tw.currentTime, currentTime) - - if level == 0 { - virtualID := expiration / tw.tickMs //需要多少滴答数 - b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize - b.Flush(tw.addOrRun) - } else { - prevflowWheel := atomic.LoadPointer(&tw.prevflowWheel) - if prevflowWheel != nil { - virtualID := expiration / tw.tickMs //需要多少滴答数 - b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize - b.Flush((*TimeWheel)(prevflowWheel).addOrRun) - } - } - - //如果基础的时钟指针转完了一圈,则递归拨动下一级时钟 - if currentTime >= tw.endMs { - atomic.StoreInt64(&tw.startMs, currentTime) - atomic.StoreInt64(&tw.endMs, currentTime + tw.wheelTime) - - overflowWheel := atomic.LoadPointer(&tw.overflowWheel) - if overflowWheel != nil { - (*TimeWheel)(overflowWheel).advanceClock(currentTime) - } - } -} - - -func (tw *TimeWheel) AfterFunc(d time.Duration, f func()) *Timer { - t := &Timer{ - expiration: time.Now().UTC().Add(d).UnixMilli(), - task: f, - } - tw.addOrRun(t) - return t -} - -func (tw *TimeWheel) Start() { - tw.ticker = time.NewTicker(time.Duration(tw.tickMs) * time.Millisecond) - tw.WorkPool.StartWorkerPool() - - go func() { - for { - select { - case t := <- tw.ticker.C: - tw.advanceClock(t.UnixMilli()) - case <- tw.exitC: - return - } - } - }() -} - -func (tw *TimeWheel) Stop() { - tw.exitC <- struct{}{} -} \ No newline at end of file diff --git a/src/components/timewheel/timewheel_test.go b/src/components/timewheel/timewheel_test.go index 460377f..b19776a 100644 --- a/src/components/timewheel/timewheel_test.go +++ b/src/components/timewheel/timewheel_test.go @@ -5,26 +5,14 @@ import ( "testing" "time" ) -var tw *TimeWheel -func Add() { - fmt.Println("ADD : 123456") - tw.AfterFunc(6*time.Second, Add) +func PRINT() { + fmt.Println("12312312312") } -func Add1() { - fmt.Println("GET : 78901112") - tw.AfterFunc(9*time.Second, Add1) -} - -func TestTimeWheel_AfterFunc(t *testing.T) { - - tw = NewTimeWheel(time.Second, 5) - tw.Start() - defer tw.Stop() - - - Add() - Add1() - time.Sleep(time.Second * 200) +func TestTimeWheel_Start(t *testing.T) { + TimeOut(1 * time.Second, func() { + fmt.Println("12312313123") + }) + select{} } diff --git a/src/models/role.go b/src/models/role.go index 8d518ea..4faaa07 100644 --- a/src/models/role.go +++ b/src/models/role.go @@ -13,8 +13,8 @@ type RoleModel struct { Role *pb.Role Heros HeroMap Teams *TeamModel - Equip *pb.Equipment - Prop *pb.Prop + Equip *EquipModels + Prop *PropModels } func RoleExistByUid(uid string) *RoleModel { @@ -25,10 +25,17 @@ func RoleExistByUid(uid string) *RoleModel { return nil } - return &RoleModel{ + + r := &RoleModel{ MgoColl: db.NewMongoColl(strconv.Itoa(int(data.Id)), data), Role: data, + Heros: make(HeroMap), + Teams: new(TeamModel), + Equip: new(EquipModels), + Prop: new(PropModels), } + r.LoadAll() + return r } func NewRole(id int64) *RoleModel { @@ -37,14 +44,49 @@ func NewRole(id int64) *RoleModel { MgoColl: db.NewMongoColl(strconv.Itoa(int(id)), data), Role: data, Heros: make(HeroMap), + Teams: new(TeamModel), + Equip: new(EquipModels), + Prop: new(PropModels), } return m } -func (m *RoleModel) LoadAll() { +func (m *RoleModel) LoadHero() { + m.Heros["test"] = NewHero(0) + m.Heros["test"].Hero = &pb.Hero{ + Id: 1, + RoleId: m.Role.Id, + Type: 1, + Level: 1, + ReinCount: 0, + ReinPoint: 0, + Equipments: "123123", + } } -func (m *RoleModel) LoadHero() { +func (m *RoleModel) LoadTeams() { + m.Teams = NewTeam(0) + m.Teams.Team = &pb.Team{ + Id: 1, + HeroIds: "1", + } +} + +func (m *RoleModel) LoadEquips() { + m.Equip = NewEquip(0) + m.Equip.Equip = &pb.Equipment{ + Id: 0, + RoleId: m.Role.Id, + Type: 0, + Equip: false, + EnhanceLevel: false, + } +} + +func (m *RoleModel) LoadAll() { + m.LoadHero() + m.LoadTeams() + m.LoadEquips() } func (m *RoleModel) AddHero(hero *pb.Hero) { @@ -52,4 +94,12 @@ func (m *RoleModel) AddHero(hero *pb.Hero) { h.Hero = hero h.Create() m.Heros[fmt.Sprintf("%d%d", m.Role.Id, h.Hero.Id)] = h -} \ No newline at end of file +} + +func (m *RoleModel) GetAllHero() map[string]*pb.Hero { + h := make(map[string]*pb.Hero) + for k, hero := range m.Heros { + h[k] = hero.Hero + } + return h +} diff --git a/src/plugin/RolePlugin.go b/src/plugin/RolePlugin.go index 461bc3b..ff1f15f 100644 --- a/src/plugin/RolePlugin.go +++ b/src/plugin/RolePlugin.go @@ -54,8 +54,10 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) { // Team: nil, // Equips: nil, //} - return 0, &pb.LoginResponse{ - Uid: role.Role.Uid, - Device: role.Role.Device, + return 0, &pb.RoleRsp{ + Role: role.Role, + Hero: role.GetAllHero(), + Team: role.Teams.Team, + Equips: []*pb.Equipment{role.Equip.Equip}, } } \ No newline at end of file -- libgit2 0.21.2