Commit 0cc583150940444d548e9f8597256ec30f4303be

Authored by zhangqijia
1 parent 98b0736d

添加定时器, 检查心跳

@@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
6 "pro2d/conf" 6 "pro2d/conf"
7 "pro2d/src/components/logger" 7 "pro2d/src/components/logger"
8 "pro2d/src/components/net" 8 "pro2d/src/components/net"
  9 + _ "pro2d/src/plugin"
9 "syscall" 10 "syscall"
10 ) 11 )
11 12
1 -Subproject commit c29c12e0a07ed8e3a35a451ca7dbdadbc4bb70fb 1 +Subproject commit 3aa479358834e2ea24c1c1b75c739b0524de1c4f
src/common/common.go
1 package common 1 package common
2 2
3 const ( 3 const (
  4 + //协程池 大小
4 WorkerPoolSize = 10 5 WorkerPoolSize = 10
5 MaxTaskPerWorker = 100 6 MaxTaskPerWorker = 100
6 - MaxPacketSize = 10 * 1024 * 1024  
7 - MaxMsgChanLen = 20  
8 -)  
9 7
10 -const ( 8 + //包头
11 HEADLEN = 16 9 HEADLEN = 16
12 10
  11 + //jwt
13 Pro2DTokenSignedString = "Pro2DSecret" 12 Pro2DTokenSignedString = "Pro2DSecret"
  13 +
  14 + //定时器
  15 + TickMS = 10
  16 + WheelSize = 3600
  17 +
  18 + //心跳
  19 + HEART_TIMER_INTERVAL = 5 //s
  20 + HEART_TIMEOUT_COUNT_MAX = 20 //最大超时次数
14 ) 21 )
src/components/kafkatimer/delayqueue.go deleted
@@ -1,183 +0,0 @@ @@ -1,183 +0,0 @@
1 -package kafkatimer  
2 -  
3 -import (  
4 - "container/heap"  
5 - "sync"  
6 - "sync/atomic"  
7 - "time"  
8 -)  
9 -  
10 -type item struct {  
11 - Value interface{}  
12 - Priority int64  
13 - Index int  
14 -}  
15 -  
16 -// this is a priority queue as implemented by a min heap  
17 -// ie. the 0th element is the *lowest* value  
18 -type priorityQueue []*item  
19 -  
20 -func newPriorityQueue(capacity int) priorityQueue {  
21 - return make(priorityQueue, 0, capacity)  
22 -}  
23 -  
24 -func (pq priorityQueue) Len() int {  
25 - return len(pq)  
26 -}  
27 -  
28 -func (pq priorityQueue) Less(i, j int) bool {  
29 - return pq[i].Priority < pq[j].Priority  
30 -}  
31 -  
32 -func (pq priorityQueue) Swap(i, j int) {  
33 - pq[i], pq[j] = pq[j], pq[i]  
34 - pq[i].Index = i  
35 - pq[j].Index = j  
36 -}  
37 -  
38 -func (pq *priorityQueue) Push(x interface{}) {  
39 - n := len(*pq)  
40 - c := cap(*pq)  
41 - if n+1 > c {  
42 - npq := make(priorityQueue, n, c*2)  
43 - copy(npq, *pq)  
44 - *pq = npq  
45 - }  
46 - *pq = (*pq)[0 : n+1]  
47 - item := x.(*item)  
48 - item.Index = n  
49 - (*pq)[n] = item  
50 -}  
51 -  
52 -func (pq *priorityQueue) Pop() interface{} {  
53 - n := len(*pq)  
54 - c := cap(*pq)  
55 - if n < (c/2) && c > 25 {  
56 - npq := make(priorityQueue, n, c/2)  
57 - copy(npq, *pq)  
58 - *pq = npq  
59 - }  
60 - item := (*pq)[n-1]  
61 - item.Index = -1  
62 - *pq = (*pq)[0 : n-1]  
63 - return item  
64 -}  
65 -  
66 -func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {  
67 - if pq.Len() == 0 {  
68 - return nil, 0  
69 - }  
70 -  
71 - item := (*pq)[0]  
72 - if item.Priority > max {  
73 - return nil, item.Priority - max  
74 - }  
75 - heap.Remove(pq, 0)  
76 -  
77 - return item, 0  
78 -}  
79 -  
80 -// The end of PriorityQueue implementation.  
81 -  
82 -// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which  
83 -// an element can only be taken when its delay has expired. The head of the  
84 -// queue is the *Delayed* element whose delay expired furthest in the past.  
85 -type DelayQueue struct {  
86 - C chan interface{}  
87 -  
88 - mu sync.Mutex  
89 - pq priorityQueue  
90 -  
91 - // Similar to the sleeping state of runtime.timers.  
92 - sleeping int32  
93 - wakeupC chan struct{}  
94 -}  
95 -  
96 -// New creates an instance of delayQueue with the specified size.  
97 -func New(size int) *DelayQueue {  
98 - return &DelayQueue{  
99 - C: make(chan interface{}),  
100 - pq: newPriorityQueue(size),  
101 - wakeupC: make(chan struct{}),  
102 - }  
103 -}  
104 -  
105 -// Offer inserts the element into the current queue.  
106 -func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {  
107 - item := &item{Value: elem, Priority: expiration}  
108 -  
109 - dq.mu.Lock()  
110 - heap.Push(&dq.pq, item)  
111 - index := item.Index  
112 - dq.mu.Unlock()  
113 -  
114 - if index == 0 {  
115 - // A new item with the earliest expiration is added.  
116 - if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {  
117 - dq.wakeupC <- struct{}{}  
118 - }  
119 - }  
120 -}  
121 -  
122 -// Poll starts an infinite loop, in which it continually waits for an element  
123 -// to expire and then send the expired element to the channel C.  
124 -func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {  
125 - for {  
126 - now := nowF()  
127 -  
128 - dq.mu.Lock()  
129 - item, delta := dq.pq.PeekAndShift(now)  
130 - if item == nil {  
131 - // No items left or at least one item is pending.  
132 -  
133 - // We must ensure the atomicity of the whole operation, which is  
134 - // composed of the above PeekAndShift and the following StoreInt32,  
135 - // to avoid possible race conditions between Offer and Poll.  
136 - atomic.StoreInt32(&dq.sleeping, 1)  
137 - }  
138 - dq.mu.Unlock()  
139 -  
140 - if item == nil {  
141 - if delta == 0 {  
142 - // No items left.  
143 - select {  
144 - case <-dq.wakeupC:  
145 - // Wait until a new item is added.  
146 - continue  
147 - case <-exitC:  
148 - goto exit  
149 - }  
150 - } else if delta > 0 {  
151 - // At least one item is pending.  
152 - select {  
153 - case <-dq.wakeupC:  
154 - // A new item with an "earlier" expiration than the current "earliest" one is added.  
155 - continue  
156 - case <-time.After(time.Duration(delta) * time.Millisecond):  
157 - // The current "earliest" item expires.  
158 -  
159 - // Reset the sleeping state since there's no need to receive from wakeupC.  
160 - if atomic.SwapInt32(&dq.sleeping, 0) == 0 {  
161 - // A caller of Offer() is being blocked on sending to wakeupC,  
162 - // drain wakeupC to unblock the caller.  
163 - <-dq.wakeupC  
164 - }  
165 - continue  
166 - case <-exitC:  
167 - goto exit  
168 - }  
169 - }  
170 - }  
171 -  
172 - select {  
173 - case dq.C <- item.Value:  
174 - // The expired element has been sent out successfully.  
175 - case <-exitC:  
176 - goto exit  
177 - }  
178 - }  
179 -  
180 -exit:  
181 - // Reset the states  
182 - atomic.StoreInt32(&dq.sleeping, 0)  
183 -}  
src/components/kafkatimer/delayqueue/delayqueue.go deleted
@@ -1,186 +0,0 @@ @@ -1,186 +0,0 @@
1 -package delayqueue  
2 -  
3 -import (  
4 - "container/heap"  
5 - "sync"  
6 - "sync/atomic"  
7 - "time"  
8 -)  
9 -  
10 -// The start of PriorityQueue implementation.  
11 -// Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go  
12 -  
13 -type item struct {  
14 - Value interface{}  
15 - Priority int64  
16 - Index int  
17 -}  
18 -  
19 -// this is a priority queue as implemented by a min heap  
20 -// ie. the 0th element is the *lowest* value  
21 -type priorityQueue []*item  
22 -  
23 -func newPriorityQueue(capacity int) priorityQueue {  
24 - return make(priorityQueue, 0, capacity)  
25 -}  
26 -  
27 -func (pq priorityQueue) Len() int {  
28 - return len(pq)  
29 -}  
30 -  
31 -func (pq priorityQueue) Less(i, j int) bool {  
32 - return pq[i].Priority < pq[j].Priority  
33 -}  
34 -  
35 -func (pq priorityQueue) Swap(i, j int) {  
36 - pq[i], pq[j] = pq[j], pq[i]  
37 - pq[i].Index = i  
38 - pq[j].Index = j  
39 -}  
40 -  
41 -func (pq *priorityQueue) Push(x interface{}) {  
42 - n := len(*pq)  
43 - c := cap(*pq)  
44 - if n+1 > c {  
45 - npq := make(priorityQueue, n, c*2)  
46 - copy(npq, *pq)  
47 - *pq = npq  
48 - }  
49 - *pq = (*pq)[0 : n+1]  
50 - item := x.(*item)  
51 - item.Index = n  
52 - (*pq)[n] = item  
53 -}  
54 -  
55 -func (pq *priorityQueue) Pop() interface{} {  
56 - n := len(*pq)  
57 - c := cap(*pq)  
58 - if n < (c/2) && c > 25 {  
59 - npq := make(priorityQueue, n, c/2)  
60 - copy(npq, *pq)  
61 - *pq = npq  
62 - }  
63 - item := (*pq)[n-1]  
64 - item.Index = -1  
65 - *pq = (*pq)[0 : n-1]  
66 - return item  
67 -}  
68 -  
69 -func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {  
70 - if pq.Len() == 0 {  
71 - return nil, 0  
72 - }  
73 -  
74 - item := (*pq)[0]  
75 - if item.Priority > max {  
76 - return nil, item.Priority - max  
77 - }  
78 - heap.Remove(pq, 0)  
79 -  
80 - return item, 0  
81 -}  
82 -  
83 -// The end of PriorityQueue implementation.  
84 -  
85 -// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which  
86 -// an element can only be taken when its delay has expired. The head of the  
87 -// queue is the *Delayed* element whose delay expired furthest in the past.  
88 -type DelayQueue struct {  
89 - C chan interface{}  
90 -  
91 - mu sync.Mutex  
92 - pq priorityQueue  
93 -  
94 - // Similar to the sleeping state of runtime.timers.  
95 - sleeping int32  
96 - wakeupC chan struct{}  
97 -}  
98 -  
99 -// New creates an instance of delayQueue with the specified size.  
100 -func New(size int) *DelayQueue {  
101 - return &DelayQueue{  
102 - C: make(chan interface{}),  
103 - pq: newPriorityQueue(size),  
104 - wakeupC: make(chan struct{}),  
105 - }  
106 -}  
107 -  
108 -// Offer inserts the element into the current queue.  
109 -func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {  
110 - item := &item{Value: elem, Priority: expiration}  
111 -  
112 - dq.mu.Lock()  
113 - heap.Push(&dq.pq, item)  
114 - index := item.Index  
115 - dq.mu.Unlock()  
116 -  
117 - if index == 0 {  
118 - // A new item with the earliest expiration is added.  
119 - if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {  
120 - dq.wakeupC <- struct{}{}  
121 - }  
122 - }  
123 -}  
124 -  
125 -// Poll starts an infinite loop, in which it continually waits for an element  
126 -// to expire and then send the expired element to the channel C.  
127 -func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {  
128 - for {  
129 - now := nowF()  
130 -  
131 - dq.mu.Lock()  
132 - item, delta := dq.pq.PeekAndShift(now)  
133 - if item == nil {  
134 - // No items left or at least one item is pending.  
135 -  
136 - // We must ensure the atomicity of the whole operation, which is  
137 - // composed of the above PeekAndShift and the following StoreInt32,  
138 - // to avoid possible race conditions between Offer and Poll.  
139 - atomic.StoreInt32(&dq.sleeping, 1)  
140 - }  
141 - dq.mu.Unlock()  
142 -  
143 - if item == nil {  
144 - if delta == 0 {  
145 - // No items left.  
146 - select {  
147 - case <-dq.wakeupC:  
148 - // Wait until a new item is added.  
149 - continue  
150 - case <-exitC:  
151 - goto exit  
152 - }  
153 - } else if delta > 0 {  
154 - // At least one item is pending.  
155 - select {  
156 - case <-dq.wakeupC:  
157 - // A new item with an "earlier" expiration than the current "earliest" one is added.  
158 - continue  
159 - case <-time.After(time.Duration(delta) * time.Millisecond):  
160 - // The current "earliest" item expires.  
161 -  
162 - // Reset the sleeping state since there's no need to receive from wakeupC.  
163 - if atomic.SwapInt32(&dq.sleeping, 0) == 0 {  
164 - // A caller of Offer() is being blocked on sending to wakeupC,  
165 - // drain wakeupC to unblock the caller.  
166 - <-dq.wakeupC  
167 - }  
168 - continue  
169 - case <-exitC:  
170 - goto exit  
171 - }  
172 - }  
173 - }  
174 -  
175 - select {  
176 - case dq.C <- item.Value:  
177 - // The expired element has been sent out successfully.  
178 - case <-exitC:  
179 - goto exit  
180 - }  
181 - }  
182 -  
183 -exit:  
184 - // Reset the states  
185 - atomic.StoreInt32(&dq.sleeping, 0)  
186 -}  
src/components/kafkatimer/timingwheel.go deleted
@@ -1,227 +0,0 @@ @@ -1,227 +0,0 @@
1 -package kafkatimer  
2 -  
3 -import (  
4 - "errors"  
5 - "sync/atomic"  
6 - "time"  
7 - "unsafe"  
8 -  
9 - "github.com/RussellLuo/timingwheel/delayqueue"  
10 -)  
11 -//时间轮 kafka  
12 -  
13 -// TimingWheel is an implementation of Hierarchical Timing Wheels.  
14 -type TimingWheel struct {  
15 - tick int64 // in milliseconds  
16 - wheelSize int64  
17 -  
18 - interval int64 // in milliseconds  
19 - currentTime int64 // in milliseconds  
20 - buckets []*bucket  
21 - queue *delayqueue.DelayQueue  
22 -  
23 - // The higher-level overflow wheel.  
24 - //  
25 - // NOTE: This field may be updated and read concurrently, through Add().  
26 - overflowWheel unsafe.Pointer // type: *TimingWheel  
27 -  
28 - exitC chan struct{}  
29 - waitGroup waitGroupWrapper  
30 -}  
31 -  
32 -// NewTimingWheel creates an instance of TimingWheel with the given tick and wheelSize.  
33 -func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {  
34 - tickMs := int64(tick / time.Millisecond)  
35 - if tickMs <= 0 {  
36 - panic(errors.New("tick must be greater than or equal to 1ms"))  
37 - }  
38 -  
39 - startMs := timeToMs(time.Now().UTC())  
40 -  
41 - return newTimingWheel(  
42 - tickMs,  
43 - wheelSize,  
44 - startMs,  
45 - delayqueue.New(int(wheelSize)),  
46 - )  
47 -}  
48 -  
49 -// newTimingWheel is an internal helper function that really creates an instance of TimingWheel.  
50 -func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {  
51 - buckets := make([]*bucket, wheelSize)  
52 - for i := range buckets {  
53 - buckets[i] = newBucket()  
54 - }  
55 - return &TimingWheel{  
56 - tick: tickMs,  
57 - wheelSize: wheelSize,  
58 - currentTime: truncate(startMs, tickMs),  
59 - interval: tickMs * wheelSize,  
60 - buckets: buckets,  
61 - queue: queue,  
62 - exitC: make(chan struct{}),  
63 - }  
64 -}  
65 -  
66 -// add inserts the timer t into the current timing wheel.  
67 -func (tw *TimingWheel) add(t *Timer) bool {  
68 - currentTime := atomic.LoadInt64(&tw.currentTime)  
69 - if t.expiration < currentTime+tw.tick {  
70 - // Already expired  
71 - return false  
72 - } else if t.expiration < currentTime+tw.interval {  
73 - // Put it into its own bucket  
74 - virtualID := t.expiration / tw.tick  
75 - b := tw.buckets[virtualID%tw.wheelSize]  
76 - b.Add(t)  
77 -  
78 - // Set the bucket expiration time  
79 - if b.SetExpiration(virtualID * tw.tick) {  
80 - // The bucket needs to be enqueued since it was an expired bucket.  
81 - // We only need to enqueue the bucket when its expiration time has changed,  
82 - // i.e. the wheel has advanced and this bucket get reused with a new expiration.  
83 - // Any further calls to set the expiration within the same wheel cycle will  
84 - // pass in the same value and hence return false, thus the bucket with the  
85 - // same expiration will not be enqueued multiple times.  
86 - tw.queue.Offer(b, b.Expiration())  
87 - }  
88 -  
89 - return true  
90 - } else {  
91 - // Out of the interval. Put it into the overflow wheel  
92 - overflowWheel := atomic.LoadPointer(&tw.overflowWheel)  
93 - if overflowWheel == nil {  
94 - atomic.CompareAndSwapPointer(  
95 - &tw.overflowWheel,  
96 - nil,  
97 - unsafe.Pointer(newTimingWheel(  
98 - tw.interval,  
99 - tw.wheelSize,  
100 - currentTime,  
101 - tw.queue,  
102 - )),  
103 - )  
104 - overflowWheel = atomic.LoadPointer(&tw.overflowWheel)  
105 - }  
106 - return (*TimingWheel)(overflowWheel).add(t)  
107 - }  
108 -}  
109 -  
110 -// addOrRun inserts the timer t into the current timing wheel, or run the  
111 -// timer's task if it has already expired.  
112 -func (tw *TimingWheel) addOrRun(t *Timer) {  
113 - if !tw.add(t) {  
114 - // Already expired  
115 -  
116 - // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc),  
117 - // always execute the timer's task in its own goroutine.  
118 - go t.task()  
119 - }  
120 -}  
121 -  
122 -func (tw *TimingWheel) advanceClock(expiration int64) {  
123 - currentTime := atomic.LoadInt64(&tw.currentTime)  
124 - if expiration >= currentTime+tw.tick {  
125 - currentTime = truncate(expiration, tw.tick)  
126 - atomic.StoreInt64(&tw.currentTime, currentTime)  
127 -  
128 - // Try to advance the clock of the overflow wheel if present  
129 - overflowWheel := atomic.LoadPointer(&tw.overflowWheel)  
130 - if overflowWheel != nil {  
131 - (*TimingWheel)(overflowWheel).advanceClock(currentTime)  
132 - }  
133 - }  
134 -}  
135 -  
136 -// Start starts the current timing wheel.  
137 -func (tw *TimingWheel) Start() {  
138 - tw.waitGroup.Wrap(func() {  
139 - tw.queue.Poll(tw.exitC, func() int64 {  
140 - return timeToMs(time.Now().UTC())  
141 - })  
142 - })  
143 -  
144 - tw.waitGroup.Wrap(func() {  
145 - for {  
146 - select {  
147 - case elem := <-tw.queue.C:  
148 - b := elem.(*bucket)  
149 - tw.advanceClock(b.Expiration())  
150 - b.Flush(tw.addOrRun)  
151 - case <-tw.exitC:  
152 - return  
153 - }  
154 - }  
155 - })  
156 -}  
157 -  
158 -// Stop stops the current timing wheel.  
159 -//  
160 -// If there is any timer's task being running in its own goroutine, Stop does  
161 -// not wait for the task to complete before returning. If the caller needs to  
162 -// know whether the task is completed, it must coordinate with the task explicitly.  
163 -func (tw *TimingWheel) Stop() {  
164 - close(tw.exitC)  
165 - tw.waitGroup.Wait()  
166 -}  
167 -  
168 -// AfterFunc waits for the duration to elapse and then calls f in its own goroutine.  
169 -// It returns a Timer that can be used to cancel the call using its Stop method.  
170 -func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {  
171 - t := &Timer{  
172 - expiration: timeToMs(time.Now().UTC().Add(d)),  
173 - task: f,  
174 - }  
175 - tw.addOrRun(t)  
176 - return t  
177 -}  
178 -  
179 -// Scheduler determines the execution plan of a task.  
180 -type Scheduler interface {  
181 - // Next returns the next execution time after the given (previous) time.  
182 - // It will return a zero time if no next time is scheduled.  
183 - //  
184 - // All times must be UTC.  
185 - Next(time.Time) time.Time  
186 -}  
187 -  
188 -// ScheduleFunc calls f (in its own goroutine) according to the execution  
189 -// plan scheduled by s. It returns a Timer that can be used to cancel the  
190 -// call using its Stop method.  
191 -//  
192 -// If the caller want to terminate the execution plan halfway, it must  
193 -// stop the timer and ensure that the timer is stopped actually, since in  
194 -// the current implementation, there is a gap between the expiring and the  
195 -// restarting of the timer. The wait time for ensuring is short since the  
196 -// gap is very small.  
197 -//  
198 -// Internally, ScheduleFunc will ask the first execution time (by calling  
199 -// s.Next()) initially, and create a timer if the execution time is non-zero.  
200 -// Afterwards, it will ask the next execution time each time f is about to  
201 -// be executed, and f will be called at the next execution time if the time  
202 -// is non-zero.  
203 -func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) (t *Timer) {  
204 - expiration := s.Next(time.Now().UTC())  
205 - if expiration.IsZero() {  
206 - // No time is scheduled, return nil.  
207 - return  
208 - }  
209 -  
210 - t = &Timer{  
211 - expiration: timeToMs(expiration),  
212 - task: func() {  
213 - // Schedule the task to execute at the next time if possible.  
214 - expiration := s.Next(msToTime(t.expiration))  
215 - if !expiration.IsZero() {  
216 - t.expiration = timeToMs(expiration)  
217 - tw.addOrRun(t)  
218 - }  
219 -  
220 - // Actually execute the task.  
221 - f()  
222 - },  
223 - }  
224 - tw.addOrRun(t)  
225 -  
226 - return  
227 -}  
src/components/kafkatimer/timingwheel_test.go deleted
@@ -1,26 +0,0 @@ @@ -1,26 +0,0 @@
1 -package kafkatimer  
2 -  
3 -import (  
4 - "fmt"  
5 - "testing"  
6 - "time"  
7 -)  
8 -  
9 -func F() {  
10 - fmt.Println("I'm timer...")  
11 -}  
12 -  
13 -  
14 -func TestTimingWheel_Start(t *testing.T) {  
15 - tw := NewTimingWheel(time.Millisecond, 20)  
16 - tw.Start()  
17 - defer tw.Stop()  
18 -  
19 - exitC := make(chan time.Time, 1)  
20 - tw.AfterFunc(10 * time.Second, func() {  
21 - fmt.Println("The timer fires")  
22 - exitC <- time.Now().UTC()  
23 - })  
24 -  
25 - fmt.Println(<-exitC)  
26 -}  
27 \ No newline at end of file 0 \ No newline at end of file
src/components/kafkatimer/utils.go deleted
@@ -1,38 +0,0 @@ @@ -1,38 +0,0 @@
1 -package kafkatimer  
2 -  
3 -import (  
4 - "sync"  
5 - "time"  
6 -)  
7 -  
8 -// truncate returns the result of rounding x toward zero to a multiple of m.  
9 -// If m <= 0, Truncate returns x unchanged.  
10 -func truncate(x, m int64) int64 {  
11 - if m <= 0 {  
12 - return x  
13 - }  
14 - return x - x%m  
15 -}  
16 -  
17 -// timeToMs returns an integer number, which represents t in milliseconds.  
18 -func timeToMs(t time.Time) int64 {  
19 - return t.UnixNano() / int64(time.Millisecond)  
20 -}  
21 -  
22 -// msToTime returns the UTC time corresponding to the given Unix time,  
23 -// t milliseconds since January 1, 1970 UTC.  
24 -func msToTime(t int64) time.Time {  
25 - return time.Unix(0, t*int64(time.Millisecond)).UTC()  
26 -}  
27 -  
28 -type waitGroupWrapper struct {  
29 - sync.WaitGroup  
30 -}  
31 -  
32 -func (w *waitGroupWrapper) Wrap(cb func()) {  
33 - w.Add(1)  
34 - go func() {  
35 - cb()  
36 - w.Done()  
37 - }()  
38 -}  
src/components/net/conn.go
@@ -3,9 +3,12 @@ package net @@ -3,9 +3,12 @@ package net
3 import ( 3 import (
4 "bufio" 4 "bufio"
5 "fmt" 5 "fmt"
  6 + "math"
6 "net" 7 "net"
7 "pro2d/src/common" 8 "pro2d/src/common"
8 "pro2d/src/components/logger" 9 "pro2d/src/components/logger"
  10 + "pro2d/src/utils"
  11 + "sync/atomic"
9 ) 12 )
10 13
11 type Head struct { 14 type Head struct {
@@ -18,16 +21,20 @@ type Head struct { @@ -18,16 +21,20 @@ type Head struct {
18 21
19 type Connection struct { 22 type Connection struct {
20 net.Conn 23 net.Conn
21 - Id int  
22 - Server *Server 24 + Id int
  25 + Server *Server
23 26
24 - scanner *bufio.Scanner  
25 - writer *bufio.Writer 27 + scanner *bufio.Scanner
  28 + writer *bufio.Writer
26 29
27 - WBuffer chan []byte  
28 - RBuffer chan *MsgPkg 30 + WBuffer chan []byte
  31 + RBuffer chan *MsgPkg
29 32
30 - Quit chan *Connection 33 + Quit chan *Connection
  34 +
  35 + nextCheckTime int64 //下一次检查的时间
  36 + lastHeartCheckTime int64 //最后收消息时间
  37 + heartTimeoutCount int //超时次数
31 } 38 }
32 39
33 type MsgPkg struct { 40 type MsgPkg struct {
@@ -42,11 +49,13 @@ func NewConn(id int, conn net.Conn, s *Server) *Connection { @@ -42,11 +49,13 @@ func NewConn(id int, conn net.Conn, s *Server) *Connection {
42 Conn: conn, 49 Conn: conn,
43 Server: s, 50 Server: s,
44 51
45 - scanner: bufio.NewScanner(conn),  
46 - writer: bufio.NewWriter(conn),  
47 - WBuffer: make(chan []byte),  
48 - RBuffer: make(chan *MsgPkg),  
49 - Quit: make(chan *Connection), 52 + scanner: bufio.NewScanner(conn),
  53 + writer: bufio.NewWriter(conn),
  54 + WBuffer: make(chan []byte),
  55 + RBuffer: make(chan *MsgPkg),
  56 + Quit: make(chan *Connection),
  57 + lastHeartCheckTime: utils.Timex(),
  58 + heartTimeoutCount: 0,
50 } 59 }
51 } 60 }
52 61
@@ -63,7 +72,6 @@ func (c *Connection) write() { @@ -63,7 +72,6 @@ func (c *Connection) write() {
63 logger.Error("write Flush fail err: " + err.Error()) 72 logger.Error("write Flush fail err: " + err.Error())
64 return 73 return
65 } 74 }
66 - logger.Debug("write :%s, n: %d", msg, n)  
67 } 75 }
68 } 76 }
69 77
@@ -83,6 +91,8 @@ func (c *Connection) read() { @@ -83,6 +91,8 @@ func (c *Connection) read() {
83 //将请求消息发送给任务队列 91 //将请求消息发送给任务队列
84 c.Server.TaskQueue[workerID] <- req 92 c.Server.TaskQueue[workerID] <- req
85 93
  94 + atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex())
  95 +
86 //备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。 96 //备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。
87 //c.Server.OnRecv(req) 97 //c.Server.OnRecv(req)
88 } 98 }
@@ -94,6 +104,41 @@ func (c *Connection) read() { @@ -94,6 +104,41 @@ func (c *Connection) read() {
94 } 104 }
95 } 105 }
96 106
  107 +
  108 +func (c *Connection) checkHeartBeat(now int64) {
  109 + lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
  110 + logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.Id, lastHeartCheckTime, now)
  111 + if math.Abs(float64(lastHeartCheckTime - now)) > common.HEART_TIMER_INTERVAL {
  112 + c.heartTimeoutCount++
  113 + if c.heartTimeoutCount >= common.HEART_TIMEOUT_COUNT_MAX {
  114 + c.Quiting()
  115 + return
  116 + }
  117 + logger.Debug("timeout count: %d", c.heartTimeoutCount)
  118 + }else {
  119 + c.heartTimeoutCount = 0
  120 + }
  121 +}
  122 +
  123 +func (c *Connection) update() {
  124 + nextCheckTime := atomic.LoadInt64(&c.nextCheckTime)
  125 + now := utils.Timex()
  126 + if now >= nextCheckTime {
  127 + c.checkHeartBeat(now)
  128 + nextCheckTime = now + common.HEART_TIMER_INTERVAL
  129 + atomic.StoreInt64(&c.nextCheckTime, nextCheckTime)
  130 + }
  131 +}
  132 +//
  133 +//func (c *Connection) SetLastHeartCheckTime() {
  134 +// now := utils.Timex()
  135 +// lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime)
  136 +// if now - lastHeartCheckTime < common.HEART_TIMER_INTERVAL {
  137 +// logger.Debug("heart too quick")
  138 +// }
  139 +// atomic.StoreInt64(&c.lastHeartCheckTime, now)
  140 +//}
  141 +
