Commit 98b0736dfc80fe77acc419f692ca248ef57d288d
1 parent 62d5d847
Add timer, check heartbeat (添加定时器, 检查心跳)
Showing 20 changed files with 933 additions and 25 deletions
go.mod
@@ -3,12 +3,14 @@ module pro2d
 go 1.17
 
 require (
+	github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108
 	github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/garyburd/redigo v1.6.3
 	github.com/gin-gonic/gin v1.7.7
 	github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
 	github.com/golang/protobuf v1.5.2
+	github.com/ouqiang/timewheel v1.0.1
 	go.etcd.io/etcd/api/v3 v3.5.2
 	go.etcd.io/etcd/client/v3 v3.5.2
 	go.mongodb.org/mongo-driver v1.8.3
@@ -35,6 +37,7 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
+	github.com/robfig/cron v1.2.0 // indirect
 	github.com/ugorji/go/codec v1.1.7 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.0.2 // indirect
go.sum
@@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY=
+github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -128,6 +130,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/ouqiang/timewheel v1.0.1 h1:XxhrYwqhJ3z8nthEnhZcHyZ/dcE29ACJEJR3Ika0W2g=
+github.com/ouqiang/timewheel v1.0.1/go.mod h1:896mz+8zvRU6i0PLVR0qaNuU5roxC874OB4TxUvUewY=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -149,6 +153,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
src/components/db/mongo.go
@@ -11,6 +11,7 @@ import (
 	"pro2d/src/utils"
 	"sort"
 	"strconv"
+	"strings"
 	"time"
 )
 
@@ -221,6 +222,10 @@ func (m *MgoColl) Update(update interface{}) {
 	m.FindOneAndUpdate(m.pri, update)
 }
 
+func (m *MgoColl) UpdateProperty(key string, val interface{}) {
+	m.FindOneAndUpdate(m.pri, bson.M{strings.ToLower(key): val})
+}
+
 func (m *MgoColl)Save() {
 	m.FindOneAndUpdate(m.pri, m.schema)
 }
\ No newline at end of file
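UpdateProperty hands `bson.M{strings.ToLower(key): val}` straight to the project's FindOneAndUpdate wrapper, so whether the write ends up inside a `$set` depends on how that wrapper builds the update document. For comparison, a minimal sketch of the same one-field update written directly against the official mongo-driver API with an explicit `$set`; the function name and parameters are illustrative, not part of this commit:

```go
package mongosketch

import (
	"context"
	"strings"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// updateProperty sets a single lowercase-named field on the document matched by filter.
func updateProperty(ctx context.Context, coll *mongo.Collection, filter bson.M, key string, val interface{}) error {
	_, err := coll.UpdateOne(ctx, filter,
		bson.M{"$set": bson.M{strings.ToLower(key): val}})
	return err
}
```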
@@ -0,0 +1,43 @@
+package db
+
+import (
+	"reflect"
+	"strings"
+)
+
+type Schema struct {
+	reflect.Type
+	pri interface{}
+	schema interface{}
+}
+
+func NewSchema(pri, schema interface{}) *Schema {
+	s := reflect.TypeOf(schema)
+	if s.Kind() == reflect.Ptr {
+		s = reflect.TypeOf(s).Elem()
+	}
+	return &Schema{
+		Type: s,
+		pri: pri,
+		schema: schema,
+	}
+}
+
+func (s *Schema)GetSchemaType() reflect.Type {
+	return s.Type
+}
+
+func (s *Schema)GetCollName() string {
+	return strings.ToLower(s.GetSchemaType().Name())
+}
+
+func (s *Schema)GetPriKey() string {
+	var pri string
+	for i := 0; i < s.NumField(); i++ {
+		if s.Field(i).Tag.Get("pri") == "1" {
+			pri = strings.ToLower(s.Field(i).Name)
+			break
+		}
+	}
+	return pri
+}
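One thing to watch in NewSchema: when `schema` is a pointer, `s = reflect.TypeOf(s).Elem()` reflects on the `reflect.Type` value itself rather than unwrapping the original pointer type, so `Name()` and `NumField()` will not describe the schema struct. If unwrapping the pointer is the intent, the usual form is `s.Elem()`; a minimal sketch under that assumption (not the committed code):

```go
package db

import "reflect"

// schemaType returns the struct type behind schema, unwrapping one level of pointer.
// Sketch of the apparent intent of NewSchema.
func schemaType(schema interface{}) reflect.Type {
	t := reflect.TypeOf(schema)
	if t.Kind() == reflect.Ptr {
		t = t.Elem() // e.g. *Role -> Role, so Name() == "Role" and NumField() walks Role's fields
	}
	return t
}
```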
@@ -0,0 +1,133 @@
+package kafkatimer
+
+import (
+	"container/list"
+	"sync"
+	"sync/atomic"
+	"unsafe"
+)
+
+// Timer represents a single event. When the Timer expires, the given
+// task will be executed.
+type Timer struct {
+	expiration int64 // in milliseconds
+	task       func()
+
+	// The bucket that holds the list to which this timer's element belongs.
+	//
+	// NOTE: This field may be updated and read concurrently,
+	// through Timer.Stop() and Bucket.Flush().
+	b unsafe.Pointer // type: *bucket
+
+	// The timer's element.
+	element *list.Element
+}
+
+func (t *Timer) getBucket() *bucket {
+	return (*bucket)(atomic.LoadPointer(&t.b))
+}
+
+func (t *Timer) setBucket(b *bucket) {
+	atomic.StorePointer(&t.b, unsafe.Pointer(b))
+}
+
+// Stop prevents the Timer from firing. It returns true if the call
+// stops the timer, false if the timer has already expired or been stopped.
+//
+// If the timer t has already expired and the t.task has been started in its own
+// goroutine; Stop does not wait for t.task to complete before returning. If the caller
+// needs to know whether t.task is completed, it must coordinate with t.task explicitly.
+func (t *Timer) Stop() bool {
+	stopped := false
+	for b := t.getBucket(); b != nil; b = t.getBucket() {
+		// If b.Remove is called just after the timing wheel's goroutine has:
+		// 1. removed t from b (through b.Flush -> b.remove)
+		// 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
+		// this may fail to remove t due to the change of t's bucket.
+		stopped = b.Remove(t)
+
+		// Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2),
+		// and retry until the bucket becomes nil, which indicates that t has finally been removed.
+	}
+	return stopped
+}
+
+type bucket struct {
+	// 64-bit atomic operations require 64-bit alignment, but 32-bit
+	// compilers do not ensure it. So we must keep the 64-bit field
+	// as the first field of the struct.
+	//
+	// For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	// and https://go101.org/article/memory-layout.html.
+	expiration int64
+
+	mu     sync.Mutex
+	timers *list.List
+}
+
+func newBucket() *bucket {
+	return &bucket{
+		timers:     list.New(),
+		expiration: -1,
+	}
+}
+
+func (b *bucket) Expiration() int64 {
+	return atomic.LoadInt64(&b.expiration)
+}
+
+func (b *bucket) SetExpiration(expiration int64) bool {
+	return atomic.SwapInt64(&b.expiration, expiration) != expiration
+}
+
+func (b *bucket) Add(t *Timer) {
+	b.mu.Lock()
+
+	e := b.timers.PushBack(t)
+	t.setBucket(b)
+	t.element = e
+
+	b.mu.Unlock()
+}
+
+func (b *bucket) remove(t *Timer) bool {
+	if t.getBucket() != b {
+		// If remove is called from within t.Stop, and this happens just after the timing wheel's goroutine has:
+		// 1. removed t from b (through b.Flush -> b.remove)
+		// 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
+		// then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.
+		// In either case, the returned value does not equal to b.
+		return false
+	}
+	b.timers.Remove(t.element)
+	t.setBucket(nil)
+	t.element = nil
+	return true
+}
+
+func (b *bucket) Remove(t *Timer) bool {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.remove(t)
+}
+
+func (b *bucket) Flush(reinsert func(*Timer)) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	for e := b.timers.Front(); e != nil; {
+		next := e.Next()
+
+		t := e.Value.(*Timer)
+		b.remove(t)
+		// Note that this operation will either execute the timer's task, or
+		// insert the timer into another bucket belonging to a lower-level wheel.
+		//
+		// In either case, no further lock operation will happen to b.mu.
+		reinsert(t)
+
+		e = next
+	}
+
+	b.SetExpiration(-1)
+}
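The dedup that TimingWheel.add relies on lives in SetExpiration: the atomic swap reports true only when the stored expiration actually changes, so a bucket is offered to the delay queue at most once per wheel cycle. A package-internal test sketch (newBucket is unexported, so this only compiles alongside this file):

```go
package kafkatimer

import "testing"

func TestBucketSetExpirationDedup(t *testing.T) {
	b := newBucket()
	if !b.SetExpiration(1000) {
		t.Fatal("first change (-1 -> 1000) should report true, triggering an enqueue")
	}
	if b.SetExpiration(1000) {
		t.Fatal("setting the same expiration again should report false: no duplicate enqueue")
	}
	if !b.SetExpiration(2000) {
		t.Fatal("a new expiration (bucket reused after the wheel advanced) should report true")
	}
}
```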
@@ -0,0 +1,183 @@
+package kafkatimer
+
+import (
+	"container/heap"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type item struct {
+	Value    interface{}
+	Priority int64
+	Index    int
+}
+
+// this is a priority queue as implemented by a min heap
+// ie. the 0th element is the *lowest* value
+type priorityQueue []*item
+
+func newPriorityQueue(capacity int) priorityQueue {
+	return make(priorityQueue, 0, capacity)
+}
+
+func (pq priorityQueue) Len() int {
+	return len(pq)
+}
+
+func (pq priorityQueue) Less(i, j int) bool {
+	return pq[i].Priority < pq[j].Priority
+}
+
+func (pq priorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].Index = i
+	pq[j].Index = j
+}
+
+func (pq *priorityQueue) Push(x interface{}) {
+	n := len(*pq)
+	c := cap(*pq)
+	if n+1 > c {
+		npq := make(priorityQueue, n, c*2)
+		copy(npq, *pq)
+		*pq = npq
+	}
+	*pq = (*pq)[0 : n+1]
+	item := x.(*item)
+	item.Index = n
+	(*pq)[n] = item
+}
+
+func (pq *priorityQueue) Pop() interface{} {
+	n := len(*pq)
+	c := cap(*pq)
+	if n < (c/2) && c > 25 {
+		npq := make(priorityQueue, n, c/2)
+		copy(npq, *pq)
+		*pq = npq
+	}
+	item := (*pq)[n-1]
+	item.Index = -1
+	*pq = (*pq)[0 : n-1]
+	return item
+}
+
+func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
+	if pq.Len() == 0 {
+		return nil, 0
+	}
+
+	item := (*pq)[0]
+	if item.Priority > max {
+		return nil, item.Priority - max
+	}
+	heap.Remove(pq, 0)
+
+	return item, 0
+}
+
+// The end of PriorityQueue implementation.
+
+// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
+// an element can only be taken when its delay has expired. The head of the
+// queue is the *Delayed* element whose delay expired furthest in the past.
+type DelayQueue struct {
+	C chan interface{}
+
+	mu sync.Mutex
+	pq priorityQueue
+
+	// Similar to the sleeping state of runtime.timers.
+	sleeping int32
+	wakeupC  chan struct{}
+}
+
+// New creates an instance of delayQueue with the specified size.
+func New(size int) *DelayQueue {
+	return &DelayQueue{
+		C:       make(chan interface{}),
+		pq:      newPriorityQueue(size),
+		wakeupC: make(chan struct{}),
+	}
+}
+
+// Offer inserts the element into the current queue.
+func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
+	item := &item{Value: elem, Priority: expiration}
+
+	dq.mu.Lock()
+	heap.Push(&dq.pq, item)
+	index := item.Index
+	dq.mu.Unlock()
+
+	if index == 0 {
+		// A new item with the earliest expiration is added.
+		if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
+			dq.wakeupC <- struct{}{}
+		}
+	}
+}
+
+// Poll starts an infinite loop, in which it continually waits for an element
+// to expire and then send the expired element to the channel C.
+func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
+	for {
+		now := nowF()
+
+		dq.mu.Lock()
+		item, delta := dq.pq.PeekAndShift(now)
+		if item == nil {
+			// No items left or at least one item is pending.
+
+			// We must ensure the atomicity of the whole operation, which is
+			// composed of the above PeekAndShift and the following StoreInt32,
+			// to avoid possible race conditions between Offer and Poll.
+			atomic.StoreInt32(&dq.sleeping, 1)
+		}
+		dq.mu.Unlock()
+
+		if item == nil {
+			if delta == 0 {
+				// No items left.
+				select {
+				case <-dq.wakeupC:
+					// Wait until a new item is added.
+					continue
+				case <-exitC:
+					goto exit
+				}
+			} else if delta > 0 {
+				// At least one item is pending.
+				select {
+				case <-dq.wakeupC:
+					// A new item with an "earlier" expiration than the current "earliest" one is added.
+					continue
+				case <-time.After(time.Duration(delta) * time.Millisecond):
+					// The current "earliest" item expires.
+
+					// Reset the sleeping state since there's no need to receive from wakeupC.
+					if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
+						// A caller of Offer() is being blocked on sending to wakeupC,
+						// drain wakeupC to unblock the caller.
+						<-dq.wakeupC
+					}
+					continue
+				case <-exitC:
+					goto exit
+				}
+			}
+		}
+
+		select {
+		case dq.C <- item.Value:
+			// The expired element has been sent out successfully.
+		case <-exitC:
+			goto exit
+		}
+	}
+
+exit:
+	// Reset the states
+	atomic.StoreInt32(&dq.sleeping, 0)
+}
@@ -0,0 +1,186 @@
+package delayqueue
+
+import (
+	"container/heap"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// The start of PriorityQueue implementation.
+// Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go
+
+type item struct {
+	Value    interface{}
+	Priority int64
+	Index    int
+}
+
+// this is a priority queue as implemented by a min heap
+// ie. the 0th element is the *lowest* value
+type priorityQueue []*item
+
+func newPriorityQueue(capacity int) priorityQueue {
+	return make(priorityQueue, 0, capacity)
+}
+
+func (pq priorityQueue) Len() int {
+	return len(pq)
+}
+
+func (pq priorityQueue) Less(i, j int) bool {
+	return pq[i].Priority < pq[j].Priority
+}
+
+func (pq priorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].Index = i
+	pq[j].Index = j
+}
+
+func (pq *priorityQueue) Push(x interface{}) {
+	n := len(*pq)
+	c := cap(*pq)
+	if n+1 > c {
+		npq := make(priorityQueue, n, c*2)
+		copy(npq, *pq)
+		*pq = npq
+	}
+	*pq = (*pq)[0 : n+1]
+	item := x.(*item)
+	item.Index = n
+	(*pq)[n] = item
+}
+
+func (pq *priorityQueue) Pop() interface{} {
+	n := len(*pq)
+	c := cap(*pq)
+	if n < (c/2) && c > 25 {
+		npq := make(priorityQueue, n, c/2)
+		copy(npq, *pq)
+		*pq = npq
+	}
+	item := (*pq)[n-1]
+	item.Index = -1
+	*pq = (*pq)[0 : n-1]
+	return item
+}
+
+func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
+	if pq.Len() == 0 {
+		return nil, 0
+	}
+
+	item := (*pq)[0]
+	if item.Priority > max {
+		return nil, item.Priority - max
+	}
+	heap.Remove(pq, 0)
+
+	return item, 0
+}
+
+// The end of PriorityQueue implementation.
+
+// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
+// an element can only be taken when its delay has expired. The head of the
+// queue is the *Delayed* element whose delay expired furthest in the past.
+type DelayQueue struct {
+	C chan interface{}
+
+	mu sync.Mutex
+	pq priorityQueue
+
+	// Similar to the sleeping state of runtime.timers.
+	sleeping int32
+	wakeupC  chan struct{}
+}
+
+// New creates an instance of delayQueue with the specified size.
+func New(size int) *DelayQueue {
+	return &DelayQueue{
+		C:       make(chan interface{}),
+		pq:      newPriorityQueue(size),
+		wakeupC: make(chan struct{}),
+	}
+}
+
+// Offer inserts the element into the current queue.
+func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
+	item := &item{Value: elem, Priority: expiration}
+
+	dq.mu.Lock()
+	heap.Push(&dq.pq, item)
+	index := item.Index
+	dq.mu.Unlock()
+
+	if index == 0 {
+		// A new item with the earliest expiration is added.
+		if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
+			dq.wakeupC <- struct{}{}
+		}
+	}
+}
+
+// Poll starts an infinite loop, in which it continually waits for an element
+// to expire and then send the expired element to the channel C.
+func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
+	for {
+		now := nowF()
+
+		dq.mu.Lock()
+		item, delta := dq.pq.PeekAndShift(now)
+		if item == nil {
+			// No items left or at least one item is pending.
+
+			// We must ensure the atomicity of the whole operation, which is
+			// composed of the above PeekAndShift and the following StoreInt32,
+			// to avoid possible race conditions between Offer and Poll.
+			atomic.StoreInt32(&dq.sleeping, 1)
+		}
+		dq.mu.Unlock()
+
+		if item == nil {
+			if delta == 0 {
+				// No items left.
+				select {
+				case <-dq.wakeupC:
+					// Wait until a new item is added.
+					continue
+				case <-exitC:
+					goto exit
+				}
+			} else if delta > 0 {
+				// At least one item is pending.
+				select {
+				case <-dq.wakeupC:
+					// A new item with an "earlier" expiration than the current "earliest" one is added.
+					continue
+				case <-time.After(time.Duration(delta) * time.Millisecond):
+					// The current "earliest" item expires.
+
+					// Reset the sleeping state since there's no need to receive from wakeupC.
+					if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
+						// A caller of Offer() is being blocked on sending to wakeupC,
+						// drain wakeupC to unblock the caller.
+						<-dq.wakeupC
+					}
+					continue
+				case <-exitC:
+					goto exit
+				}
+			}
+		}
+
+		select {
+		case dq.C <- item.Value:
+			// The expired element has been sent out successfully.
+		case <-exitC:
+			goto exit
+		}
+	}
+
exit:
+	// Reset the states
+	atomic.StoreInt32(&dq.sleeping, 0)
+}
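For reference, a minimal standalone sketch of driving a DelayQueue directly, using the upstream module path that timingwheel.go below imports (inside the timing wheel, Poll and Offer are what Start and add call); the element value and the 500ms delay are illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/RussellLuo/timingwheel/delayqueue"
)

func main() {
	dq := delayqueue.New(8)
	exitC := make(chan struct{})
	nowMs := func() int64 { return time.Now().UnixNano() / int64(time.Millisecond) }

	go dq.Poll(exitC, nowMs) // waits internally until elements expire, then feeds dq.C

	dq.Offer("heartbeat-check", nowMs()+500) // due 500ms from now

	fmt.Println(<-dq.C) // prints "heartbeat-check" after roughly 500ms
	close(exitC)
}
```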
@@ -0,0 +1,227 @@
+package kafkatimer
+
+import (
+	"errors"
+	"sync/atomic"
+	"time"
+	"unsafe"
+
+	"github.com/RussellLuo/timingwheel/delayqueue"
+)
+//Timing wheel (Kafka-style)
+
+// TimingWheel is an implementation of Hierarchical Timing Wheels.
+type TimingWheel struct {
+	tick      int64 // in milliseconds
+	wheelSize int64
+
+	interval    int64 // in milliseconds
+	currentTime int64 // in milliseconds
+	buckets     []*bucket
+	queue       *delayqueue.DelayQueue
+
+	// The higher-level overflow wheel.
+	//
+	// NOTE: This field may be updated and read concurrently, through Add().
+	overflowWheel unsafe.Pointer // type: *TimingWheel
+
+	exitC     chan struct{}
+	waitGroup waitGroupWrapper
+}
+
+// NewTimingWheel creates an instance of TimingWheel with the given tick and wheelSize.
+func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
+	tickMs := int64(tick / time.Millisecond)
+	if tickMs <= 0 {
+		panic(errors.New("tick must be greater than or equal to 1ms"))
+	}
+
+	startMs := timeToMs(time.Now().UTC())
+
+	return newTimingWheel(
+		tickMs,
+		wheelSize,
+		startMs,
+		delayqueue.New(int(wheelSize)),
+	)
+}
+
+// newTimingWheel is an internal helper function that really creates an instance of TimingWheel.
+func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
+	buckets := make([]*bucket, wheelSize)
+	for i := range buckets {
+		buckets[i] = newBucket()
+	}
+	return &TimingWheel{
+		tick:        tickMs,
+		wheelSize:   wheelSize,
+		currentTime: truncate(startMs, tickMs),
+		interval:    tickMs * wheelSize,
+		buckets:     buckets,
+		queue:       queue,
+		exitC:       make(chan struct{}),
+	}
+}
+
+// add inserts the timer t into the current timing wheel.
+func (tw *TimingWheel) add(t *Timer) bool {
+	currentTime := atomic.LoadInt64(&tw.currentTime)
+	if t.expiration < currentTime+tw.tick {
+		// Already expired
+		return false
+	} else if t.expiration < currentTime+tw.interval {
+		// Put it into its own bucket
+		virtualID := t.expiration / tw.tick
+		b := tw.buckets[virtualID%tw.wheelSize]
+		b.Add(t)
+
+		// Set the bucket expiration time
+		if b.SetExpiration(virtualID * tw.tick) {
+			// The bucket needs to be enqueued since it was an expired bucket.
+			// We only need to enqueue the bucket when its expiration time has changed,
+			// i.e. the wheel has advanced and this bucket get reused with a new expiration.
+			// Any further calls to set the expiration within the same wheel cycle will
+			// pass in the same value and hence return false, thus the bucket with the
+			// same expiration will not be enqueued multiple times.
+			tw.queue.Offer(b, b.Expiration())
+		}
+
+		return true
+	} else {
+		// Out of the interval. Put it into the overflow wheel
+		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
+		if overflowWheel == nil {
+			atomic.CompareAndSwapPointer(
+				&tw.overflowWheel,
+				nil,
+				unsafe.Pointer(newTimingWheel(
+					tw.interval,
+					tw.wheelSize,
+					currentTime,
+					tw.queue,
+				)),
+			)
+			overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
+		}
+		return (*TimingWheel)(overflowWheel).add(t)
+	}
+}
+
+// addOrRun inserts the timer t into the current timing wheel, or run the
+// timer's task if it has already expired.
+func (tw *TimingWheel) addOrRun(t *Timer) {
+	if !tw.add(t) {
+		// Already expired
+
+		// Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc),
+		// always execute the timer's task in its own goroutine.
+		go t.task()
+	}
+}
+
+func (tw *TimingWheel) advanceClock(expiration int64) {
+	currentTime := atomic.LoadInt64(&tw.currentTime)
+	if expiration >= currentTime+tw.tick {
+		currentTime = truncate(expiration, tw.tick)
+		atomic.StoreInt64(&tw.currentTime, currentTime)
+
+		// Try to advance the clock of the overflow wheel if present
+		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
+		if overflowWheel != nil {
+			(*TimingWheel)(overflowWheel).advanceClock(currentTime)
+		}
+	}
+}
+
+// Start starts the current timing wheel.
+func (tw *TimingWheel) Start() {
+	tw.waitGroup.Wrap(func() {
+		tw.queue.Poll(tw.exitC, func() int64 {
+			return timeToMs(time.Now().UTC())
+		})
+	})
+
+	tw.waitGroup.Wrap(func() {
+		for {
+			select {
+			case elem := <-tw.queue.C:
+				b := elem.(*bucket)
+				tw.advanceClock(b.Expiration())
+				b.Flush(tw.addOrRun)
+			case <-tw.exitC:
+				return
+			}
+		}
+	})
+}
+
+// Stop stops the current timing wheel.
+//
+// If there is any timer's task being running in its own goroutine, Stop does
+// not wait for the task to complete before returning. If the caller needs to
+// know whether the task is completed, it must coordinate with the task explicitly.
+func (tw *TimingWheel) Stop() {
+	close(tw.exitC)
+	tw.waitGroup.Wait()
+}
+
+// AfterFunc waits for the duration to elapse and then calls f in its own goroutine.
+// It returns a Timer that can be used to cancel the call using its Stop method.
+func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
+	t := &Timer{
+		expiration: timeToMs(time.Now().UTC().Add(d)),
+		task:       f,
+	}
+	tw.addOrRun(t)
+	return t
+}
+
+// Scheduler determines the execution plan of a task.
+type Scheduler interface {
+	// Next returns the next execution time after the given (previous) time.
+	// It will return a zero time if no next time is scheduled.
+	//
+	// All times must be UTC.
+	Next(time.Time) time.Time
+}
+
+// ScheduleFunc calls f (in its own goroutine) according to the execution
+// plan scheduled by s. It returns a Timer that can be used to cancel the
+// call using its Stop method.
+//
+// If the caller want to terminate the execution plan halfway, it must
+// stop the timer and ensure that the timer is stopped actually, since in
+// the current implementation, there is a gap between the expiring and the
+// restarting of the timer. The wait time for ensuring is short since the
+// gap is very small.
+//
+// Internally, ScheduleFunc will ask the first execution time (by calling
+// s.Next()) initially, and create a timer if the execution time is non-zero.
+// Afterwards, it will ask the next execution time each time f is about to
+// be executed, and f will be called at the next execution time if the time
+// is non-zero.
+func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) (t *Timer) {
+	expiration := s.Next(time.Now().UTC())
+	if expiration.IsZero() {
+		// No time is scheduled, return nil.
+		return
+	}
+
+	t = &Timer{
+		expiration: timeToMs(expiration),
+		task: func() {
+			// Schedule the task to execute at the next time if possible.
+			expiration := s.Next(msToTime(t.expiration))
+			if !expiration.IsZero() {
+				t.expiration = timeToMs(expiration)
+				tw.addOrRun(t)
+			}
+
+			// Actually execute the task.
+			f()
+		},
+	}
+	tw.addOrRun(t)
+
+	return
+}
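This file mirrors github.com/RussellLuo/timingwheel, which the go.mod change above also pulls in as a module. A hedged sketch of how either copy might be wired up for the heartbeat checking named in the commit message; the 10s/5s values and the `every` scheduler are illustrative, not part of this commit:

```go
package main

import (
	"fmt"
	"time"

	"github.com/RussellLuo/timingwheel"
)

// every is a trivial Scheduler: fire again a fixed interval after the previous run.
type every struct{ d time.Duration }

func (e every) Next(prev time.Time) time.Time { return prev.Add(e.d) }

func main() {
	tw := timingwheel.NewTimingWheel(time.Millisecond, 20)
	tw.Start()
	defer tw.Stop()

	// One-shot: e.g. treat a connection as dead if no heartbeat arrives within 10s.
	tw.AfterFunc(10*time.Second, func() { fmt.Println("heartbeat timeout") })

	// Recurring: e.g. sweep all connections every 5s.
	t := tw.ScheduleFunc(every{5 * time.Second}, func() { fmt.Println("heartbeat sweep") })
	defer t.Stop()

	time.Sleep(12 * time.Second)
}
```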
@@ -0,0 +1,26 @@
+package kafkatimer
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func F() {
+	fmt.Println("I'm timer...")
+}
+
+
+func TestTimingWheel_Start(t *testing.T) {
+	tw := NewTimingWheel(time.Millisecond, 20)
+	tw.Start()
+	defer tw.Stop()
+
+	exitC := make(chan time.Time, 1)
+	tw.AfterFunc(10 * time.Second, func() {
+		fmt.Println("The timer fires")
+		exitC <- time.Now().UTC()
+	})
+
+	fmt.Println(<-exitC)
+}
\ No newline at end of file
@@ -0,0 +1,38 @@
+package kafkatimer
+
+import (
+	"sync"
+	"time"
+)
+
+// truncate returns the result of rounding x toward zero to a multiple of m.
+// If m <= 0, Truncate returns x unchanged.
+func truncate(x, m int64) int64 {
+	if m <= 0 {
+		return x
+	}
+	return x - x%m
+}
+
+// timeToMs returns an integer number, which represents t in milliseconds.
+func timeToMs(t time.Time) int64 {
+	return t.UnixNano() / int64(time.Millisecond)
+}
+
+// msToTime returns the UTC time corresponding to the given Unix time,
+// t milliseconds since January 1, 1970 UTC.
+func msToTime(t int64) time.Time {
+	return time.Unix(0, t*int64(time.Millisecond)).UTC()
+}
+
+type waitGroupWrapper struct {
+	sync.WaitGroup
+}
+
+func (w *waitGroupWrapper) Wrap(cb func()) {
+	w.Add(1)
+	go func() {
+		cb()
+		w.Done()
+	}()
+}
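A small package-internal sketch of what these helpers do (they are unexported, so this only compiles next to this file): timeToMs/msToTime keep millisecond precision only, and truncate is what aligns expirations and currentTime to a tick boundary in add and advanceClock. The sample timestamp and 20ms tick are illustrative:

```go
package kafkatimer

import (
	"fmt"
	"time"
)

func helpersDemo() {
	now := time.Date(2022, 3, 1, 12, 0, 0, 7_654_321, time.UTC) // 7.654321ms into the second
	ms := timeToMs(now)
	fmt.Println(ms % 1000)               // 7: sub-millisecond precision is dropped
	fmt.Println(ms - truncate(ms, 20))   // 7: truncate aligns down to the 20ms tick boundary
	fmt.Println(msToTime(ms).Equal(now)) // false: only millisecond precision comes back
}
```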
src/components/net/conn.go
@@ -54,14 +54,16 @@ func (c *Connection) write() {
 	defer c.Quiting()
 
 	for msg := range c.WBuffer {
-		if _, err := c.writer.Write(msg); err != nil {
-			fmt.Println("write fail err: " + err.Error())
+		n, err := c.writer.Write(msg)
+		if err != nil{
+			logger.Error("write fail err: " + err.Error(), "n: ", n)
 			return
 		}
 		if err := c.writer.Flush(); err != nil {
-			fmt.Println("write Flush fail err: " + err.Error())
+			logger.Error("write Flush fail err: " + err.Error())
 			return
 		}
+		logger.Debug("write :%s, n: %d", msg, n)
 	}
 }
 
@@ -95,11 +97,14 @@ func (c *Connection) read() {
 func (c *Connection) Start() {
 	go c.write()
 	go c.read()
+	//for {
+	//	c.SendMsgByCode(100, 1, nil)
+	//	time.Sleep(2*time.Second)
+	//}
 }
 
 func (c *Connection) Stop() {
 	close(c.RBuffer)
-	close(c.WBuffer)
 	c.Conn.Close()
 }
 
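Dropping close(c.WBuffer) from Stop avoids a potential panic from sending on a closed channel, but it also means the write goroutine's `for msg := range c.WBuffer` now only ends once a Write or Flush fails after the socket is closed. One common alternative shape, sketched here as an assumption rather than the project's code, keeps the buffer open and signals shutdown through a separate channel:

```go
package connsketch

import (
	"errors"
	"net"
	"sync"
)

// conn is an illustrative stand-in for pro2d's Connection; field names are not the project's.
type conn struct {
	sock net.Conn
	wbuf chan []byte
	done chan struct{}
	once sync.Once
}

func (c *conn) writeLoop() {
	for {
		select {
		case msg := <-c.wbuf:
			if _, err := c.sock.Write(msg); err != nil {
				return
			}
		case <-c.done:
			return // Stop closed done; wbuf stays open, so senders never hit a closed channel
		}
	}
}

func (c *conn) Send(b []byte) error {
	select {
	case c.wbuf <- b:
		return nil
	case <-c.done:
		return errors.New("connection closed")
	}
}

func (c *conn) Stop() {
	c.once.Do(func() {
		close(c.done)
		c.sock.Close()
	})
}
```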
src/components/net/server.go
@@ -36,6 +36,10 @@ func NewServer(sConf *conf.SConf) *Server {
 	}
 }
 
+func (s *Server) StartTimer() {
+
+}
+
 //StartWorkerPool starts the worker pool
 func (s *Server) StartWorkerPool() {
 	//iterate over the number of workers to start and launch them one by one
@@ -62,11 +66,11 @@ func (s *Server) StartOneWorker(workerID int, taskQueue chan *MsgPkg) {
 }
 
 func (s *Server) DoMsgHandler(msg *MsgPkg) {
-	logger.Debug("DoMsgHandler cmd: %d, data: %s", msg.Head.Cmd, msg.Body)
 	if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok {
-		logger.Debug("adfadfadfasdfadfadsf")
+		logger.Debug("protocode handler: %d", msg.Head.Cmd)
 		errCode, protomsg := md(msg)
 		rsp, err := proto.Marshal(protomsg)
+		fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg)
 		if err != nil {
 			msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil)
 			return
@@ -78,6 +82,7 @@ func (s *Server) DoMsgHandler(msg *MsgPkg) {
 }
 
 func (s *Server) OnClose(conn *Connection) {
+	//conn.Stop()
 	s.Clients.Delete(conn.Id)
 }
 
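StartTimer is left empty in this commit. A purely hypothetical sketch of one shape it could take for the heartbeat check in the commit message, using the timing wheel module added above; the 100ms tick, the 5s sweep interval, and the sweep callback are assumptions (there is no per-connection heartbeat timestamp yet in this commit):

```go
package serversketch

import (
	"time"

	"github.com/RussellLuo/timingwheel"
)

// fixedInterval is a minimal Scheduler: fire again d after the previous run.
type fixedInterval time.Duration

func (d fixedInterval) Next(prev time.Time) time.Time { return prev.Add(time.Duration(d)) }

// startHeartbeatTimer sweeps connections on a fixed interval using the timing wheel.
// sweep would walk the server's client map and drop connections whose last heartbeat
// is older than the allowed window; that bookkeeping is left to the caller.
func startHeartbeatTimer(sweep func()) *timingwheel.TimingWheel {
	tw := timingwheel.NewTimingWheel(100*time.Millisecond, 60)
	tw.Start()
	tw.ScheduleFunc(fixedInterval(5*time.Second), sweep)
	return tw
}
```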
@@ -0,0 +1 @@
+package skynettimer
src/plugin/RolePlugin.go
@@ -45,11 +45,16 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) {
 	if role == nil {
 		return 2, nil
 	}
+	role.UpdateProperty("Device", req.Device)
 
-	return 0, &pb.RoleRsp{
-		Role: role.Role,
-		Hero: nil,
-		Team: nil,
-		Equips: nil,
+	//return 0, &pb.RoleRsp{
+	//	Role: role.Role,
+	//	Hero: nil,
+	//	Team: nil,
+	//	Equips: nil,
+	//}
+	return 0, &pb.LoginResponse{
+		Uid: role.Role.Uid,
+		Device: role.Role.Device,
 	}
 }
\ No newline at end of file
src/plugin/protocode.go
@@ -10,8 +10,8 @@ func init() {
 	logger.Debug("init protocode...")
 	net.ActionMap = make(map[pb.ProtoCode]net.ActionHandler)
 
-	net.ActionMap[pb.ProtoCode_HeartRpc] = HeartRpc
-	net.ActionMap[pb.ProtoCode_LoginRpc] = LoginRpc
-	net.ActionMap[pb.ProtoCode_CreateRpc] = CreateRpc
+	net.ActionMap[pb.ProtoCode_HeartReq] = HeartRpc
+	net.ActionMap[pb.ProtoCode_LoginReq] = LoginRpc
+	net.ActionMap[pb.ProtoCode_CreateReq] = CreateRpc
 
 }
src/utils/utils.go
@@ -5,6 +5,7 @@ import (
 	"encoding/hex"
 	"reflect"
 	"strings"
+	"time"
 )
 
 func GetSchemaType(schema interface{}) reflect.Type {
@@ -52,4 +53,8 @@ func Md5V(str string) string {
 	h := md5.New()
 	h.Write([]byte(str))
 	return hex.EncodeToString(h.Sum(nil))
+}
+
+func Timex() int64 {
+	return time.Now().UnixMicro()
 }
\ No newline at end of file
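Timex returns time.Now().UnixMicro(), i.e. microseconds, while the timer code added in this commit works in milliseconds (timeToMs). If a millisecond timestamp is what callers expect, the Go 1.17 counterpart would be the following; whether that was the intent is an assumption:

```go
package utils

import "time"

// TimexMs is a hypothetical millisecond variant of Timex, matching the timing wheel's unit.
func TimexMs() int64 {
	return time.Now().UnixMilli()
}
```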
test/client.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/binary"
 	"github.com/golang/protobuf/proto"
@@ -8,6 +9,7 @@ import (
 	"pro2d/protos/pb"
 	"pro2d/src/components/logger"
 	net2 "pro2d/src/components/net"
+	"time"
 )
 
 func main() {
@@ -49,5 +51,25 @@ func main() {
 		logger.Error(err)
 		return
 	}
-	client.Write(buf.Bytes())
+
+	rd := bufio.NewReadWriter(bufio.NewReader(client), bufio.NewWriter(client))
+	for {
+		b1 := make([]byte, 1024)
+		n, err := rd.Write(buf.Bytes())
+		if err != nil {
+			logger.Error(err)
+			return
+		}
+		rd.Flush()
+		logger.Debug("write:n: %d, msg: %s", n, buf.Bytes())
+
+		n, err = rd.Read(b1)
+		if err != nil {
+			logger.Error(err)
+			return
+		}
+		logger.Debug("recv: %s, n: %d\n", b1, n)
+		time.Sleep(5*time.Second)
+	}
+
 }
\ No newline at end of file
tools/protostostruct.go
@@ -11,11 +11,12 @@ import (
 )
 
 var (
-	ProtoCode = "syntax = \"proto3\";\noption go_package = \"./pb;pb\";\n\npackage protocode;\n\nenum ProtoCode\n{\n UNKNOWN = 0x000;\n %s\n}"
-	ProtoCodeLine = "\t%sRpc = %02x;\n"
+	ProtoCode = "syntax = \"proto3\";\noption go_package = \"./pb;pb\";\n\npackage protocode;\n\nenum ProtoCode\n{\n UNKNOWN = 0;\n %s\n}"
+	ProtoCodeLineReq = "\t%sReq = %d;\n"
+	ProtoCodeLineRsp = "\t%sRsp = %d;\n"
 
 	GoProtoCodeStr = "package main\n\nimport (\n\t\"pro2d/protos/pb\"\n\t\"pro2d/src/components/logger\"\n\t\"pro2d/src/components/net\"\n)\n\nfunc init() {\n\tlogger.Debug(\"init protocode...\")\n\tnet.ActionMap = make(map[pb.ProtoCode]net.ActionHandler)\n\n%s\n}\n"
-	GoProtoCodeLine = "\tnet.ActionMap[pb.ProtoCode_%sRpc] = %sRpc\n"
+	GoProtoCodeLine = "\tnet.ActionMap[pb.ProtoCode_%sReq] = %sRpc\n"
 )
 
 func ProtoToCode(readPath, filename string) (string, string) {
@@ -54,13 +55,20 @@ func ProtoToCode(readPath, filename string) (string, string) {
 		}
 		lb := bytes.Split(line, []byte(" "))
 		for _, v := range lb {
-			n = bytes.Index(v, []byte("Req"))
-			if n < 0 {
+			n1 := bytes.Index(v, []byte("Req"))
+			n2 := bytes.Index(v, []byte("Rsp"))
+			if n1 < 0 && n2 < 0 {
 				continue
 			}
-			code++
-			protoData += fmt.Sprintf(ProtoCodeLine, v[:n],code)
-			goProtoData += fmt.Sprintf(GoProtoCodeLine, v[:n], v[:n])
+			if n1 >= 0 {
+				code++
+				protoData += fmt.Sprintf(ProtoCodeLineReq, v[:n1],code)
+				goProtoData += fmt.Sprintf(GoProtoCodeLine, v[:n1], v[:n1])
+			}
+			if n2 >= 0 {
+				code++
+				protoData += fmt.Sprintf(ProtoCodeLineRsp, v[:n2],code)
+			}
 		}
 	}
 
tools/protostostruct_test.go
@@ -1,7 +1,14 @@
 package main
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 func TestReadProtos(t *testing.T) {
 	ReadProtos("/Users/mac/Documents/project/Pro2D/Pro2DServer/protos/", "/Users/mac/Documents/project/Pro2D/Pro2DServer/")
 }
+
+func TestProtoToCode(t *testing.T) {
+	fmt.Printf("0x%03x\n", 20)
+}
\ No newline at end of file