diff --git a/cmd/game.go b/cmd/game.go index 7cea249..d650402 100644 --- a/cmd/game.go +++ b/cmd/game.go @@ -6,6 +6,7 @@ import ( "pro2d/conf" "pro2d/src/components/logger" "pro2d/src/components/net" + _ "pro2d/src/plugin" "syscall" ) diff --git a/protos b/protos index c29c12e..3aa4793 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit c29c12e0a07ed8e3a35a451ca7dbdadbc4bb70fb +Subproject commit 3aa479358834e2ea24c1c1b75c739b0524de1c4f diff --git a/src/common/common.go b/src/common/common.go index a4cb375..e8528f1 100644 --- a/src/common/common.go +++ b/src/common/common.go @@ -1,14 +1,21 @@ package common const ( + //协程池 大小 WorkerPoolSize = 10 MaxTaskPerWorker = 100 - MaxPacketSize = 10 * 1024 * 1024 - MaxMsgChanLen = 20 -) -const ( + //包头 HEADLEN = 16 + //jwt Pro2DTokenSignedString = "Pro2DSecret" + + //定时器 + TickMS = 10 + WheelSize = 3600 + + //心跳 + HEART_TIMER_INTERVAL = 5 //s + HEART_TIMEOUT_COUNT_MAX = 20 //最大超时次数 ) diff --git a/src/components/kafkatimer/bucket.go b/src/components/kafkatimer/bucket.go deleted file mode 100644 index 1e13f85..0000000 --- a/src/components/kafkatimer/bucket.go +++ /dev/null @@ -1,133 +0,0 @@ -package kafkatimer - -import ( - "container/list" - "sync" - "sync/atomic" - "unsafe" -) - -// Timer represents a single event. When the Timer expires, the given -// task will be executed. -type Timer struct { - expiration int64 // in milliseconds - task func() - - // The bucket that holds the list to which this timer's element belongs. - // - // NOTE: This field may be updated and read concurrently, - // through Timer.Stop() and Bucket.Flush(). - b unsafe.Pointer // type: *bucket - - // The timer's element. - element *list.Element -} - -func (t *Timer) getBucket() *bucket { - return (*bucket)(atomic.LoadPointer(&t.b)) -} - -func (t *Timer) setBucket(b *bucket) { - atomic.StorePointer(&t.b, unsafe.Pointer(b)) -} - -// Stop prevents the Timer from firing. It returns true if the call -// stops the timer, false if the timer has already expired or been stopped. -// -// If the timer t has already expired and the t.task has been started in its own -// goroutine; Stop does not wait for t.task to complete before returning. If the caller -// needs to know whether t.task is completed, it must coordinate with t.task explicitly. -func (t *Timer) Stop() bool { - stopped := false - for b := t.getBucket(); b != nil; b = t.getBucket() { - // If b.Remove is called just after the timing wheel's goroutine has: - // 1. removed t from b (through b.Flush -> b.remove) - // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add) - // this may fail to remove t due to the change of t's bucket. - stopped = b.Remove(t) - - // Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2), - // and retry until the bucket becomes nil, which indicates that t has finally been removed. - } - return stopped -} - -type bucket struct { - // 64-bit atomic operations require 64-bit alignment, but 32-bit - // compilers do not ensure it. So we must keep the 64-bit field - // as the first field of the struct. - // - // For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG - // and https://go101.org/article/memory-layout.html. - expiration int64 - - mu sync.Mutex - timers *list.List -} - -func newBucket() *bucket { - return &bucket{ - timers: list.New(), - expiration: -1, - } -} - -func (b *bucket) Expiration() int64 { - return atomic.LoadInt64(&b.expiration) -} - -func (b *bucket) SetExpiration(expiration int64) bool { - return atomic.SwapInt64(&b.expiration, expiration) != expiration -} - -func (b *bucket) Add(t *Timer) { - b.mu.Lock() - - e := b.timers.PushBack(t) - t.setBucket(b) - t.element = e - - b.mu.Unlock() -} - -func (b *bucket) remove(t *Timer) bool { - if t.getBucket() != b { - // If remove is called from within t.Stop, and this happens just after the timing wheel's goroutine has: - // 1. removed t from b (through b.Flush -> b.remove) - // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add) - // then t.getBucket will return nil for case 1, or ab (non-nil) for case 2. - // In either case, the returned value does not equal to b. - return false - } - b.timers.Remove(t.element) - t.setBucket(nil) - t.element = nil - return true -} - -func (b *bucket) Remove(t *Timer) bool { - b.mu.Lock() - defer b.mu.Unlock() - return b.remove(t) -} - -func (b *bucket) Flush(reinsert func(*Timer)) { - b.mu.Lock() - defer b.mu.Unlock() - - for e := b.timers.Front(); e != nil; { - next := e.Next() - - t := e.Value.(*Timer) - b.remove(t) - // Note that this operation will either execute the timer's task, or - // insert the timer into another bucket belonging to a lower-level wheel. - // - // In either case, no further lock operation will happen to b.mu. - reinsert(t) - - e = next - } - - b.SetExpiration(-1) -} diff --git a/src/components/kafkatimer/delayqueue.go b/src/components/kafkatimer/delayqueue.go deleted file mode 100644 index e33da29..0000000 --- a/src/components/kafkatimer/delayqueue.go +++ /dev/null @@ -1,183 +0,0 @@ -package kafkatimer - -import ( - "container/heap" - "sync" - "sync/atomic" - "time" -) - -type item struct { - Value interface{} - Priority int64 - Index int -} - -// this is a priority queue as implemented by a min heap -// ie. the 0th element is the *lowest* value -type priorityQueue []*item - -func newPriorityQueue(capacity int) priorityQueue { - return make(priorityQueue, 0, capacity) -} - -func (pq priorityQueue) Len() int { - return len(pq) -} - -func (pq priorityQueue) Less(i, j int) bool { - return pq[i].Priority < pq[j].Priority -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].Index = i - pq[j].Index = j -} - -func (pq *priorityQueue) Push(x interface{}) { - n := len(*pq) - c := cap(*pq) - if n+1 > c { - npq := make(priorityQueue, n, c*2) - copy(npq, *pq) - *pq = npq - } - *pq = (*pq)[0 : n+1] - item := x.(*item) - item.Index = n - (*pq)[n] = item -} - -func (pq *priorityQueue) Pop() interface{} { - n := len(*pq) - c := cap(*pq) - if n < (c/2) && c > 25 { - npq := make(priorityQueue, n, c/2) - copy(npq, *pq) - *pq = npq - } - item := (*pq)[n-1] - item.Index = -1 - *pq = (*pq)[0 : n-1] - return item -} - -func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) { - if pq.Len() == 0 { - return nil, 0 - } - - item := (*pq)[0] - if item.Priority > max { - return nil, item.Priority - max - } - heap.Remove(pq, 0) - - return item, 0 -} - -// The end of PriorityQueue implementation. - -// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which -// an element can only be taken when its delay has expired. The head of the -// queue is the *Delayed* element whose delay expired furthest in the past. -type DelayQueue struct { - C chan interface{} - - mu sync.Mutex - pq priorityQueue - - // Similar to the sleeping state of runtime.timers. - sleeping int32 - wakeupC chan struct{} -} - -// New creates an instance of delayQueue with the specified size. -func New(size int) *DelayQueue { - return &DelayQueue{ - C: make(chan interface{}), - pq: newPriorityQueue(size), - wakeupC: make(chan struct{}), - } -} - -// Offer inserts the element into the current queue. -func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { - item := &item{Value: elem, Priority: expiration} - - dq.mu.Lock() - heap.Push(&dq.pq, item) - index := item.Index - dq.mu.Unlock() - - if index == 0 { - // A new item with the earliest expiration is added. - if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { - dq.wakeupC <- struct{}{} - } - } -} - -// Poll starts an infinite loop, in which it continually waits for an element -// to expire and then send the expired element to the channel C. -func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { - for { - now := nowF() - - dq.mu.Lock() - item, delta := dq.pq.PeekAndShift(now) - if item == nil { - // No items left or at least one item is pending. - - // We must ensure the atomicity of the whole operation, which is - // composed of the above PeekAndShift and the following StoreInt32, - // to avoid possible race conditions between Offer and Poll. - atomic.StoreInt32(&dq.sleeping, 1) - } - dq.mu.Unlock() - - if item == nil { - if delta == 0 { - // No items left. - select { - case <-dq.wakeupC: - // Wait until a new item is added. - continue - case <-exitC: - goto exit - } - } else if delta > 0 { - // At least one item is pending. - select { - case <-dq.wakeupC: - // A new item with an "earlier" expiration than the current "earliest" one is added. - continue - case <-time.After(time.Duration(delta) * time.Millisecond): - // The current "earliest" item expires. - - // Reset the sleeping state since there's no need to receive from wakeupC. - if atomic.SwapInt32(&dq.sleeping, 0) == 0 { - // A caller of Offer() is being blocked on sending to wakeupC, - // drain wakeupC to unblock the caller. - <-dq.wakeupC - } - continue - case <-exitC: - goto exit - } - } - } - - select { - case dq.C <- item.Value: - // The expired element has been sent out successfully. - case <-exitC: - goto exit - } - } - -exit: - // Reset the states - atomic.StoreInt32(&dq.sleeping, 0) -} diff --git a/src/components/kafkatimer/delayqueue/delayqueue.go b/src/components/kafkatimer/delayqueue/delayqueue.go deleted file mode 100644 index 82e6a9d..0000000 --- a/src/components/kafkatimer/delayqueue/delayqueue.go +++ /dev/null @@ -1,186 +0,0 @@ -package delayqueue - -import ( - "container/heap" - "sync" - "sync/atomic" - "time" -) - -// The start of PriorityQueue implementation. -// Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go - -type item struct { - Value interface{} - Priority int64 - Index int -} - -// this is a priority queue as implemented by a min heap -// ie. the 0th element is the *lowest* value -type priorityQueue []*item - -func newPriorityQueue(capacity int) priorityQueue { - return make(priorityQueue, 0, capacity) -} - -func (pq priorityQueue) Len() int { - return len(pq) -} - -func (pq priorityQueue) Less(i, j int) bool { - return pq[i].Priority < pq[j].Priority -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].Index = i - pq[j].Index = j -} - -func (pq *priorityQueue) Push(x interface{}) { - n := len(*pq) - c := cap(*pq) - if n+1 > c { - npq := make(priorityQueue, n, c*2) - copy(npq, *pq) - *pq = npq - } - *pq = (*pq)[0 : n+1] - item := x.(*item) - item.Index = n - (*pq)[n] = item -} - -func (pq *priorityQueue) Pop() interface{} { - n := len(*pq) - c := cap(*pq) - if n < (c/2) && c > 25 { - npq := make(priorityQueue, n, c/2) - copy(npq, *pq) - *pq = npq - } - item := (*pq)[n-1] - item.Index = -1 - *pq = (*pq)[0 : n-1] - return item -} - -func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) { - if pq.Len() == 0 { - return nil, 0 - } - - item := (*pq)[0] - if item.Priority > max { - return nil, item.Priority - max - } - heap.Remove(pq, 0) - - return item, 0 -} - -// The end of PriorityQueue implementation. - -// DelayQueue is an unbounded blocking queue of *Delayed* elements, in which -// an element can only be taken when its delay has expired. The head of the -// queue is the *Delayed* element whose delay expired furthest in the past. -type DelayQueue struct { - C chan interface{} - - mu sync.Mutex - pq priorityQueue - - // Similar to the sleeping state of runtime.timers. - sleeping int32 - wakeupC chan struct{} -} - -// New creates an instance of delayQueue with the specified size. -func New(size int) *DelayQueue { - return &DelayQueue{ - C: make(chan interface{}), - pq: newPriorityQueue(size), - wakeupC: make(chan struct{}), - } -} - -// Offer inserts the element into the current queue. -func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { - item := &item{Value: elem, Priority: expiration} - - dq.mu.Lock() - heap.Push(&dq.pq, item) - index := item.Index - dq.mu.Unlock() - - if index == 0 { - // A new item with the earliest expiration is added. - if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { - dq.wakeupC <- struct{}{} - } - } -} - -// Poll starts an infinite loop, in which it continually waits for an element -// to expire and then send the expired element to the channel C. -func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { - for { - now := nowF() - - dq.mu.Lock() - item, delta := dq.pq.PeekAndShift(now) - if item == nil { - // No items left or at least one item is pending. - - // We must ensure the atomicity of the whole operation, which is - // composed of the above PeekAndShift and the following StoreInt32, - // to avoid possible race conditions between Offer and Poll. - atomic.StoreInt32(&dq.sleeping, 1) - } - dq.mu.Unlock() - - if item == nil { - if delta == 0 { - // No items left. - select { - case <-dq.wakeupC: - // Wait until a new item is added. - continue - case <-exitC: - goto exit - } - } else if delta > 0 { - // At least one item is pending. - select { - case <-dq.wakeupC: - // A new item with an "earlier" expiration than the current "earliest" one is added. - continue - case <-time.After(time.Duration(delta) * time.Millisecond): - // The current "earliest" item expires. - - // Reset the sleeping state since there's no need to receive from wakeupC. - if atomic.SwapInt32(&dq.sleeping, 0) == 0 { - // A caller of Offer() is being blocked on sending to wakeupC, - // drain wakeupC to unblock the caller. - <-dq.wakeupC - } - continue - case <-exitC: - goto exit - } - } - } - - select { - case dq.C <- item.Value: - // The expired element has been sent out successfully. - case <-exitC: - goto exit - } - } - -exit: - // Reset the states - atomic.StoreInt32(&dq.sleeping, 0) -} diff --git a/src/components/kafkatimer/timingwheel.go b/src/components/kafkatimer/timingwheel.go deleted file mode 100644 index 52b7f1d..0000000 --- a/src/components/kafkatimer/timingwheel.go +++ /dev/null @@ -1,227 +0,0 @@ -package kafkatimer - -import ( - "errors" - "sync/atomic" - "time" - "unsafe" - - "github.com/RussellLuo/timingwheel/delayqueue" -) -//时间轮 kafka - -// TimingWheel is an implementation of Hierarchical Timing Wheels. -type TimingWheel struct { - tick int64 // in milliseconds - wheelSize int64 - - interval int64 // in milliseconds - currentTime int64 // in milliseconds - buckets []*bucket - queue *delayqueue.DelayQueue - - // The higher-level overflow wheel. - // - // NOTE: This field may be updated and read concurrently, through Add(). - overflowWheel unsafe.Pointer // type: *TimingWheel - - exitC chan struct{} - waitGroup waitGroupWrapper -} - -// NewTimingWheel creates an instance of TimingWheel with the given tick and wheelSize. -func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { - tickMs := int64(tick / time.Millisecond) - if tickMs <= 0 { - panic(errors.New("tick must be greater than or equal to 1ms")) - } - - startMs := timeToMs(time.Now().UTC()) - - return newTimingWheel( - tickMs, - wheelSize, - startMs, - delayqueue.New(int(wheelSize)), - ) -} - -// newTimingWheel is an internal helper function that really creates an instance of TimingWheel. -func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { - buckets := make([]*bucket, wheelSize) - for i := range buckets { - buckets[i] = newBucket() - } - return &TimingWheel{ - tick: tickMs, - wheelSize: wheelSize, - currentTime: truncate(startMs, tickMs), - interval: tickMs * wheelSize, - buckets: buckets, - queue: queue, - exitC: make(chan struct{}), - } -} - -// add inserts the timer t into the current timing wheel. -func (tw *TimingWheel) add(t *Timer) bool { - currentTime := atomic.LoadInt64(&tw.currentTime) - if t.expiration < currentTime+tw.tick { - // Already expired - return false - } else if t.expiration < currentTime+tw.interval { - // Put it into its own bucket - virtualID := t.expiration / tw.tick - b := tw.buckets[virtualID%tw.wheelSize] - b.Add(t) - - // Set the bucket expiration time - if b.SetExpiration(virtualID * tw.tick) { - // The bucket needs to be enqueued since it was an expired bucket. - // We only need to enqueue the bucket when its expiration time has changed, - // i.e. the wheel has advanced and this bucket get reused with a new expiration. - // Any further calls to set the expiration within the same wheel cycle will - // pass in the same value and hence return false, thus the bucket with the - // same expiration will not be enqueued multiple times. - tw.queue.Offer(b, b.Expiration()) - } - - return true - } else { - // Out of the interval. Put it into the overflow wheel - overflowWheel := atomic.LoadPointer(&tw.overflowWheel) - if overflowWheel == nil { - atomic.CompareAndSwapPointer( - &tw.overflowWheel, - nil, - unsafe.Pointer(newTimingWheel( - tw.interval, - tw.wheelSize, - currentTime, - tw.queue, - )), - ) - overflowWheel = atomic.LoadPointer(&tw.overflowWheel) - } - return (*TimingWheel)(overflowWheel).add(t) - } -} - -// addOrRun inserts the timer t into the current timing wheel, or run the -// timer's task if it has already expired. -func (tw *TimingWheel) addOrRun(t *Timer) { - if !tw.add(t) { - // Already expired - - // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc), - // always execute the timer's task in its own goroutine. - go t.task() - } -} - -func (tw *TimingWheel) advanceClock(expiration int64) { - currentTime := atomic.LoadInt64(&tw.currentTime) - if expiration >= currentTime+tw.tick { - currentTime = truncate(expiration, tw.tick) - atomic.StoreInt64(&tw.currentTime, currentTime) - - // Try to advance the clock of the overflow wheel if present - overflowWheel := atomic.LoadPointer(&tw.overflowWheel) - if overflowWheel != nil { - (*TimingWheel)(overflowWheel).advanceClock(currentTime) - } - } -} - -// Start starts the current timing wheel. -func (tw *TimingWheel) Start() { - tw.waitGroup.Wrap(func() { - tw.queue.Poll(tw.exitC, func() int64 { - return timeToMs(time.Now().UTC()) - }) - }) - - tw.waitGroup.Wrap(func() { - for { - select { - case elem := <-tw.queue.C: - b := elem.(*bucket) - tw.advanceClock(b.Expiration()) - b.Flush(tw.addOrRun) - case <-tw.exitC: - return - } - } - }) -} - -// Stop stops the current timing wheel. -// -// If there is any timer's task being running in its own goroutine, Stop does -// not wait for the task to complete before returning. If the caller needs to -// know whether the task is completed, it must coordinate with the task explicitly. -func (tw *TimingWheel) Stop() { - close(tw.exitC) - tw.waitGroup.Wait() -} - -// AfterFunc waits for the duration to elapse and then calls f in its own goroutine. -// It returns a Timer that can be used to cancel the call using its Stop method. -func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer { - t := &Timer{ - expiration: timeToMs(time.Now().UTC().Add(d)), - task: f, - } - tw.addOrRun(t) - return t -} - -// Scheduler determines the execution plan of a task. -type Scheduler interface { - // Next returns the next execution time after the given (previous) time. - // It will return a zero time if no next time is scheduled. - // - // All times must be UTC. - Next(time.Time) time.Time -} - -// ScheduleFunc calls f (in its own goroutine) according to the execution -// plan scheduled by s. It returns a Timer that can be used to cancel the -// call using its Stop method. -// -// If the caller want to terminate the execution plan halfway, it must -// stop the timer and ensure that the timer is stopped actually, since in -// the current implementation, there is a gap between the expiring and the -// restarting of the timer. The wait time for ensuring is short since the -// gap is very small. -// -// Internally, ScheduleFunc will ask the first execution time (by calling -// s.Next()) initially, and create a timer if the execution time is non-zero. -// Afterwards, it will ask the next execution time each time f is about to -// be executed, and f will be called at the next execution time if the time -// is non-zero. -func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) (t *Timer) { - expiration := s.Next(time.Now().UTC()) - if expiration.IsZero() { - // No time is scheduled, return nil. - return - } - - t = &Timer{ - expiration: timeToMs(expiration), - task: func() { - // Schedule the task to execute at the next time if possible. - expiration := s.Next(msToTime(t.expiration)) - if !expiration.IsZero() { - t.expiration = timeToMs(expiration) - tw.addOrRun(t) - } - - // Actually execute the task. - f() - }, - } - tw.addOrRun(t) - - return -} diff --git a/src/components/kafkatimer/timingwheel_test.go b/src/components/kafkatimer/timingwheel_test.go deleted file mode 100644 index dbb6858..0000000 --- a/src/components/kafkatimer/timingwheel_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package kafkatimer - -import ( - "fmt" - "testing" - "time" -) - -func F() { - fmt.Println("I'm timer...") -} - - -func TestTimingWheel_Start(t *testing.T) { - tw := NewTimingWheel(time.Millisecond, 20) - tw.Start() - defer tw.Stop() - - exitC := make(chan time.Time, 1) - tw.AfterFunc(10 * time.Second, func() { - fmt.Println("The timer fires") - exitC <- time.Now().UTC() - }) - - fmt.Println(<-exitC) -} \ No newline at end of file diff --git a/src/components/kafkatimer/utils.go b/src/components/kafkatimer/utils.go deleted file mode 100644 index 42c8220..0000000 --- a/src/components/kafkatimer/utils.go +++ /dev/null @@ -1,38 +0,0 @@ -package kafkatimer - -import ( - "sync" - "time" -) - -// truncate returns the result of rounding x toward zero to a multiple of m. -// If m <= 0, Truncate returns x unchanged. -func truncate(x, m int64) int64 { - if m <= 0 { - return x - } - return x - x%m -} - -// timeToMs returns an integer number, which represents t in milliseconds. -func timeToMs(t time.Time) int64 { - return t.UnixNano() / int64(time.Millisecond) -} - -// msToTime returns the UTC time corresponding to the given Unix time, -// t milliseconds since January 1, 1970 UTC. -func msToTime(t int64) time.Time { - return time.Unix(0, t*int64(time.Millisecond)).UTC() -} - -type waitGroupWrapper struct { - sync.WaitGroup -} - -func (w *waitGroupWrapper) Wrap(cb func()) { - w.Add(1) - go func() { - cb() - w.Done() - }() -} diff --git a/src/components/net/conn.go b/src/components/net/conn.go index 781286f..253fabf 100644 --- a/src/components/net/conn.go +++ b/src/components/net/conn.go @@ -3,9 +3,12 @@ package net import ( "bufio" "fmt" + "math" "net" "pro2d/src/common" "pro2d/src/components/logger" + "pro2d/src/utils" + "sync/atomic" ) type Head struct { @@ -18,16 +21,20 @@ type Head struct { type Connection struct { net.Conn - Id int - Server *Server + Id int + Server *Server - scanner *bufio.Scanner - writer *bufio.Writer + scanner *bufio.Scanner + writer *bufio.Writer - WBuffer chan []byte - RBuffer chan *MsgPkg + WBuffer chan []byte + RBuffer chan *MsgPkg - Quit chan *Connection + Quit chan *Connection + + nextCheckTime int64 //下一次检查的时间 + lastHeartCheckTime int64 //最后收消息时间 + heartTimeoutCount int //超时次数 } type MsgPkg struct { @@ -42,11 +49,13 @@ func NewConn(id int, conn net.Conn, s *Server) *Connection { Conn: conn, Server: s, - scanner: bufio.NewScanner(conn), - writer: bufio.NewWriter(conn), - WBuffer: make(chan []byte), - RBuffer: make(chan *MsgPkg), - Quit: make(chan *Connection), + scanner: bufio.NewScanner(conn), + writer: bufio.NewWriter(conn), + WBuffer: make(chan []byte), + RBuffer: make(chan *MsgPkg), + Quit: make(chan *Connection), + lastHeartCheckTime: utils.Timex(), + heartTimeoutCount: 0, } } @@ -63,7 +72,6 @@ func (c *Connection) write() { logger.Error("write Flush fail err: " + err.Error()) return } - logger.Debug("write :%s, n: %d", msg, n) } } @@ -83,6 +91,8 @@ func (c *Connection) read() { //将请求消息发送给任务队列 c.Server.TaskQueue[workerID] <- req + atomic.StoreInt64(&c.lastHeartCheckTime, utils.Timex()) + //备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。 //c.Server.OnRecv(req) } @@ -94,6 +104,41 @@ func (c *Connection) read() { } } + +func (c *Connection) checkHeartBeat(now int64) { + lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime) + logger.Debug("checkHeartBeat ID: %d, last: %d, now: %d", c.Id, lastHeartCheckTime, now) + if math.Abs(float64(lastHeartCheckTime - now)) > common.HEART_TIMER_INTERVAL { + c.heartTimeoutCount++ + if c.heartTimeoutCount >= common.HEART_TIMEOUT_COUNT_MAX { + c.Quiting() + return + } + logger.Debug("timeout count: %d", c.heartTimeoutCount) + }else { + c.heartTimeoutCount = 0 + } +} + +func (c *Connection) update() { + nextCheckTime := atomic.LoadInt64(&c.nextCheckTime) + now := utils.Timex() + if now >= nextCheckTime { + c.checkHeartBeat(now) + nextCheckTime = now + common.HEART_TIMER_INTERVAL + atomic.StoreInt64(&c.nextCheckTime, nextCheckTime) + } +} +// +//func (c *Connection) SetLastHeartCheckTime() { +// now := utils.Timex() +// lastHeartCheckTime := atomic.LoadInt64(&c.lastHeartCheckTime) +// if now - lastHeartCheckTime < common.HEART_TIMER_INTERVAL { +// logger.Debug("heart too quick") +// } +// atomic.StoreInt64(&c.lastHeartCheckTime, now) +//} + func (c *Connection) Start() { go c.write() go c.read() @@ -109,6 +154,7 @@ func (c *Connection) Stop() { } func (c *Connection) Quiting() { + logger.Debug("ID: %d close", c.Id) c.Server.OnClose(c) } diff --git a/src/components/net/server.go b/src/components/net/server.go index 0ec6aa7..1635ac8 100644 --- a/src/components/net/server.go +++ b/src/components/net/server.go @@ -11,12 +11,15 @@ import ( "pro2d/src/components/db" "pro2d/src/components/etcd" "pro2d/src/components/logger" + "pro2d/src/components/timewheel" "pro2d/src/models" "sync" + "time" ) type ActionHandler func (msg *MsgPkg) (int32, proto.Message) var ActionMap map[pb.ProtoCode]ActionHandler +var TimeWheel *timewheel.TimeWheel type Server struct { SConf *conf.SConf @@ -36,10 +39,6 @@ func NewServer(sConf *conf.SConf) *Server { } } -func (s *Server) StartTimer() { - -} - //StartWorkerPool 启动worker工作池 func (s *Server) StartWorkerPool() { //遍历需要启动worker的数量,依此启动 @@ -96,6 +95,16 @@ func (s *Server) LoadPlugin() { logger.Debug("load plugin success") } +func (s *Server) handleTimeOut() { + s.Clients.Range(func(key, value interface{}) bool { + client := value.(*Connection) + client.update() + return true + }) + + TimeWheel.AfterFunc(1*time.Second, s.handleTimeOut) +} + func (s *Server)Start() error { //mongo 初始化 db.MongoDatabase = db.MongoClient.Database(conf.GlobalConf.GameConf.DBName) @@ -110,10 +119,10 @@ func (s *Server)Start() error { s.EtcdClient.PutWithLeasePrefix(conf.GlobalConf.GameConf.Name, conf.GlobalConf.GameConf.ID, fmt.Sprintf("%s:%d", conf.GlobalConf.GameConf.IP, conf.GlobalConf.GameConf.Port), 5) //初始化plugin - _, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath) - if err != nil { - return err - } + //_, err = plugin.Open(conf.GlobalConf.GameConf.PluginPath) + //if err != nil { + // return err + //} port := fmt.Sprintf(":%d", s.SConf.Port) l, err := net.Listen("tcp", port) @@ -121,8 +130,15 @@ func (s *Server)Start() error { return err } + //启动协程池 s.StartWorkerPool() + //启动定时器 + TimeWheel = timewheel.NewTimeWheel(common.TickMS * time.Millisecond, common.WheelSize) + TimeWheel.Start() + s.handleTimeOut() + + //监听端口 logger.Debug("listen on %s\n", port) id := 0 for { @@ -139,5 +155,5 @@ func (s *Server)Start() error { } func (s *Server)Stop() { -} - + TimeWheel.Stop() +} \ No newline at end of file diff --git a/src/components/skynettimer/timerwheel.go b/src/components/skynettimer/timerwheel.go deleted file mode 100644 index 0c70a33..0000000 --- a/src/components/skynettimer/timerwheel.go +++ /dev/null @@ -1 +0,0 @@ -package skynettimer diff --git a/src/components/timewheel/bucket.go b/src/components/timewheel/bucket.go new file mode 100644 index 0000000..0c54b51 --- /dev/null +++ b/src/components/timewheel/bucket.go @@ -0,0 +1,85 @@ +package timewheel + +import ( + "container/list" + "sync" + "sync/atomic" + "unsafe" +) +type bucket struct { + //过期时间 + expiration int64 + + mu sync.Mutex + //相同过期时间的任务队列 + timers *list.List +} + +func newBucket() *bucket { + return &bucket{ + expiration: -1, + mu: sync.Mutex{}, + timers: list.New(), + } +} + +func (b *bucket) SetExpiration(expiration int64) { + atomic.AddInt64(&b.expiration, expiration) +} + +func (b *bucket) Add(t *Timer) { + b.mu.Lock() + defer b.mu.Unlock() + + e := b.timers.PushBack(t) + t.setBucket(b) + t.element = e +} + +func (b *bucket) Flush(reinsert func(*Timer)) { + b.mu.Lock() + defer b.mu.Unlock() + + for e := b.timers.Front(); e != nil; { + next := e.Next() + t := e.Value.(*Timer) + b.remove(t) + + reinsert(t) + e = next + } +} + +func (b *bucket) remove(t *Timer) bool { + if t.getBucket() != b { + return false + } + b.timers.Remove(t.element) + t.setBucket(nil) + t.element = nil + return true +} + +func (b *bucket) Remove(t *Timer) bool { + b.mu.Lock() + defer b.mu.Unlock() + + return b.remove(t) +} + +type Timer struct { + expiration int64 + //要被执行的任务 + task func() + + b unsafe.Pointer + element *list.Element +} + +func (t *Timer) setBucket(b *bucket) { + atomic.StorePointer(&t.b, unsafe.Pointer(b)) +} + +func (t *Timer) getBucket() *bucket { + return (*bucket)(atomic.LoadPointer(&t.b)) +} \ No newline at end of file diff --git a/src/components/timewheel/timewheel.go b/src/components/timewheel/timewheel.go new file mode 100644 index 0000000..f17a9e1 --- /dev/null +++ b/src/components/timewheel/timewheel.go @@ -0,0 +1,165 @@ +package timewheel + +import ( + "pro2d/src/common" + "pro2d/src/components/workpool" + "sync/atomic" + "time" + "unsafe" +) + +type TimeWheel struct { + ticker *time.Ticker + tickMs int64 //一滴答的时间 1ms 可以自定义 我们这里选择使用1ms + wheelSize int64 + startMs int64 //开始时间 in millisecond + endMs int64 + wheelTime int64 //跑完一圈所需时间 + level int64 //层级 + + //时间刻度 列表 + bucket []*bucket + currentTime int64 //当前时间 in millisecond + prevflowWheel unsafe.Pointer // type: *TimingWheel + overflowWheel unsafe.Pointer // type: *TimingWheel + exitC chan struct{} + + WorkPool *workpool.WorkPool +} + +func NewTimeWheel(tick time.Duration, wheelSize int64) *TimeWheel { + //转化为毫秒 + tickMs := int64(tick / time.Millisecond) + //如果小于零 + if tickMs <=0 { + panic("tick must be greater than or equal to 1 ms") + } + + startMs := time.Now().UnixMilli() //ms + + workpool := workpool.NewWorkPool(common.WorkerPoolSize, common.MaxTaskPerWorker) + return newTimingWheel(tickMs, wheelSize, startMs, 0, nil, workpool) +} + +func newTimingWheel(tick, wheelSize int64, start, level int64, prev *TimeWheel, pool *workpool.WorkPool) *TimeWheel { + buckets := make([]*bucket, wheelSize) + for i := range buckets { + buckets[i] = newBucket() + } + + return &TimeWheel{ + tickMs: tick, + wheelSize: wheelSize, + startMs: start, + endMs: wheelSize * tick + start, + wheelTime: wheelSize * tick, + bucket: buckets, + currentTime: truncate(start, tick), + exitC: make(chan struct{}), + WorkPool: pool, + + prevflowWheel: unsafe.Pointer(prev), + level: level, + } +} + +func truncate(dst, m int64) int64 { + return dst - dst%m +} + +func (tw *TimeWheel) add(t *Timer) bool { + currentTime := atomic.LoadInt64(&tw.currentTime) + if t.expiration < currentTime + tw.tickMs { + return false + }else if t.expiration < currentTime + tw.wheelTime { + virtualID := t.expiration / tw.tickMs //需要多少滴答数 + b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize + b.Add(t) + + b.SetExpiration(virtualID * tw.tickMs) + }else { + overflowWheel := atomic.LoadPointer(&tw.overflowWheel) + if overflowWheel == nil { + level := atomic.LoadInt64(&tw.level) + 1 + atomic.CompareAndSwapPointer( + &tw.overflowWheel, + nil, + unsafe.Pointer(newTimingWheel(tw.wheelTime, tw.wheelSize, currentTime, level, tw , tw.WorkPool)), + ) + overflowWheel = atomic.LoadPointer(&tw.overflowWheel) + } + //递归添加到下一级定时器中 + (*TimeWheel)(overflowWheel).add(t) + } + + return true +} + +func (tw *TimeWheel) addOrRun(t *Timer) { + if !tw.add(t) { + workerID := t.expiration % tw.WorkPool.WorkerPoolSize + //将请求消息发送给任务队列 + tw.WorkPool.TaskQueue[workerID] <- t.task + } +} + +//拨动时钟 +func (tw *TimeWheel) advanceClock(expiration int64) { + level := atomic.LoadInt64(&tw.level) + currentTime := truncate(expiration, tw.tickMs) + atomic.StoreInt64(&tw.currentTime, currentTime) + + if level == 0 { + virtualID := expiration / tw.tickMs //需要多少滴答数 + b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize + b.Flush(tw.addOrRun) + } else { + prevflowWheel := atomic.LoadPointer(&tw.prevflowWheel) + if prevflowWheel != nil { + virtualID := expiration / tw.tickMs //需要多少滴答数 + b := tw.bucket[virtualID%tw.wheelSize] //pos = 所需滴答数 % wheelSize + b.Flush((*TimeWheel)(prevflowWheel).addOrRun) + } + } + + //如果基础的时钟指针转完了一圈,则递归拨动下一级时钟 + if currentTime >= tw.endMs { + atomic.StoreInt64(&tw.startMs, currentTime) + atomic.StoreInt64(&tw.endMs, currentTime + tw.wheelTime) + + overflowWheel := atomic.LoadPointer(&tw.overflowWheel) + if overflowWheel != nil { + (*TimeWheel)(overflowWheel).advanceClock(currentTime) + } + } +} + + +func (tw *TimeWheel) AfterFunc(d time.Duration, f func()) *Timer { + t := &Timer{ + expiration: time.Now().UTC().Add(d).UnixMilli(), + task: f, + } + tw.addOrRun(t) + return t +} + +func (tw *TimeWheel) Start() { + tw.ticker = time.NewTicker(time.Duration(tw.tickMs) * time.Millisecond) + tw.WorkPool.StartWorkerPool() + + go func() { + for { + select { + case t := <- tw.ticker.C: + tw.advanceClock(t.UnixMilli()) + case <- tw.exitC: + return + } + } + }() +} + +func (tw *TimeWheel) Stop() { + tw.exitC <- struct{}{} +} \ No newline at end of file diff --git a/src/components/timewheel/timewheel_test.go b/src/components/timewheel/timewheel_test.go new file mode 100644 index 0000000..460377f --- /dev/null +++ b/src/components/timewheel/timewheel_test.go @@ -0,0 +1,30 @@ +package timewheel + +import ( + "fmt" + "testing" + "time" +) +var tw *TimeWheel + +func Add() { + fmt.Println("ADD : 123456") + tw.AfterFunc(6*time.Second, Add) +} + +func Add1() { + fmt.Println("GET : 78901112") + tw.AfterFunc(9*time.Second, Add1) +} + +func TestTimeWheel_AfterFunc(t *testing.T) { + + tw = NewTimeWheel(time.Second, 5) + tw.Start() + defer tw.Stop() + + + Add() + Add1() + time.Sleep(time.Second * 200) +} diff --git a/src/components/workpool/workpool.go b/src/components/workpool/workpool.go new file mode 100644 index 0000000..e325231 --- /dev/null +++ b/src/components/workpool/workpool.go @@ -0,0 +1,41 @@ +package workpool + +type Job func() + +type WorkPool struct { + WorkerPoolSize int64 + MaxTaskPerWorker int64 + TaskQueue []chan Job +} + +func NewWorkPool(poolSize, maxTaskSize int64) *WorkPool { + return &WorkPool{ + WorkerPoolSize: poolSize, + MaxTaskPerWorker: maxTaskSize, + TaskQueue: make([]chan Job, poolSize), + } +} + +//StartOneWorker 启动一个Worker工作流程 +func (wp *WorkPool) StartOneWorker(workerID int, taskQueue chan Job) { + //不断的等待队列中的消息 + for { + select { + //有消息则取出队列的Request,并执行绑定的业务方法 + case job := <-taskQueue: + _ = workerID + job() + } + } +} + +func (wp *WorkPool) StartWorkerPool() { + //遍历需要启动worker的数量,依此启动 + for i := 0; i < int(wp.WorkerPoolSize); i++ { + //一个worker被启动 + //给当前worker对应的任务队列开辟空间 + wp.TaskQueue[i] = make(chan Job, wp.MaxTaskPerWorker) + //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 + go wp.StartOneWorker(i, wp.TaskQueue[i]) + } +} \ No newline at end of file diff --git a/src/plugin/RolePlugin.go b/src/plugin/RolePlugin.go index d75561b..461bc3b 100644 --- a/src/plugin/RolePlugin.go +++ b/src/plugin/RolePlugin.go @@ -1,4 +1,4 @@ -package main +package plugin import ( "github.com/golang/protobuf/proto" @@ -10,6 +10,7 @@ import ( ) func HeartRpc(msg *net.MsgPkg) (int32, proto.Message) { + //msg.Conn.SetLastHeartCheckTime() return 0, nil } diff --git a/src/plugin/protocode.go b/src/plugin/protocode.go index 5fbdfda..92a9c30 100644 --- a/src/plugin/protocode.go +++ b/src/plugin/protocode.go @@ -1,4 +1,4 @@ -package main +package plugin import ( "pro2d/protos/pb" diff --git a/src/utils/utils.go b/src/utils/utils.go index 64f37a0..74bdc0a 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -55,6 +55,6 @@ func Md5V(str string) string { return hex.EncodeToString(h.Sum(nil)) } -func Timex() int64 { - return time.Now().UnixMicro() +func Timex() int64 { + return time.Now().Unix() } \ No newline at end of file diff --git a/test/client.go b/test/client.go index 258bbbd..da4b75f 100644 --- a/test/client.go +++ b/test/client.go @@ -16,7 +16,7 @@ func main() { head := &net2.Head{ Length: 0, - Cmd: int32(pb.ProtoCode_LoginRpc), + Cmd: int32(pb.ProtoCode_LoginReq), ErrCode: 0, PreField: 0, } -- libgit2 0.21.2