Commit 6f0d72bd0a4b53e74afe665f5fe4034ac689dfd3

Authored by zhangqijia
1 parent 0cc58315

定时器功能完善优化

@@ -11,7 +11,7 @@ test: @@ -11,7 +11,7 @@ test:
11 http: 11 http:
12 go run cmd/http.go 12 go run cmd/http.go
13 13
14 -game:plugin 14 +game:
15 go run cmd/game.go 15 go run cmd/game.go
16 build: 16 build:
17 go build -o bin/account cmd/http.go 17 go build -o bin/account cmd/http.go
1 -Subproject commit 3aa479358834e2ea24c1c1b75c739b0524de1c4f 1 +Subproject commit bb9c916232c605571321f523802c17c469c8ab1c
src/components/net/server.go
@@ -102,7 +102,7 @@ func (s *Server) handleTimeOut() { @@ -102,7 +102,7 @@ func (s *Server) handleTimeOut() {
102 return true 102 return true
103 }) 103 })
104 104
105 - TimeWheel.AfterFunc(1*time.Second, s.handleTimeOut) 105 + timewheel.TimeOut(1*time.Second, s.handleTimeOut)
106 } 106 }
107 107
108 func (s *Server)Start() error { 108 func (s *Server)Start() error {
@@ -134,8 +134,6 @@ func (s *Server)Start() error { @@ -134,8 +134,6 @@ func (s *Server)Start() error {
134 s.StartWorkerPool() 134 s.StartWorkerPool()
135 135
136 //启动定时器 136 //启动定时器
137 - TimeWheel = timewheel.NewTimeWheel(common.TickMS * time.Millisecond, common.WheelSize)  
138 - TimeWheel.Start()  
139 s.handleTimeOut() 137 s.handleTimeOut()
140 138
141 //监听端口 139 //监听端口
src/components/timewheel/bucket.go deleted
@@ -1,85 +0,0 @@ @@ -1,85 +0,0 @@
1 -package timewheel  
2 -  
3 -import (  
4 - "container/list"  
5 - "sync"  
6 - "sync/atomic"  
7 - "unsafe"  
8 -)  
9 -type bucket struct {  
10 - //过期时间  
11 - expiration int64  
12 -  
13 - mu sync.Mutex  
14 - //相同过期时间的任务队列  
15 - timers *list.List  
16 -}  
17 -  
18 -func newBucket() *bucket {  
19 - return &bucket{  
20 - expiration: -1,  
21 - mu: sync.Mutex{},  
22 - timers: list.New(),  
23 - }  
24 -}  
25 -  
26 -func (b *bucket) SetExpiration(expiration int64) {  
27 - atomic.AddInt64(&b.expiration, expiration)  
28 -}  
29 -  
30 -func (b *bucket) Add(t *Timer) {  
31 - b.mu.Lock()  
32 - defer b.mu.Unlock()  
33 -  
34 - e := b.timers.PushBack(t)  
35 - t.setBucket(b)  
36 - t.element = e  
37 -}  
38 -  
39 -func (b *bucket) Flush(reinsert func(*Timer)) {  
40 - b.mu.Lock()  
41 - defer b.mu.Unlock()  
42 -  
43 - for e := b.timers.Front(); e != nil; {  
44 - next := e.Next()  
45 - t := e.Value.(*Timer)  
46 - b.remove(t)  
47 -  
48 - reinsert(t)  
49 - e = next  
50 - }  
51 -}  
52 -  
53 -func (b *bucket) remove(t *Timer) bool {  
54 - if t.getBucket() != b {  
55 - return false  
56 - }  
57 - b.timers.Remove(t.element)  
58 - t.setBucket(nil)  
59 - t.element = nil  
60 - return true  
61 -}  
62 -  
63 -func (b *bucket) Remove(t *Timer) bool {  
64 - b.mu.Lock()  
65 - defer b.mu.Unlock()  
66 -  
67 - return b.remove(t)  
68 -}  
69 -  
70 -type Timer struct {  
71 - expiration int64  
72 - //要被执行的任务  
73 - task func()  
74 -  
75 - b unsafe.Pointer  
76 - element *list.Element  
77 -}  
78 -  
79 -func (t *Timer) setBucket(b *bucket) {  
80 - atomic.StorePointer(&t.b, unsafe.Pointer(b))  
81 -}  
82 -  
83 -func (t *Timer) getBucket() *bucket {  
84 - return (*bucket)(atomic.LoadPointer(&t.b))  
85 -}  
86 \ No newline at end of file 0 \ No newline at end of file
src/components/timewheel/timerwheel.go 0 → 100644
@@ -0,0 +1,202 @@ @@ -0,0 +1,202 @@
  1 +package timewheel
  2 +
  3 +import (
  4 + "container/list"
  5 + "pro2d/src/common"
  6 + "pro2d/src/components/workpool"
  7 + "sync"
  8 + "sync/atomic"
  9 + "time"
  10 +)
  11 +
  12 +//skynet的时间轮 + 协程池
  13 +const (
  14 + TimeNearShift = 8
  15 + TimeNear = 1 << TimeNearShift
  16 + TimeLevelShift = 6
  17 + TimeLevel = 1 << TimeLevelShift
  18 + TimeNearMask = TimeNear - 1
  19 + TimeLevelMask = TimeLevel - 1
  20 +)
  21 +
  22 +type bucket struct {
  23 + expiration int32
  24 + timers *list.List
  25 +
  26 + mu sync.Mutex
  27 +}
  28 +
  29 +func newBucket() *bucket {
  30 + return &bucket{
  31 + expiration: -1,
  32 + timers: list.New(),
  33 + mu: sync.Mutex{},
  34 + }
  35 +}
  36 +
  37 +func (b*bucket) Add(t *timer) {
  38 + b.mu.Lock()
  39 + defer b.mu.Unlock()
  40 +
  41 + b.timers.PushBack(t)
  42 + //t.setBucket(b)
  43 + //t.element = e
  44 +}
  45 +
  46 +func (b*bucket) Flush(reinsert func(t *timer)) {
  47 + b.mu.Lock()
  48 + defer b.mu.Unlock()
  49 +
  50 + for e := b.timers.Front(); e != nil; {
  51 + next := e.Next()
  52 + reinsert(e.Value.(*timer))
  53 +
  54 + b.timers.Remove(e)
  55 + e = next
  56 + }
  57 +}
  58 +
  59 +type timer struct {
  60 + expiration uint32
  61 + f func()
  62 +}
  63 +
  64 +var TimingWheel *TimeWheel
  65 +
  66 +func init() {
  67 + TimingWheel = NewTimeWheel()
  68 + TimingWheel.Start()
  69 +}
  70 +type TimeWheel struct {
  71 + tick time.Duration
  72 + ticker *time.Ticker
  73 + near [TimeNear]*bucket
  74 + t [4][TimeLevel]*bucket
  75 + time uint32
  76 +
  77 + WorkPool *workpool.WorkPool
  78 + exit chan struct{}
  79 +}
  80 +
  81 +func NewTimeWheel() *TimeWheel {
  82 + tw := &TimeWheel{
  83 + tick: 10*time.Millisecond,
  84 + time: 0,
  85 + WorkPool: workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker),
  86 + exit: nil,
  87 + }
  88 + for i :=0; i < TimeNear; i++ {
  89 + tw.near[i] = newBucket()
  90 + }
  91 +
  92 + for i :=0; i < 4; i++ {
  93 + for j :=0; j < TimeLevel; j++ {
  94 + tw.t[i][j] = newBucket()
  95 + }
  96 + }
  97 + return tw
  98 +}
  99 +
  100 +func (tw *TimeWheel) add(t *timer) bool {
  101 + time := t.expiration
  102 + currentTime := tw.time
  103 + if time <= currentTime {
  104 + return false
  105 + }
  106 +
  107 + if (time | TimeNearMask) == (currentTime | TimeNearMask) {
  108 + tw.near[time&TimeNearMask].Add(t)
  109 + }else {
  110 + i := 0
  111 + mask := TimeNear << TimeNearShift
  112 + for i=0; i < 3; i ++ {
  113 + if (time | uint32(mask - 1)) == (currentTime | uint32(mask - 1)) {
  114 + break
  115 + }
  116 + mask <<= TimeLevelShift
  117 + }
  118 +
  119 + tw.t[i][((time>>(TimeNearShift + i*TimeLevelShift)) & TimeLevelMask)].Add(t)
  120 + }
  121 + return true
  122 +}
  123 +
  124 +func (tw *TimeWheel) addOrRun(t *timer) {
  125 + if !tw.add(t) {
  126 + workerID := int64(t.expiration) % tw.WorkPool.WorkerPoolSize
  127 + //将请求消息发送给任务队列
  128 + tw.WorkPool.TaskQueue[workerID] <- t.f
  129 + }
  130 +}
  131 +
  132 +func (tw *TimeWheel) moveList(level, idx int) {
  133 + current := tw.t[level][idx]
  134 + current.Flush(tw.addOrRun)
  135 +}
  136 +
  137 +func (tw *TimeWheel) shift() {
  138 + mask := TimeNear
  139 + tw.time++
  140 + ct := tw.time
  141 + if ct == 0 {
  142 + tw.moveList(3, 0)
  143 + }else {
  144 + time := ct >> TimeNearShift
  145 +
  146 + i := 0
  147 + for (ct & uint32(mask-1)) == 0{
  148 + idx := time & TimeLevelMask
  149 + if idx != 0 {
  150 + tw.moveList(i, int(idx))
  151 + break
  152 + }
  153 +
  154 + mask <<= TimeLevelShift
  155 + time >>= TimeLevelShift
  156 + i++
  157 + }
  158 + }
  159 +}
  160 +
  161 +func (tw *TimeWheel) execute() {
  162 + idx := tw.time & TimeNearMask
  163 + tw.near[idx].Flush(tw.addOrRun)
  164 +}
  165 +
  166 +func (tw *TimeWheel) update() {
  167 + tw.execute()
  168 + tw.shift()
  169 + tw.execute()
  170 +}
  171 +
  172 +func (tw *TimeWheel) Start() {
  173 + tw.ticker = time.NewTicker(tw.tick)
  174 + tw.WorkPool.StartWorkerPool()
  175 +
  176 + go func() {
  177 + for {
  178 + select {
  179 + case <- tw.ticker.C:
  180 + tw.update()
  181 + case <- tw.exit:
  182 + return
  183 + }
  184 + }
  185 + }()
  186 +}
  187 +
  188 +func (tw *TimeWheel) Stop() {
  189 + close(tw.exit)
  190 +}
  191 +
  192 +func (tw *TimeWheel) afterFunc(expiration time.Duration, f func()) {
  193 + time := atomic.LoadUint32(&tw.time)
  194 + tw.addOrRun(&timer{
  195 + expiration: uint32(expiration / tw.tick) + time,
  196 + f: f,
  197 + })
  198 +}
  199 +
  200 +func TimeOut(expire time.Duration, f func()) {
  201 + TimingWheel.afterFunc(expire, f)
  202 +}
0 \ No newline at end of file 203 \ No newline at end of file
src/components/timewheel/timewheel.go deleted
@@ -1,165 +0,0 @@ @@ -1,165 +0,0 @@
1 -package timewheel  
2 -  
3 -import (  
4 - "pro2d/src/common"  
5 - "pro2d/src/components/workpool"  
6 - "sync/atomic"  
7 - "time"  
8 - "unsafe"  
9 -)  
10 -  
11 -type TimeWheel struct {  
12 - ticker *time.Ticker  
13 - tickMs int64 //一滴答的时间 1ms 可以自定义 我们这里选择使用1ms  
14 - wheelSize int64  
15 - startMs int64 //开始时间 in millisecond  
16 - endMs int64  
17 - wheelTime int64 //跑完一圈所需时间  
18 - level int64 //层级  
19 -  
20 - //时间刻度 列表  
21 - bucket []*bucket  
22 - currentTime int64 //当前时间 in millisecond  
23 - prevflowWheel unsafe.Pointer // type: *TimingWheel  
24 - overflowWheel unsafe.Pointer // type: *TimingWheel  
25 - exitC chan struct{}  
26 -  
27 - WorkPool *workpool.WorkPool  
28 -}  
29 -  
30 -func NewTimeWheel(tick time.Duration, wheelSize int64) *TimeWheel {  
31 - //转化为毫秒  
32 - tickMs := int64(tick / time.Millisecond)  
33 - //如果小于零  
34 - if tickMs <=0 {  
35 - panic("tick must be greater than or equal to 1 ms")  
36 - }  
37 -  
38 - startMs := time.Now().UnixMilli() //ms  
39 -  
40 - workpool := workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker)  
41 - return newTimingWheel(tickMs, wheelSize, startMs, 0, nil, workpool)  
42 -}  
43 -  
44 -func newTimingWheel(tick, wheelSize int64, start, level int64, prev *TimeWheel, pool *workpool.WorkPool) *TimeWheel {  
45 - buckets := make([]*bucket, wheelSize)  
46 - for i := range buckets {  
47 - buckets[i] = newBucket()  
48 - }  
49 -  
50 - return &TimeWheel{  
51 - tickMs: tick,  
52 - wheelSize: wheelSize,  
53 - startMs: start,  
54 - endMs: wheelSize * tick + start,  
55 - wheelTime: wheelSize * tick,  
56 - bucket: buckets,  
57 - currentTime: truncate(start, tick),  
58 - exitC: make(chan struct{}),  
59 - WorkPool: pool,  
60 -  
61 - prevflowWheel: unsafe.Pointer(prev),  
62 - level: level,  
63 - }  
64 -}  
65 -  
66 -func truncate(dst, m int64) int64 {  
67 - return dst - dst%m  
68 -}  
69 -  
70 -func (tw *TimeWheel) add(t *Timer) bool {  
71 - currentTime := atomic.LoadInt64(&tw.currentTime)  
72 - if t.expiration < currentTime + tw.tickMs {  
73 - return false  
74 - }else if t.expiration < currentTime + tw.wheelTime {  
75 - virtualID := t.expiration / tw.tickMs //需要多少滴答数  
76 - b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize  
77 - b.Add(t)  
78 -  
79 - b.SetExpiration(virtualID * tw.tickMs)  
80 - }else {  
81 - overflowWheel := atomic.LoadPointer(&tw.overflowWheel)  
82 - if overflowWheel == nil {  
83 - level := atomic.LoadInt64(&tw.level) + 1  
84 - atomic.CompareAndSwapPointer(  
85 - &tw.overflowWheel,  
86 - nil,  
87 - unsafe.Pointer(newTimingWheel(tw.wheelTime, tw.wheelSize, currentTime, level, tw , tw.WorkPool)),  
88 - )  
89 - overflowWheel = atomic.LoadPointer(&tw.overflowWheel)  
90 - }  
91 - //递归添加到下一级定时器中  
92 - (*TimeWheel)(overflowWheel).add(t)  
93 - }  
94 -  
95 - return true  
96 -}  
97 -  
98 -func (tw *TimeWheel) addOrRun(t *Timer) {  
99 - if !tw.add(t) {  
100 - workerID := t.expiration % tw.WorkPool.WorkerPoolSize  
101 - //将请求消息发送给任务队列  
102 - tw.WorkPool.TaskQueue[workerID] <- t.task  
103 - }  
104 -}  
105 -  
106 -//拨动时钟  
107 -func (tw *TimeWheel) advanceClock(expiration int64) {  
108 - level := atomic.LoadInt64(&tw.level)  
109 - currentTime := truncate(expiration, tw.tickMs)  
110 - atomic.StoreInt64(&tw.currentTime, currentTime)  
111 -  
112 - if level == 0 {  
113 - virtualID := expiration / tw.tickMs //需要多少滴答数  
114 - b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize  
115 - b.Flush(tw.addOrRun)  
116 - } else {  
117 - prevflowWheel := atomic.LoadPointer(&tw.prevflowWheel)  
118 - if prevflowWheel != nil {  
119 - virtualID := expiration / tw.tickMs //需要多少滴答数  
120 - b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize  
121 - b.Flush((*TimeWheel)(prevflowWheel).addOrRun)  
122 - }  
123 - }  
124 -  
125 - //如果基础的时钟指针转完了一圈,则递归拨动下一级时钟  
126 - if currentTime >= tw.endMs {  
127 - atomic.StoreInt64(&tw.startMs, currentTime)  
128 - atomic.StoreInt64(&tw.endMs, currentTime + tw.wheelTime)  
129 -  
130 - overflowWheel := atomic.LoadPointer(&tw.overflowWheel)  
131 - if overflowWheel != nil {  
132 - (*TimeWheel)(overflowWheel).advanceClock(currentTime)  
133 - }  
134 - }  
135 -}  
136 -  
137 -  
138 -func (tw *TimeWheel) AfterFunc(d time.Duration, f func()) *Timer {  
139 - t := &Timer{  
140 - expiration: time.Now().UTC().Add(d).UnixMilli(),  
141 - task: f,  
142 - }  
143 - tw.addOrRun(t)  
144 - return t  
145 -}  
146 -  
147 -func (tw *TimeWheel) Start() {  
148 - tw.ticker = time.NewTicker(time.Duration(tw.tickMs) * time.Millisecond)  
149 - tw.WorkPool.StartWorkerPool()  
150 -  
151 - go func() {  
152 - for {  
153 - select {  
154 - case t := <- tw.ticker.C:  
155 - tw.advanceClock(t.UnixMilli())  
156 - case <- tw.exitC:  
157 - return  
158 - }  
159 - }  
160 - }()  
161 -}  
162 -  
163 -func (tw *TimeWheel) Stop() {  
164 - tw.exitC <- struct{}{}  
165 -}  
166 \ No newline at end of file 0 \ No newline at end of file
src/components/timewheel/timewheel_test.go
@@ -5,26 +5,14 @@ import ( @@ -5,26 +5,14 @@ import (
5 "testing" 5 "testing"
6 "time" 6 "time"
7 ) 7 )
8 -var tw *TimeWheel  
9 8
10 -func Add() {  
11 - fmt.Println("ADD : 123456")  
12 - tw.AfterFunc(6*time.Second, Add) 9 +func PRINT() {
  10 + fmt.Println("12312312312")
13 } 11 }
14 12
15 -func Add1() {  
16 - fmt.Println("GET : 78901112")  
17 - tw.AfterFunc(9*time.Second, Add1)  
18 -}  
19 -  
20 -func TestTimeWheel_AfterFunc(t *testing.T) {  
21 -  
22 - tw = NewTimeWheel(time.Second, 5)  
23 - tw.Start()  
24 - defer tw.Stop()  
25 -  
26 -  
27 - Add()  
28 - Add1()  
29 - time.Sleep(time.Second * 200) 13 +func TestTimeWheel_Start(t *testing.T) {
  14 + TimeOut(1 * time.Second, func() {
  15 + fmt.Println("12312313123")
  16 + })
  17 + select{}
30 } 18 }
src/models/role.go
@@ -13,8 +13,8 @@ type RoleModel struct { @@ -13,8 +13,8 @@ type RoleModel struct {
13 Role *pb.Role 13 Role *pb.Role
14 Heros HeroMap 14 Heros HeroMap
15 Teams *TeamModel 15 Teams *TeamModel
16 - Equip *pb.Equipment  
17 - Prop *pb.Prop 16 + Equip *EquipModels
  17 + Prop *PropModels
18 } 18 }
19 19
20 func RoleExistByUid(uid string) *RoleModel { 20 func RoleExistByUid(uid string) *RoleModel {
@@ -25,10 +25,17 @@ func RoleExistByUid(uid string) *RoleModel { @@ -25,10 +25,17 @@ func RoleExistByUid(uid string) *RoleModel {
25 return nil 25 return nil
26 } 26 }
27 27
28 - return &RoleModel{ 28 +
  29 + r := &RoleModel{
29 MgoColl: db.NewMongoColl(strconv.Itoa(int(data.Id)), data), 30 MgoColl: db.NewMongoColl(strconv.Itoa(int(data.Id)), data),
30 Role: data, 31 Role: data,
  32 + Heros: make(HeroMap),
  33 + Teams: new(TeamModel),
  34 + Equip: new(EquipModels),
  35 + Prop: new(PropModels),
31 } 36 }
  37 + r.LoadAll()
  38 + return r
32 } 39 }
33 40
34 func NewRole(id int64) *RoleModel { 41 func NewRole(id int64) *RoleModel {
@@ -37,14 +44,49 @@ func NewRole(id int64) *RoleModel { @@ -37,14 +44,49 @@ func NewRole(id int64) *RoleModel {
37 MgoColl: db.NewMongoColl(strconv.Itoa(int(id)), data), 44 MgoColl: db.NewMongoColl(strconv.Itoa(int(id)), data),
38 Role: data, 45 Role: data,
39 Heros: make(HeroMap), 46 Heros: make(HeroMap),
  47 + Teams: new(TeamModel),
  48 + Equip: new(EquipModels),
  49 + Prop: new(PropModels),
40 } 50 }
41 return m 51 return m
42 } 52 }
43 53
44 -func (m *RoleModel) LoadAll() { 54 +func (m *RoleModel) LoadHero() {
  55 + m.Heros["test"] = NewHero(0)
  56 + m.Heros["test"].Hero = &pb.Hero{
  57 + Id: 1,
  58 + RoleId: m.Role.Id,
  59 + Type: 1,
  60 + Level: 1,
  61 + ReinCount: 0,
  62 + ReinPoint: 0,
  63 + Equipments: "123123",
  64 + }
45 } 65 }
46 66
47 -func (m *RoleModel) LoadHero() { 67 +func (m *RoleModel) LoadTeams() {
  68 + m.Teams = NewTeam(0)
  69 + m.Teams.Team = &pb.Team{
  70 + Id: 1,
  71 + HeroIds: "1",
  72 + }
  73 +}
  74 +
  75 +func (m *RoleModel) LoadEquips() {
  76 + m.Equip = NewEquip(0)
  77 + m.Equip.Equip = &pb.Equipment{
  78 + Id: 0,
  79 + RoleId: m.Role.Id,
  80 + Type: 0,
  81 + Equip: false,
  82 + EnhanceLevel: false,
  83 + }
  84 +}
  85 +
  86 +func (m *RoleModel) LoadAll() {
  87 + m.LoadHero()
  88 + m.LoadTeams()
  89 + m.LoadEquips()
48 } 90 }
49 91
50 func (m *RoleModel) AddHero(hero *pb.Hero) { 92 func (m *RoleModel) AddHero(hero *pb.Hero) {
@@ -52,4 +94,12 @@ func (m *RoleModel) AddHero(hero *pb.Hero) { @@ -52,4 +94,12 @@ func (m *RoleModel) AddHero(hero *pb.Hero) {
52 h.Hero = hero 94 h.Hero = hero
53 h.Create() 95 h.Create()
54 m.Heros[fmt.Sprintf("%d%d", m.Role.Id, h.Hero.Id)] = h 96 m.Heros[fmt.Sprintf("%d%d", m.Role.Id, h.Hero.Id)] = h
55 -}  
56 \ No newline at end of file 97 \ No newline at end of file
  98 +}
  99 +
  100 +func (m *RoleModel) GetAllHero() map[string]*pb.Hero {
  101 + h := make(map[string]*pb.Hero)
  102 + for k, hero := range m.Heros {
  103 + h[k] = hero.Hero
  104 + }
  105 + return h
  106 +}
src/plugin/RolePlugin.go
@@ -54,8 +54,10 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) { @@ -54,8 +54,10 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) {
54 // Team: nil, 54 // Team: nil,
55 // Equips: nil, 55 // Equips: nil,
56 //} 56 //}
57 - return 0, &pb.LoginResponse{  
58 - Uid: role.Role.Uid,  
59 - Device: role.Role.Device, 57 + return 0, &pb.RoleRsp{
  58 + Role: role.Role,
  59 + Hero: role.GetAllHero(),
  60 + Team: role.Teams.Team,
  61 + Equips: []*pb.Equipment{role.Equip.Equip},
60 } 62 }
61 } 63 }
62 \ No newline at end of file 64 \ No newline at end of file