97 func (c *Connection) Start() { 142 func (c *Connection) Start() {
98 go c.write() 143 go c.write()
99 go c.read() 144 go c.read()
@@ -109,6 +154,7 @@ func (c *Connection) Stop() { @@ -109,6 +154,7 @@ func (c *Connection) Stop() {
109 } 154 }
110 155
111 func (c *Connection) Quiting() { 156 func (c *Connection) Quiting() {
  157 + logger.Debug("ID: %d close", c.Id)
112 c.Server.OnClose(c) 158 c.Server.OnClose(c)
113 } 159 }
114 160
src/components/net/server.go
@@ -11,12 +11,15 @@ import ( @@ -11,12 +11,15 @@ import (
11 "pro2d/src/components/db" 11 "pro2d/src/components/db"
12 "pro2d/src/components/etcd" 12 "pro2d/src/components/etcd"
13 "pro2d/src/components/logger" 13 "pro2d/src/components/logger"
  14 + "pro2d/src/components/timewheel"
14 "pro2d/src/models" 15 "pro2d/src/models"
15 "sync" 16 "sync"
  17 + "time"
16 ) 18 )
17 19
18 type ActionHandler func (msg *MsgPkg) (int32, proto.Message) 20 type ActionHandler func (msg *MsgPkg) (int32, proto.Message)
19 var ActionMap map[pb.ProtoCode]ActionHandler 21 var ActionMap map[pb.ProtoCode]ActionHandler
  22 +var TimeWheel *timewheel.TimeWheel
20 23
21 type Server struct { 24 type Server struct {
22 SConf *conf.SConf 25 SConf *conf.SConf
@@ -36,10 +39,6 @@ func NewServer(sConf *conf.SConf) *Server { @@ -36,10 +39,6 @@ func NewServer(sConf *conf.SConf) *Server {
36 } 39 }
37 } 40 }
38 41
39 -func (s *Server) StartTimer() {  
40 -  
41 -}  
42 -  
43 //StartWorkerPool 启动worker工作池 42 //StartWorkerPool 启动worker工作池
44 func (s *Server) StartWorkerPool() { 43 func (s *Server) StartWorkerPool() {
45 //遍历需要启动worker的数量,依此启动 44 //遍历需要启动worker的数量,依此启动
@@ -96,6 +95,16 @@ func (s *Server) LoadPlugin() { @@ -96,6 +95,16 @@ func (s *Server) LoadPlugin() {
96 logger.Debug("load plugin success") 95 logger.Debug("load plugin success")
97 } 96 }
98 97
  98 +func (s *Server) handleTimeOut() {
  99 + s.Clients.Range(func(key, value interface{}) bool {
  100 + client := value.(*Connection)
  101 + client.update()
  102 + return true
  103 + })
  104 +
  105 + TimeWheel.AfterFunc(1*time.Second, s.handleTimeOut)
  106 +}
  107 +
99 func (s *Server)Start() error { 108 func (s *Server)Start() error {
100 //mongo 初始化 109 //mongo 初始化
101 db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName) 110 db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName)
@@ -110,10 +119,10 @@ func (s *Server)Start() error { @@ -110,10 +119,10 @@ func (s *Server)Start() error {
110 s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5) 119 s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5)
111 120
112 //初始化plugin 121 //初始化plugin
113 - _, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)  
114 - if err != nil {  
115 - return err  
116 - } 122 + //_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath)
  123 + //if err != nil {
  124 + // return err
  125 + //}
117 126
118 port := fmt.Sprintf(":%d", s.SConf.Port) 127 port := fmt.Sprintf(":%d", s.SConf.Port)
119 l, err := net.Listen("tcp", port) 128 l, err := net.Listen("tcp", port)
@@ -121,8 +130,15 @@ func (s *Server)Start() error { @@ -121,8 +130,15 @@ func (s *Server)Start() error {
121 return err 130 return err
122 } 131 }
123 132
  133 + //启动协程池
124 s.StartWorkerPool() 134 s.StartWorkerPool()
125 135
  136 + //启动定时器
  137 + TimeWheel = timewheel.NewTimeWheel(common.TickMS * time.Millisecond, common.WheelSize)
  138 + TimeWheel.Start()
  139 + s.handleTimeOut()
  140 +
  141 + //监听端口
126 logger.Debug("listen on %s\n", port) 142 logger.Debug("listen on %s\n", port)
127 id := 0 143 id := 0
128 for { 144 for {
@@ -139,5 +155,5 @@ func (s *Server)Start() error { @@ -139,5 +155,5 @@ func (s *Server)Start() error {
139 } 155 }
140 156
141 func (s *Server)Stop() { 157 func (s *Server)Stop() {
142 -}  
143 - 158 + TimeWheel.Stop()
  159 +}
144 \ No newline at end of file 160 \ No newline at end of file
src/components/skynettimer/timerwheel.go deleted
@@ -1 +0,0 @@ @@ -1 +0,0 @@
1 -package skynettimer  
src/components/kafkatimer/bucket.go renamed to src/components/timewheel/bucket.go
1 -package kafkatimer 1 +package timewheel
2 2
3 import ( 3 import (
4 "container/list" 4 "container/list"
@@ -6,97 +6,52 @@ import ( @@ -6,97 +6,52 @@ import (
6 "sync/atomic" 6 "sync/atomic"
7 "unsafe" 7 "unsafe"
8 ) 8 )
9 -  
10 -// Timer represents a single event. When the Timer expires, the given  
11 -// task will be executed.  
12 -type Timer struct {  
13 - expiration int64 // in milliseconds  
14 - task func()  
15 -  
16 - // The bucket that holds the list to which this timer's element belongs.  
17 - //  
18 - // NOTE: This field may be updated and read concurrently,  
19 - // through Timer.Stop() and Bucket.Flush().  
20 - b unsafe.Pointer // type: *bucket  
21 -  
22 - // The timer's element.  
23 - element *list.Element  
24 -}  
25 -  
26 -func (t *Timer) getBucket() *bucket {  
27 - return (*bucket)(atomic.LoadPointer(&t.b))  
28 -}  
29 -  
30 -func (t *Timer) setBucket(b *bucket) {  
31 - atomic.StorePointer(&t.b, unsafe.Pointer(b))  
32 -}  
33 -  
34 -// Stop prevents the Timer from firing. It returns true if the call  
35 -// stops the timer, false if the timer has already expired or been stopped.  
36 -//  
37 -// If the timer t has already expired and the t.task has been started in its own  
38 -// goroutine; Stop does not wait for t.task to complete before returning. If the caller  
39 -// needs to know whether t.task is completed, it must coordinate with t.task explicitly.  
40 -func (t *Timer) Stop() bool {  
41 - stopped := false  
42 - for b := t.getBucket(); b != nil; b = t.getBucket() {  
43 - // If b.Remove is called just after the timing wheel's goroutine has:  
44 - // 1. removed t from b (through b.Flush -> b.remove)  
45 - // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)  
46 - // this may fail to remove t due to the change of t's bucket.  
47 - stopped = b.Remove(t)  
48 -  
49 - // Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2),  
50 - // and retry until the bucket becomes nil, which indicates that t has finally been removed.  
51 - }  
52 - return stopped  
53 -}  
54 -  
55 type bucket struct { 9 type bucket struct {
56 - // 64-bit atomic operations require 64-bit alignment, but 32-bit  
57 - // compilers do not ensure it. So we must keep the 64-bit field  
58 - // as the first field of the struct.  
59 - //  
60 - // For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG  
61 - // and https://go101.org/article/memory-layout.html. 10 + //过期时间
62 expiration int64 11 expiration int64
63 12
64 - mu sync.Mutex 13 + mu sync.Mutex
  14 + //相同过期时间的任务队列
65 timers *list.List 15 timers *list.List
66 } 16 }
67 17
68 -func newBucket() *bucket { 18 +func newBucket() *bucket {
69 return &bucket{ 19 return &bucket{
70 - timers: list.New(),  
71 expiration: -1, 20 expiration: -1,
  21 + mu: sync.Mutex{},
  22 + timers: list.New(),
72 } 23 }
73 } 24 }
74 25
75 -func (b *bucket) Expiration() int64 {  
76 - return atomic.LoadInt64(&b.expiration)  
77 -}  
78 -  
79 -func (b *bucket) SetExpiration(expiration int64) bool {  
80 - return atomic.SwapInt64(&b.expiration, expiration) != expiration 26 +func (b *bucket) SetExpiration(expiration int64) {
  27 + atomic.AddInt64(&b.expiration, expiration)
81 } 28 }
82 29
83 func (b *bucket) Add(t *Timer) { 30 func (b *bucket) Add(t *Timer) {
84 b.mu.Lock() 31 b.mu.Lock()
  32 + defer b.mu.Unlock()
85 33
86 e := b.timers.PushBack(t) 34 e := b.timers.PushBack(t)
87 t.setBucket(b) 35 t.setBucket(b)
88 t.element = e 36 t.element = e
  37 +}
  38 +
  39 +func (b *bucket) Flush(reinsert func(*Timer)) {
  40 + b.mu.Lock()
  41 + defer b.mu.Unlock()
  42 +
  43 + for e := b.timers.Front(); e != nil; {
  44 + next := e.Next()
  45 + t := e.Value.(*Timer)
  46 + b.remove(t)
89 47
90 - b.mu.Unlock() 48 + reinsert(t)
  49 + e = next
  50 + }
91 } 51 }
92 52
93 func (b *bucket) remove(t *Timer) bool { 53 func (b *bucket) remove(t *Timer) bool {
94 if t.getBucket() != b { 54 if t.getBucket() != b {
95 - // If remove is called from within t.Stop, and this happens just after the timing wheel's goroutine has:  
96 - // 1. removed t from b (through b.Flush -> b.remove)  
97 - // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)  
98 - // then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.  
99 - // In either case, the returned value does not equal to b.  
100 return false 55 return false
101 } 56 }
102 b.timers.Remove(t.element) 57 b.timers.Remove(t.element)
@@ -108,26 +63,23 @@ func (b *bucket) remove(t *Timer) bool { @@ -108,26 +63,23 @@ func (b *bucket) remove(t *Timer) bool {
108 func (b *bucket) Remove(t *Timer) bool { 63 func (b *bucket) Remove(t *Timer) bool {
109 b.mu.Lock() 64 b.mu.Lock()
110 defer b.mu.Unlock() 65 defer b.mu.Unlock()
  66 +
111 return b.remove(t) 67 return b.remove(t)
112 } 68 }
113 69
114 -func (b *bucket) Flush(reinsert func(*Timer)) {  
115 - b.mu.Lock()  
116 - defer b.mu.Unlock()  
117 -  
118 - for e := b.timers.Front(); e != nil; {  
119 - next := e.Next()  
120 -  
121 - t := e.Value.(*Timer)  
122 - b.remove(t)  
123 - // Note that this operation will either execute the timer's task, or  
124 - // insert the timer into another bucket belonging to a lower-level wheel.  
125 - //  
126 - // In either case, no further lock operation will happen to b.mu.  
127 - reinsert(t) 70 +type Timer struct {
  71 + expiration int64
  72 + //要被执行的任务
  73 + task func()
128 74
129 - e = next  
130 - } 75 + b unsafe.Pointer
  76 + element *list.Element
  77 +}
131 78
132 - b.SetExpiration(-1) 79 +func (t *Timer) setBucket(b *bucket) {
  80 + atomic.StorePointer(&t.b, unsafe.Pointer(b))
133 } 81 }
  82 +
  83 +func (t *Timer) getBucket() *bucket {
  84 + return (*bucket)(atomic.LoadPointer(&t.b))
  85 +}
134 \ No newline at end of file 86 \ No newline at end of file
src/components/timewheel/timewheel.go 0 → 100644
@@ -0,0 +1,165 @@ @@ -0,0 +1,165 @@
  1 +package timewheel
  2 +
  3 +import (
  4 + "pro2d/src/common"
  5 + "pro2d/src/components/workpool"
  6 + "sync/atomic"
  7 + "time"
  8 + "unsafe"
  9 +)
  10 +
  11 +type TimeWheel struct {
  12 + ticker *time.Ticker
  13 + tickMs int64 //一滴答的时间 1ms 可以自定义 我们这里选择使用1ms
  14 + wheelSize int64
  15 + startMs int64 //开始时间 in millisecond
  16 + endMs int64
  17 + wheelTime int64 //跑完一圈所需时间
  18 + level int64 //层级
  19 +
  20 + //时间刻度 列表
  21 + bucket []*bucket
  22 + currentTime int64 //当前时间 in millisecond
  23 + prevflowWheel unsafe.Pointer // type: *TimingWheel
  24 + overflowWheel unsafe.Pointer // type: *TimingWheel
  25 + exitC chan struct{}
  26 +
  27 + WorkPool *workpool.WorkPool
  28 +}
  29 +
  30 +func NewTimeWheel(tick time.Duration, wheelSize int64) *TimeWheel {
  31 + //转化为毫秒
  32 + tickMs := int64(tick / time.Millisecond)
  33 + //如果小于零
  34 + if tickMs <=0 {
  35 + panic("tick must be greater than or equal to 1 ms")
  36 + }
  37 +
  38 + startMs := time.Now().UnixMilli() //ms
  39 +
  40 + workpool := workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker)
  41 + return newTimingWheel(tickMs, wheelSize, startMs, 0, nil, workpool)
  42 +}
  43 +
  44 +func newTimingWheel(tick, wheelSize int64, start, level int64, prev *TimeWheel, pool *workpool.WorkPool) *TimeWheel {
  45 + buckets := make([]*bucket, wheelSize)
  46 + for i := range buckets {
  47 + buckets[i] = newBucket()
  48 + }
  49 +
  50 + return &TimeWheel{
  51 + tickMs: tick,
  52 + wheelSize: wheelSize,
  53 + startMs: start,
  54 + endMs: wheelSize * tick + start,
  55 + wheelTime: wheelSize * tick,
  56 + bucket: buckets,
  57 + currentTime: truncate(start, tick),
  58 + exitC: make(chan struct{}),
  59 + WorkPool: pool,
  60 +
  61 + prevflowWheel: unsafe.Pointer(prev),
  62 + level: level,
  63 + }
  64 +}
  65 +
  66 +func truncate(dst, m int64) int64 {
  67 + return dst - dst%m
  68 +}
  69 +
  70 +func (tw *TimeWheel) add(t *Timer) bool {
  71 + currentTime := atomic.LoadInt64(&tw.currentTime)
  72 + if t.expiration < currentTime + tw.tickMs {
  73 + return false
  74 + }else if t.expiration < currentTime + tw.wheelTime {
  75 + virtualID := t.expiration / tw.tickMs //需要多少滴答数
  76 + b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize
  77 + b.Add(t)
  78 +
  79 + b.SetExpiration(virtualID * tw.tickMs)
  80 + }else {
  81 + overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
  82 + if overflowWheel == nil {
  83 + level := atomic.LoadInt64(&tw.level) + 1
  84 + atomic.CompareAndSwapPointer(
  85 + &tw.overflowWheel,
  86 + nil,
  87 + unsafe.Pointer(newTimingWheel(tw.wheelTime, tw.wheelSize, currentTime, level, tw , tw.WorkPool)),
  88 + )
  89 + overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
  90 + }
  91 + //递归添加到下一级定时器中
  92 + (*TimeWheel)(overflowWheel).add(t)
  93 + }
  94 +
  95 + return true
  96 +}
  97 +
  98 +func (tw *TimeWheel) addOrRun(t *Timer) {
  99 + if !tw.add(t) {
  100 + workerID := t.expiration % tw.WorkPool.WorkerPoolSize
  101 + //将请求消息发送给任务队列
  102 + tw.WorkPool.TaskQueue[workerID] <- t.task
  103 + }
  104 +}
  105 +
  106 +//拨动时钟
  107 +func (tw *TimeWheel) advanceClock(expiration int64) {
  108 + level := atomic.LoadInt64(&tw.level)
  109 + currentTime := truncate(expiration, tw.tickMs)
  110 + atomic.StoreInt64(&tw.currentTime, currentTime)
  111 +
  112 + if level == 0 {
  113 + virtualID := expiration / tw.tickMs //需要多少滴答数
  114 + b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize
  115 + b.Flush(tw.addOrRun)
  116 + } else {
  117 + prevflowWheel := atomic.LoadPointer(&tw.prevflowWheel)
  118 + if prevflowWheel != nil {
  119 + virtualID := expiration / tw.tickMs //需要多少滴答数
  120 + b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize
  121 + b.Flush((*TimeWheel)(prevflowWheel).addOrRun)
  122 + }
  123 + }
  124 +
  125 + //如果基础的时钟指针转完了一圈,则递归拨动下一级时钟
  126 + if currentTime >= tw.endMs {
  127 + atomic.StoreInt64(&tw.startMs, currentTime)
  128 + atomic.StoreInt64(&tw.endMs, currentTime + tw.wheelTime)
  129 +
  130 + overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
  131 + if overflowWheel != nil {
  132 + (*TimeWheel)(overflowWheel).advanceClock(currentTime)
  133 + }
  134 + }
  135 +}
  136 +
  137 +
  138 +func (tw *TimeWheel) AfterFunc(d time.Duration, f func()) *Timer {
  139 + t := &Timer{
  140 + expiration: time.Now().UTC().Add(d).UnixMilli(),
  141 + task: f,
  142 + }
  143 + tw.addOrRun(t)
  144 + return t
  145 +}
  146 +
  147 +func (tw *TimeWheel) Start() {
  148 + tw.ticker = time.NewTicker(time.Duration(tw.tickMs) * time.Millisecond)
  149 + tw.WorkPool.StartWorkerPool()
  150 +
  151 + go func() {
  152 + for {
  153 + select {
  154 + case t := <- tw.ticker.C:
  155 + tw.advanceClock(t.UnixMilli())
  156 + case <- tw.exitC:
  157 + return
  158 + }
  159 + }
  160 + }()
  161 +}
  162 +
  163 +func (tw *TimeWheel) Stop() {
  164 + tw.exitC <- struct{}{}
  165 +}
0 \ No newline at end of file 166 \ No newline at end of file
src/components/timewheel/timewheel_test.go 0 → 100644
@@ -0,0 +1,30 @@ @@ -0,0 +1,30 @@
  1 +package timewheel
  2 +
  3 +import (
  4 + "fmt"
  5 + "testing"
  6 + "time"
  7 +)
  8 +var tw *TimeWheel
  9 +
  10 +func Add() {
  11 + fmt.Println("ADD : 123456")
  12 + tw.AfterFunc(6*time.Second, Add)
  13 +}
  14 +
  15 +func Add1() {
  16 + fmt.Println("GET : 78901112")
  17 + tw.AfterFunc(9*time.Second, Add1)
  18 +}
  19 +
  20 +func TestTimeWheel_AfterFunc(t *testing.T) {
  21 +
  22 + tw = NewTimeWheel(time.Second, 5)
  23 + tw.Start()
  24 + defer tw.Stop()
  25 +
  26 +
  27 + Add()
  28 + Add1()
  29 + time.Sleep(time.Second * 200)
  30 +}
src/components/workpool/workpool.go 0 → 100644
@@ -0,0 +1,41 @@ @@ -0,0 +1,41 @@
  1 +package workpool
  2 +
  3 +type Job func()
  4 +
  5 +type WorkPool struct {
  6 + WorkerPoolSize int64
  7 + MaxTaskPerWorker int64
  8 + TaskQueue []chan Job
  9 +}
  10 +
  11 +func NewWorkPool(poolSize, maxTaskSize int64) *WorkPool {
  12 + return &WorkPool{
  13 + WorkerPoolSize: poolSize,
  14 + MaxTaskPerWorker: maxTaskSize,
  15 + TaskQueue: make([]chan Job, poolSize),
  16 + }
  17 +}
  18 +
  19 +//StartOneWorker 启动一个Worker工作流程
  20 +func (wp *WorkPool) StartOneWorker(workerID int, taskQueue chan Job) {
  21 + //不断的等待队列中的消息
  22 + for {
  23 + select {
  24 + //有消息则取出队列的Request,并执行绑定的业务方法
  25 + case job := <-taskQueue:
  26 + _ = workerID
  27 + job()
  28 + }
  29 + }
  30 +}
  31 +
  32 +func (wp *WorkPool) StartWorkerPool() {
  33 + //遍历需要启动worker的数量,依此启动
  34 + for i := 0; i < int(wp.WorkerPoolSize); i++ {
  35 + //一个worker被启动
  36 + //给当前worker对应的任务队列开辟空间
  37 + wp.TaskQueue[i] = make(chan Job, wp.MaxTaskPerWorker)
  38 + //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
  39 + go wp.StartOneWorker(i, wp.TaskQueue[i])
  40 + }
  41 +}
0 \ No newline at end of file 42 \ No newline at end of file
src/plugin/RolePlugin.go
1 -package main 1 +package plugin
2 2
3 import ( 3 import (
4 "github.com/golang/protobuf/proto" 4 "github.com/golang/protobuf/proto"
@@ -10,6 +10,7 @@ import ( @@ -10,6 +10,7 @@ import (
10 ) 10 )
11 11
12 func HeartRpc(msg *net.MsgPkg) (int32, proto.Message) { 12 func HeartRpc(msg *net.MsgPkg) (int32, proto.Message) {
  13 + //msg.Conn.SetLastHeartCheckTime()
13 return 0, nil 14 return 0, nil
14 } 15 }
15 16
src/plugin/protocode.go
1 -package main 1 +package plugin
2 2
3 import ( 3 import (
4 "pro2d/protos/pb" 4 "pro2d/protos/pb"
src/utils/utils.go
@@ -55,6 +55,6 @@ func Md5V(str string) string { @@ -55,6 +55,6 @@ func Md5V(str string) string {
55 return hex.EncodeToString(h.Sum(nil)) 55 return hex.EncodeToString(h.Sum(nil))
56 } 56 }
57 57
58 -func Timex() int64 {  
59 - return time.Now().UnixMicro() 58 +func Timex() int64 {
  59 + return time.Now().Unix()
60 } 60 }
61 \ No newline at end of file 61 \ No newline at end of file
@@ -16,7 +16,7 @@ func main() { @@ -16,7 +16,7 @@ func main() {
16 16
17 head := &net2.Head{ 17 head := &net2.Head{
18 Length: 0, 18 Length: 0,
19 - Cmd: int32(pb.ProtoCode_LoginRpc), 19 + Cmd: int32(pb.ProtoCode_LoginReq),
20 ErrCode: 0, 20 ErrCode: 0,
21 PreField: 0, 21 PreField: 0,
22 } 22 }