From 98b0736dfc80fe77acc419f692ca248ef57d288d Mon Sep 17 00:00:00 2001 From: zqj <582132116@qq.com> Date: Thu, 10 Mar 2022 18:17:48 +0800 Subject: [PATCH] 添加定时器, 检查心跳 --- go.mod | 3 +++ go.sum | 6 ++++++ protos | 2 +- src/components/db/mongo.go | 5 +++++ src/components/db/schema.go | 43 +++++++++++++++++++++++++++++++++++++++++++ src/components/kafkatimer/bucket.go | 133 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/components/kafkatimer/delayqueue.go | 183 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/components/kafkatimer/delayqueue/delayqueue.go | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/components/kafkatimer/timingwheel.go | 227 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/components/kafkatimer/timingwheel_test.go | 26 ++++++++++++++++++++++++++ src/components/kafkatimer/utils.go | 38 ++++++++++++++++++++++++++++++++++++++ src/components/net/conn.go | 13 +++++++++---- src/components/net/server.go | 9 +++++++-- src/components/skynettimer/timerwheel.go | 1 + src/plugin/RolePlugin.go | 15 ++++++++++----- src/plugin/protocode.go | 6 +++--- src/utils/utils.go | 5 +++++ test/client.go | 24 +++++++++++++++++++++++- tools/protostostruct.go | 24 ++++++++++++++++-------- tools/protostostruct_test.go | 9 ++++++++- 20 files changed, 933 insertions(+), 25 deletions(-) create mode 100644 src/components/db/schema.go create mode 100644 src/components/kafkatimer/bucket.go create mode 100644 src/components/kafkatimer/delayqueue.go create mode 100644 src/components/kafkatimer/delayqueue/delayqueue.go create mode 100644 src/components/kafkatimer/timingwheel.go create mode 100644 src/components/kafkatimer/timingwheel_test.go create mode 100644 src/components/kafkatimer/utils.go create mode 100644 src/components/skynettimer/timerwheel.go diff --git a/go.mod b/go.mod index 61b3bba..f4cb55b 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,14 @@ module pro2d go 1.17 require ( + github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/garyburd/redigo v1.6.3 github.com/gin-gonic/gin v1.7.7 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.5.2 + github.com/ouqiang/timewheel v1.0.1 go.etcd.io/etcd/api/v3 v3.5.2 go.etcd.io/etcd/client/v3 v3.5.2 go.mongodb.org/mongo-driver v1.8.3 @@ -35,6 +37,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/robfig/cron v1.2.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.0.2 // indirect diff --git a/go.sum b/go.sum index e7cf565..8619ac7 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY= +github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -128,6 +130,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/ouqiang/timewheel v1.0.1 h1:XxhrYwqhJ3z8nthEnhZcHyZ/dcE29ACJEJR3Ika0W2g= +github.com/ouqiang/timewheel v1.0.1/go.mod h1:896mz+8zvRU6i0PLVR0qaNuU5roxC874OB4TxUvUewY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -149,6 +153,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= diff --git a/protos b/protos index a2411bb..c29c12e 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit a2411bbcfc631805718ef4d692a42ba0200e698c +Subproject commit c29c12e0a07ed8e3a35a451ca7dbdadbc4bb70fb diff --git a/src/components/db/mongo.go b/src/components/db/mongo.go index 94c26e6..befd4a6 100644 --- a/src/components/db/mongo.go +++ b/src/components/db/mongo.go @@ -11,6 +11,7 @@ import ( "pro2d/src/utils" "sort" "strconv" + "strings" "time" ) @@ -221,6 +222,10 @@ func (m *MgoColl) Update(update interface{}) { m.FindOneAndUpdate(m.pri, update) } +func (m *MgoColl) UpdateProperty(key string, val interface{}) { + m.FindOneAndUpdate(m.pri, bson.M{strings.ToLower(key): val}) +} + func (m *MgoColl)Save() { m.FindOneAndUpdate(m.pri, m.schema) } \ No newline at end of file diff --git a/src/components/db/schema.go b/src/components/db/schema.go new file mode 100644 index 0000000..b2847d0 --- /dev/null +++ b/src/components/db/schema.go @@ -0,0 +1,43 @@ +package db + +import ( + "reflect" + "strings" +) + +type Schema struct { + reflect.Type + pri interface{} + schema interface{} +} + +func NewSchema(pri, schema interface{}) *Schema { + s := reflect.TypeOf(schema) + if s.Kind() == reflect.Ptr { + s = reflect.TypeOf(s).Elem() + } + return &Schema{ + Type: s, + pri: pri, + schema: schema, + } +} + +func (s *Schema)GetSchemaType() reflect.Type { + return s.Type +} + +func (s *Schema)GetCollName() string { + return strings.ToLower(s.GetSchemaType().Name()) +} + +func (s *Schema)GetPriKey() string { + var pri string + for i := 0; i < s.NumField(); i++ { + if s.Field(i).Tag.Get("pri") == "1" { + pri = strings.ToLower(s.Field(i).Name) + break + } + } + return pri +} diff --git a/src/components/kafkatimer/bucket.go b/src/components/kafkatimer/bucket.go new file mode 100644 index 0000000..1e13f85 --- /dev/null +++ b/src/components/kafkatimer/bucket.go @@ -0,0 +1,133 @@ +package kafkatimer + +import ( + "container/list" + "sync" + "sync/atomic" + "unsafe" +) + +// Timer represents a single event. When the Timer expires, the given +// task will be executed. +type Timer struct { + expiration int64 // in milliseconds + task func() + + // The bucket that holds the list to which this timer's element belongs. + // + // NOTE: This field may be updated and read concurrently, + // through Timer.Stop() and Bucket.Flush(). + b unsafe.Pointer // type: *bucket + + // The timer's element. + element *list.Element +} + +func (t *Timer) getBucket() *bucket { + return (*bucket)(atomic.LoadPointer(&t.b)) +} + +func (t *Timer) setBucket(b *bucket) { + atomic.StorePointer(&t.b, unsafe.Pointer(b)) +} + +// Stop prevents the Timer from firing. It returns true if the call +// stops the timer, false if the timer has already expired or been stopped. +// +// If the timer t has already expired and the t.task has been started in its own +// goroutine; Stop does not wait for t.task to complete before returning. If the caller +// needs to know whether t.task is completed, it must coordinate with t.task explicitly. +func (t *Timer) Stop() bool { + stopped := false + for b := t.getBucket(); b != nil; b = t.getBucket() { + // If b.Remove is called just after the timing wheel's goroutine has: + // 1. removed t from b (through b.Flush -> b.remove) + // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add) + // this may fail to remove t due to the change of t's bucket. + stopped = b.Remove(t) + + // Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2), + // and retry until the bucket becomes nil, which indicates that t has finally been removed. + } + return stopped +} + +type bucket struct { + // 64-bit atomic operations require 64-bit alignment, but 32-bit + // compilers do not ensure it. So we must keep the 64-bit field + // as the first field of the struct. + // + // For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG + // and https://go101.org/article/memory-layout.html. + expiration int64 + + mu sync.Mutex + timers *list.List +} + +func newBucket() *bucket { + return &bucket{ + timers: list.New(), + expiration: -1, + } +} + +func (b *bucket) Expiration() int64 { + return atomic.LoadInt64(&b.expiration) +} + +func (b *bucket) SetExpiration(expiration int64) bool { + return atomic.SwapInt64(&b.expiration, expiration) != expiration +} + +func (b *bucket) Add(t *Timer) { + b.mu.Lock() + + e := b.timers.PushBack(t) + t.setBucket(b) + t.element = e + + b.mu.Unlock() +} + +func (b *bucket) remove(t *Timer) bool { + if t.getBucket() != b { + // If remove is called from within t.Stop, and this happens just after the timing wheel's goroutine has: + // 1. removed t from b (through b.Flush -> b.remove) + // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add) + // then t.getBucket will return nil for case 1, or ab (non-nil) for case 2. + // In either case, the returned value does not equal to b. + return false + } + b.timers.Remove(t.element) + t.setBucket(nil) + t.element = nil + return true +} + +func (b *bucket) Remove(t *Timer) bool { + b.mu.Lock() + defer b.mu.Unlock() + return b.remove(t) +} + +func (b *bucket) Flush(reinsert func(*Timer)) { + b.mu.Lock() + defer b.mu.Unlock() + + for e := b.timers.Front(); e != nil; { + next := e.Next() + + t := e.Value.(*Timer) + b.remove(t) + // Note that this operation will either execute the timer's task, or + // insert the timer into another bucket belonging to a lower-level wheel. + // + // In either case, no further lock operation will happen to b.mu. + reinsert(t) + + e = next + } + + b.SetExpiration(-1) +} diff --git a/src/components/kafkatimer/delayqueue.go b/src/components/kafkatimer/delayqueue.go new file mode 100644 index 0000000..e33da29 --- /dev/null +++ b/src/components/kafkatimer/delayqueue.go @@ -0,0 +1,183 @@ +package kafkatimer + +import ( + "container/heap" + "sync" + "sync/atomic" + "time" +) + +type item struct { + Value interface{} + Priority int64 + Index int +} + +// this is a priority queue as implemented by a min heap +// ie. the 0th element is the *lowest* value +type priorityQueue []*item + +func newPriorityQueue(capacity int) priorityQueue { + return make(priorityQueue, 0, capacity) +} + +func (pq priorityQueue) Len() int { + return len(pq) +} + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].Priority < pq[j].Priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].Index = i + pq[j].Index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + c := cap(*pq) + if n+1 > c { + npq := make(priorityQueue, n, c*2) + copy(npq, *pq) + *pq = npq + } + *pq = (*pq)[0 : n+1] + item := x.(*item) + item.Index = n + (*pq)[n] = item +} + +func (pq *priorityQueue) Pop() interface{} { + n := len(*pq) + c := cap(*pq) + if n < (c/2) && c > 25 { + npq := make(priorityQueue, n, c/2) + copy(npq, *pq) + *pq = npq + } + item := (*pq)[n-1] + item.Index = -1 + *pq = (*pq)[0 : n-1] + return item +} + +func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) { + if pq.Len() == 0 { + return nil, 0 + } + + item := (*pq)[0] + if item.Priority > max { + return nil, item.Priority - max + } + heap.Remove(pq, 0) + + return item, 0 +} + +// The end of PriorityQueue implementation. + +// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which +// an element can only be taken when its delay has expired. The head of the +// queue is the *Delayed* element whose delay expired furthest in the past. +type DelayQueue struct { + C chan interface{} + + mu sync.Mutex + pq priorityQueue + + // Similar to the sleeping state of runtime.timers. + sleeping int32 + wakeupC chan struct{} +} + +// New creates an instance of delayQueue with the specified size. +func New(size int) *DelayQueue { + return &DelayQueue{ + C: make(chan interface{}), + pq: newPriorityQueue(size), + wakeupC: make(chan struct{}), + } +} + +// Offer inserts the element into the current queue. +func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { + item := &item{Value: elem, Priority: expiration} + + dq.mu.Lock() + heap.Push(&dq.pq, item) + index := item.Index + dq.mu.Unlock() + + if index == 0 { + // A new item with the earliest expiration is added. + if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { + dq.wakeupC <- struct{}{} + } + } +} + +// Poll starts an infinite loop, in which it continually waits for an element +// to expire and then send the expired element to the channel C. +func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { + for { + now := nowF() + + dq.mu.Lock() + item, delta := dq.pq.PeekAndShift(now) + if item == nil { + // No items left or at least one item is pending. + + // We must ensure the atomicity of the whole operation, which is + // composed of the above PeekAndShift and the following StoreInt32, + // to avoid possible race conditions between Offer and Poll. + atomic.StoreInt32(&dq.sleeping, 1) + } + dq.mu.Unlock() + + if item == nil { + if delta == 0 { + // No items left. + select { + case <-dq.wakeupC: + // Wait until a new item is added. + continue + case <-exitC: + goto exit + } + } else if delta > 0 { + // At least one item is pending. + select { + case <-dq.wakeupC: + // A new item with an "earlier" expiration than the current "earliest" one is added. + continue + case <-time.After(time.Duration(delta) * time.Millisecond): + // The current "earliest" item expires. + + // Reset the sleeping state since there's no need to receive from wakeupC. + if atomic.SwapInt32(&dq.sleeping, 0) == 0 { + // A caller of Offer() is being blocked on sending to wakeupC, + // drain wakeupC to unblock the caller. + <-dq.wakeupC + } + continue + case <-exitC: + goto exit + } + } + } + + select { + case dq.C <- item.Value: + // The expired element has been sent out successfully. + case <-exitC: + goto exit + } + } + +exit: + // Reset the states + atomic.StoreInt32(&dq.sleeping, 0) +} diff --git a/src/components/kafkatimer/delayqueue/delayqueue.go b/src/components/kafkatimer/delayqueue/delayqueue.go new file mode 100644 index 0000000..82e6a9d --- /dev/null +++ b/src/components/kafkatimer/delayqueue/delayqueue.go @@ -0,0 +1,186 @@ +package delayqueue + +import ( + "container/heap" + "sync" + "sync/atomic" + "time" +) + +// The start of PriorityQueue implementation. +// Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go + +type item struct { + Value interface{} + Priority int64 + Index int +} + +// this is a priority queue as implemented by a min heap +// ie. the 0th element is the *lowest* value +type priorityQueue []*item + +func newPriorityQueue(capacity int) priorityQueue { + return make(priorityQueue, 0, capacity) +} + +func (pq priorityQueue) Len() int { + return len(pq) +} + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].Priority < pq[j].Priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].Index = i + pq[j].Index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + c := cap(*pq) + if n+1 > c { + npq := make(priorityQueue, n, c*2) + copy(npq, *pq) + *pq = npq + } + *pq = (*pq)[0 : n+1] + item := x.(*item) + item.Index = n + (*pq)[n] = item +} + +func (pq *priorityQueue) Pop() interface{} { + n := len(*pq) + c := cap(*pq) + if n < (c/2) && c > 25 { + npq := make(priorityQueue, n, c/2) + copy(npq, *pq) + *pq = npq + } + item := (*pq)[n-1] + item.Index = -1 + *pq = (*pq)[0 : n-1] + return item +} + +func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) { + if pq.Len() == 0 { + return nil, 0 + } + + item := (*pq)[0] + if item.Priority > max { + return nil, item.Priority - max + } + heap.Remove(pq, 0) + + return item, 0 +} + +// The end of PriorityQueue implementation. + +// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which +// an element can only be taken when its delay has expired. The head of the +// queue is the *Delayed* element whose delay expired furthest in the past. +type DelayQueue struct { + C chan interface{} + + mu sync.Mutex + pq priorityQueue + + // Similar to the sleeping state of runtime.timers. + sleeping int32 + wakeupC chan struct{} +} + +// New creates an instance of delayQueue with the specified size. +func New(size int) *DelayQueue { + return &DelayQueue{ + C: make(chan interface{}), + pq: newPriorityQueue(size), + wakeupC: make(chan struct{}), + } +} + +// Offer inserts the element into the current queue. +func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { + item := &item{Value: elem, Priority: expiration} + + dq.mu.Lock() + heap.Push(&dq.pq, item) + index := item.Index + dq.mu.Unlock() + + if index == 0 { + // A new item with the earliest expiration is added. + if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { + dq.wakeupC <- struct{}{} + } + } +} + +// Poll starts an infinite loop, in which it continually waits for an element +// to expire and then send the expired element to the channel C. +func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { + for { + now := nowF() + + dq.mu.Lock() + item, delta := dq.pq.PeekAndShift(now) + if item == nil { + // No items left or at least one item is pending. + + // We must ensure the atomicity of the whole operation, which is + // composed of the above PeekAndShift and the following StoreInt32, + // to avoid possible race conditions between Offer and Poll. + atomic.StoreInt32(&dq.sleeping, 1) + } + dq.mu.Unlock() + + if item == nil { + if delta == 0 { + // No items left. + select { + case <-dq.wakeupC: + // Wait until a new item is added. + continue + case <-exitC: + goto exit + } + } else if delta > 0 { + // At least one item is pending. + select { + case <-dq.wakeupC: + // A new item with an "earlier" expiration than the current "earliest" one is added. + continue + case <-time.After(time.Duration(delta) * time.Millisecond): + // The current "earliest" item expires. + + // Reset the sleeping state since there's no need to receive from wakeupC. + if atomic.SwapInt32(&dq.sleeping, 0) == 0 { + // A caller of Offer() is being blocked on sending to wakeupC, + // drain wakeupC to unblock the caller. + <-dq.wakeupC + } + continue + case <-exitC: + goto exit + } + } + } + + select { + case dq.C <- item.Value: + // The expired element has been sent out successfully. + case <-exitC: + goto exit + } + } + +exit: + // Reset the states + atomic.StoreInt32(&dq.sleeping, 0) +} diff --git a/src/components/kafkatimer/timingwheel.go b/src/components/kafkatimer/timingwheel.go new file mode 100644 index 0000000..52b7f1d --- /dev/null +++ b/src/components/kafkatimer/timingwheel.go @@ -0,0 +1,227 @@ +package kafkatimer + +import ( + "errors" + "sync/atomic" + "time" + "unsafe" + + "github.com/RussellLuo/timingwheel/delayqueue" +) +//时间轮 kafka + +// TimingWheel is an implementation of Hierarchical Timing Wheels. +type TimingWheel struct { + tick int64 // in milliseconds + wheelSize int64 + + interval int64 // in milliseconds + currentTime int64 // in milliseconds + buckets []*bucket + queue *delayqueue.DelayQueue + + // The higher-level overflow wheel. + // + // NOTE: This field may be updated and read concurrently, through Add(). + overflowWheel unsafe.Pointer // type: *TimingWheel + + exitC chan struct{} + waitGroup waitGroupWrapper +} + +// NewTimingWheel creates an instance of TimingWheel with the given tick and wheelSize. +func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { + tickMs := int64(tick / time.Millisecond) + if tickMs <= 0 { + panic(errors.New("tick must be greater than or equal to 1ms")) + } + + startMs := timeToMs(time.Now().UTC()) + + return newTimingWheel( + tickMs, + wheelSize, + startMs, + delayqueue.New(int(wheelSize)), + ) +} + +// newTimingWheel is an internal helper function that really creates an instance of TimingWheel. +func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { + buckets := make([]*bucket, wheelSize) + for i := range buckets { + buckets[i] = newBucket() + } + return &TimingWheel{ + tick: tickMs, + wheelSize: wheelSize, + currentTime: truncate(startMs, tickMs), + interval: tickMs * wheelSize, + buckets: buckets, + queue: queue, + exitC: make(chan struct{}), + } +} + +// add inserts the timer t into the current timing wheel. +func (tw *TimingWheel) add(t *Timer) bool { + currentTime := atomic.LoadInt64(&tw.currentTime) + if t.expiration < currentTime+tw.tick { + // Already expired + return false + } else if t.expiration < currentTime+tw.interval { + // Put it into its own bucket + virtualID := t.expiration / tw.tick + b := tw.buckets[virtualID%tw.wheelSize] + b.Add(t) + + // Set the bucket expiration time + if b.SetExpiration(virtualID * tw.tick) { + // The bucket needs to be enqueued since it was an expired bucket. + // We only need to enqueue the bucket when its expiration time has changed, + // i.e. the wheel has advanced and this bucket get reused with a new expiration. + // Any further calls to set the expiration within the same wheel cycle will + // pass in the same value and hence return false, thus the bucket with the + // same expiration will not be enqueued multiple times. + tw.queue.Offer(b, b.Expiration()) + } + + return true + } else { + // Out of the interval. Put it into the overflow wheel + overflowWheel := atomic.LoadPointer(&tw.overflowWheel) + if overflowWheel == nil { + atomic.CompareAndSwapPointer( + &tw.overflowWheel, + nil, + unsafe.Pointer(newTimingWheel( + tw.interval, + tw.wheelSize, + currentTime, + tw.queue, + )), + ) + overflowWheel = atomic.LoadPointer(&tw.overflowWheel) + } + return (*TimingWheel)(overflowWheel).add(t) + } +} + +// addOrRun inserts the timer t into the current timing wheel, or run the +// timer's task if it has already expired. +func (tw *TimingWheel) addOrRun(t *Timer) { + if !tw.add(t) { + // Already expired + + // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc), + // always execute the timer's task in its own goroutine. + go t.task() + } +} + +func (tw *TimingWheel) advanceClock(expiration int64) { + currentTime := atomic.LoadInt64(&tw.currentTime) + if expiration >= currentTime+tw.tick { + currentTime = truncate(expiration, tw.tick) + atomic.StoreInt64(&tw.currentTime, currentTime) + + // Try to advance the clock of the overflow wheel if present + overflowWheel := atomic.LoadPointer(&tw.overflowWheel) + if overflowWheel != nil { + (*TimingWheel)(overflowWheel).advanceClock(currentTime) + } + } +} + +// Start starts the current timing wheel. +func (tw *TimingWheel) Start() { + tw.waitGroup.Wrap(func() { + tw.queue.Poll(tw.exitC, func() int64 { + return timeToMs(time.Now().UTC()) + }) + }) + + tw.waitGroup.Wrap(func() { + for { + select { + case elem := <-tw.queue.C: + b := elem.(*bucket) + tw.advanceClock(b.Expiration()) + b.Flush(tw.addOrRun) + case <-tw.exitC: + return + } + } + }) +} + +// Stop stops the current timing wheel. +// +// If there is any timer's task being running in its own goroutine, Stop does +// not wait for the task to complete before returning. If the caller needs to +// know whether the task is completed, it must coordinate with the task explicitly. +func (tw *TimingWheel) Stop() { + close(tw.exitC) + tw.waitGroup.Wait() +} + +// AfterFunc waits for the duration to elapse and then calls f in its own goroutine. +// It returns a Timer that can be used to cancel the call using its Stop method. +func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer { + t := &Timer{ + expiration: timeToMs(time.Now().UTC().Add(d)), + task: f, + } + tw.addOrRun(t) + return t +} + +// Scheduler determines the execution plan of a task. +type Scheduler interface { + // Next returns the next execution time after the given (previous) time. + // It will return a zero time if no next time is scheduled. + // + // All times must be UTC. + Next(time.Time) time.Time +} + +// ScheduleFunc calls f (in its own goroutine) according to the execution +// plan scheduled by s. It returns a Timer that can be used to cancel the +// call using its Stop method. +// +// If the caller want to terminate the execution plan halfway, it must +// stop the timer and ensure that the timer is stopped actually, since in +// the current implementation, there is a gap between the expiring and the +// restarting of the timer. The wait time for ensuring is short since the +// gap is very small. +// +// Internally, ScheduleFunc will ask the first execution time (by calling +// s.Next()) initially, and create a timer if the execution time is non-zero. +// Afterwards, it will ask the next execution time each time f is about to +// be executed, and f will be called at the next execution time if the time +// is non-zero. +func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) (t *Timer) { + expiration := s.Next(time.Now().UTC()) + if expiration.IsZero() { + // No time is scheduled, return nil. + return + } + + t = &Timer{ + expiration: timeToMs(expiration), + task: func() { + // Schedule the task to execute at the next time if possible. + expiration := s.Next(msToTime(t.expiration)) + if !expiration.IsZero() { + t.expiration = timeToMs(expiration) + tw.addOrRun(t) + } + + // Actually execute the task. + f() + }, + } + tw.addOrRun(t) + + return +} diff --git a/src/components/kafkatimer/timingwheel_test.go b/src/components/kafkatimer/timingwheel_test.go new file mode 100644 index 0000000..dbb6858 --- /dev/null +++ b/src/components/kafkatimer/timingwheel_test.go @@ -0,0 +1,26 @@ +package kafkatimer + +import ( + "fmt" + "testing" + "time" +) + +func F() { + fmt.Println("I'm timer...") +} + + +func TestTimingWheel_Start(t *testing.T) { + tw := NewTimingWheel(time.Millisecond, 20) + tw.Start() + defer tw.Stop() + + exitC := make(chan time.Time, 1) + tw.AfterFunc(10 * time.Second, func() { + fmt.Println("The timer fires") + exitC <- time.Now().UTC() + }) + + fmt.Println(<-exitC) +} \ No newline at end of file diff --git a/src/components/kafkatimer/utils.go b/src/components/kafkatimer/utils.go new file mode 100644 index 0000000..42c8220 --- /dev/null +++ b/src/components/kafkatimer/utils.go @@ -0,0 +1,38 @@ +package kafkatimer + +import ( + "sync" + "time" +) + +// truncate returns the result of rounding x toward zero to a multiple of m. +// If m <= 0, Truncate returns x unchanged. +func truncate(x, m int64) int64 { + if m <= 0 { + return x + } + return x - x%m +} + +// timeToMs returns an integer number, which represents t in milliseconds. +func timeToMs(t time.Time) int64 { + return t.UnixNano() / int64(time.Millisecond) +} + +// msToTime returns the UTC time corresponding to the given Unix time, +// t milliseconds since January 1, 1970 UTC. +func msToTime(t int64) time.Time { + return time.Unix(0, t*int64(time.Millisecond)).UTC() +} + +type waitGroupWrapper struct { + sync.WaitGroup +} + +func (w *waitGroupWrapper) Wrap(cb func()) { + w.Add(1) + go func() { + cb() + w.Done() + }() +} diff --git a/src/components/net/conn.go b/src/components/net/conn.go index 584a0ef..781286f 100644 --- a/src/components/net/conn.go +++ b/src/components/net/conn.go @@ -54,14 +54,16 @@ func (c *Connection) write() { defer c.Quiting() for msg := range c.WBuffer { - if _, err := c.writer.Write(msg); err != nil { - fmt.Println("write fail err: " + err.Error()) + n, err := c.writer.Write(msg) + if err != nil{ + logger.Error("write fail err: " + err.Error(), "n: ", n) return } if err := c.writer.Flush(); err != nil { - fmt.Println("write Flush fail err: " + err.Error()) + logger.Error("write Flush fail err: " + err.Error()) return } + logger.Debug("write :%s, n: %d", msg, n) } } @@ -95,11 +97,14 @@ func (c *Connection) read() { func (c *Connection) Start() { go c.write() go c.read() + //for { + // c.SendMsgByCode(100, 1, nil) + // time.Sleep(2*time.Second) + //} } func (c *Connection) Stop() { close(c.RBuffer) - close(c.WBuffer) c.Conn.Close() } diff --git a/src/components/net/server.go b/src/components/net/server.go index 5f0166d..0ec6aa7 100644 --- a/src/components/net/server.go +++ b/src/components/net/server.go @@ -36,6 +36,10 @@ func NewServer(sConf *conf.SConf) *Server { } } +func (s *Server) StartTimer() { + +} + //StartWorkerPool 启动worker工作池 func (s *Server) StartWorkerPool() { //遍历需要启动worker的数量,依此启动 @@ -62,11 +66,11 @@ func (s *Server) StartOneWorker(workerID int, taskQueue chan *MsgPkg) { } func (s *Server) DoMsgHandler(msg *MsgPkg) { - logger.Debug("DoMsgHandler cmd: %d, data: %s", msg.Head.Cmd, msg.Body) if md, ok := ActionMap[pb.ProtoCode(msg.Head.Cmd)]; ok { - logger.Debug("adfadfadfasdfadfadsf") + logger.Debug("protocode handler: %d", msg.Head.Cmd) errCode, protomsg := md(msg) rsp, err := proto.Marshal(protomsg) + fmt.Printf("errCode: %d, protomsg:%v\n", errCode, protomsg) if err != nil { msg.Conn.SendMsgByCode(-100, msg.Head.Cmd, nil) return @@ -78,6 +82,7 @@ func (s *Server) DoMsgHandler(msg *MsgPkg) { } func (s *Server) OnClose(conn *Connection) { + //conn.Stop() s.Clients.Delete(conn.Id) } diff --git a/src/components/skynettimer/timerwheel.go b/src/components/skynettimer/timerwheel.go new file mode 100644 index 0000000..0c70a33 --- /dev/null +++ b/src/components/skynettimer/timerwheel.go @@ -0,0 +1 @@ +package skynettimer diff --git a/src/plugin/RolePlugin.go b/src/plugin/RolePlugin.go index 235d45f..d75561b 100644 --- a/src/plugin/RolePlugin.go +++ b/src/plugin/RolePlugin.go @@ -45,11 +45,16 @@ func LoginRpc(msg *net.MsgPkg) (int32, proto.Message) { if role == nil { return 2, nil } + role.UpdateProperty("Device", req.Device) - return 0, &pb.RoleRsp{ - Role: role.Role, - Hero: nil, - Team: nil, - Equips: nil, + //return 0, &pb.RoleRsp{ + // Role: role.Role, + // Hero: nil, + // Team: nil, + // Equips: nil, + //} + return 0, &pb.LoginResponse{ + Uid: role.Role.Uid, + Device: role.Role.Device, } } \ No newline at end of file diff --git a/src/plugin/protocode.go b/src/plugin/protocode.go index 6286e2c..5fbdfda 100644 --- a/src/plugin/protocode.go +++ b/src/plugin/protocode.go @@ -10,8 +10,8 @@ func init() { logger.Debug("init protocode...") net.ActionMap = make(map[pb.ProtoCode]net.ActionHandler) - net.ActionMap[pb.ProtoCode_HeartRpc] = HeartRpc - net.ActionMap[pb.ProtoCode_LoginRpc] = LoginRpc - net.ActionMap[pb.ProtoCode_CreateRpc] = CreateRpc + net.ActionMap[pb.ProtoCode_HeartReq] = HeartRpc + net.ActionMap[pb.ProtoCode_LoginReq] = LoginRpc + net.ActionMap[pb.ProtoCode_CreateReq] = CreateRpc } diff --git a/src/utils/utils.go b/src/utils/utils.go index a81b4bd..64f37a0 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "reflect" "strings" + "time" ) func GetSchemaType(schema interface{}) reflect.Type { @@ -52,4 +53,8 @@ func Md5V(str string) string { h := md5.New() h.Write([]byte(str)) return hex.EncodeToString(h.Sum(nil)) +} + +func Timex() int64 { + return time.Now().UnixMicro() } \ No newline at end of file diff --git a/test/client.go b/test/client.go index e6a8999..258bbbd 100644 --- a/test/client.go +++ b/test/client.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "bytes" "encoding/binary" "github.com/golang/protobuf/proto" @@ -8,6 +9,7 @@ import ( "pro2d/protos/pb" "pro2d/src/components/logger" net2 "pro2d/src/components/net" + "time" ) func main() { @@ -49,5 +51,25 @@ func main() { logger.Error(err) return } - client.Write(buf.Bytes()) + + rd := bufio.NewReadWriter(bufio.NewReader(client), bufio.NewWriter(client)) + for { + b1 := make([]byte, 1024) + n, err := rd.Write(buf.Bytes()) + if err != nil { + logger.Error(err) + return + } + rd.Flush() + logger.Debug("write:n: %d, msg: %s", n, buf.Bytes()) + + n, err = rd.Read(b1) + if err != nil { + logger.Error(err) + return + } + logger.Debug("recv: %s, n: %d\n", b1, n) + time.Sleep(5*time.Second) + } + } \ No newline at end of file diff --git a/tools/protostostruct.go b/tools/protostostruct.go index 3db1f23..da19c7e 100644 --- a/tools/protostostruct.go +++ b/tools/protostostruct.go @@ -11,11 +11,12 @@ import ( ) var ( - ProtoCode = "syntax = \"proto3\";\noption go_package = \"./pb;pb\";\n\npackage protocode;\n\nenum ProtoCode\n{\n UNKNOWN = 0x000;\n %s\n}" - ProtoCodeLine = "\t%sRpc = %02x;\n" + ProtoCode = "syntax = \"proto3\";\noption go_package = \"./pb;pb\";\n\npackage protocode;\n\nenum ProtoCode\n{\n UNKNOWN = 0;\n %s\n}" + ProtoCodeLineReq = "\t%sReq = %d;\n" + ProtoCodeLineRsp = "\t%sRsp = %d;\n" GoProtoCodeStr = "package main\n\nimport (\n\t\"pro2d/protos/pb\"\n\t\"pro2d/src/components/logger\"\n\t\"pro2d/src/components/net\"\n)\n\nfunc init() {\n\tlogger.Debug(\"init protocode...\")\n\tnet.ActionMap = make(map[pb.ProtoCode]net.ActionHandler)\n\n%s\n}\n" - GoProtoCodeLine = "\tnet.ActionMap[pb.ProtoCode_%sRpc] = %sRpc\n" + GoProtoCodeLine = "\tnet.ActionMap[pb.ProtoCode_%sReq] = %sRpc\n" ) func ProtoToCode(readPath, filename string) (string, string) { @@ -54,13 +55,20 @@ func ProtoToCode(readPath, filename string) (string, string) { } lb := bytes.Split(line, []byte(" ")) for _, v := range lb { - n = bytes.Index(v, []byte("Req")) - if n < 0 { + n1 := bytes.Index(v, []byte("Req")) + n2 := bytes.Index(v, []byte("Rsp")) + if n1 < 0 && n2 < 0 { continue } - code++ - protoData += fmt.Sprintf(ProtoCodeLine, v[:n],code) - goProtoData += fmt.Sprintf(GoProtoCodeLine, v[:n], v[:n]) + if n1 >= 0 { + code++ + protoData += fmt.Sprintf(ProtoCodeLineReq, v[:n1],code) + goProtoData += fmt.Sprintf(GoProtoCodeLine, v[:n1], v[:n1]) + } + if n2 >= 0 { + code++ + protoData += fmt.Sprintf(ProtoCodeLineRsp, v[:n2],code) + } } } diff --git a/tools/protostostruct_test.go b/tools/protostostruct_test.go index c37f0d8..a13aa5b 100644 --- a/tools/protostostruct_test.go +++ b/tools/protostostruct_test.go @@ -1,7 +1,14 @@ package main -import "testing" +import ( + "fmt" + "testing" +) func TestReadProtos(t *testing.T) { ReadProtos("/Users/mac/Documents/project/Pro2D/Pro2DServer/protos/", "/Users/mac/Documents/project/Pro2D/Pro2DServer/") } + +func TestProtoToCode(t *testing.T) { + fmt.Printf("0x%03x\n", 20) +} \ No newline at end of file -- libgit2 0.21.2