Blame view

src/components/net/conn.go 2.33 KB
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
1
2
3
4
5
6
  package net
  
  import (
  	"bufio"
  	"fmt"
  	"net"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
7
8
  	"pro2d/src/common"
  	"pro2d/src/components/logger"
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  )
  
  type Head struct {
  	Length 		int32
  	Cmd 		int32
  	ErrCode 	int32
  	PreField    int32
  }
  
  
  type Connection struct {
  	net.Conn
  	Id int
  	Server *Server
  
  	scanner *bufio.Scanner
  	writer *bufio.Writer
  
  	WBuffer chan []byte
  	RBuffer chan *MsgPkg
  
  	Quit chan *Connection
  }
  
  type MsgPkg struct {
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
34
  	Head *Head
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
  	Body []byte
  	Conn *Connection
  }
  
  func NewConn(id int, conn net.Conn, s *Server) *Connection {
  	return &Connection{
  		Id: id,
  		Conn: conn,
  		Server: s,
  
  		scanner: bufio.NewScanner(conn),
  		writer: bufio.NewWriter(conn),
  		WBuffer: make(chan []byte),
  		RBuffer: make(chan *MsgPkg),
  		Quit: make(chan *Connection),
  	}
  }
  
  func (c *Connection) write()  {
  	defer c.Quiting()
  
  	for msg := range c.WBuffer {
98b0736d   zhangqijia   添加定时器, 检查心跳
57
58
59
  		n, err := c.writer.Write(msg)
  		if err != nil{
  			logger.Error("write fail err: " + err.Error(), "n: ", n)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
60
61
62
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
98b0736d   zhangqijia   添加定时器, 检查心跳
63
  			logger.Error("write Flush fail err: " + err.Error())
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
64
65
  			return
  		}
98b0736d   zhangqijia   添加定时器, 检查心跳
66
  		logger.Debug("write :%s, n: %d", msg, n)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
67
68
69
70
71
  	}
  }
  
  func (c *Connection) read() {
  	defer c.Quiting()
5d9cf01c   zhangqijia   plugin 热更
72
  	c.scanner.Split(ParseMsg)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
73
  
5d9cf01c   zhangqijia   plugin 热更
74
75
76
  	for c.scanner.Scan() {
  		req, err := DecodeMsg(c.scanner.Bytes())
  		if err != nil {
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
77
78
  			return
  		}
5d9cf01c   zhangqijia   plugin 热更
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
  
  		req.Conn = c
  		//得到需要处理此条连接的workerID
  		workerID := c.Id % c.Server.SConf.WorkerPoolSize
  		//将请求消息发送给任务队列
  		c.Server.TaskQueue[workerID] <- req
  
  		//备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。
  		//c.Server.OnRecv(req)
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		fmt.Printf("scanner.err: %s\n", err.Error())
  		c.Quiting()
  		return
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
94
95
96
97
98
99
  	}
  }
  
  func (c *Connection) Start()  {
  	go c.write()
  	go c.read()
98b0736d   zhangqijia   添加定时器, 检查心跳
100
101
102
103
  	//for {
  	//	c.SendMsgByCode(100, 1, nil)
  	//	time.Sleep(2*time.Second)
  	//}
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
104
105
106
107
  }
  
  func (c *Connection) Stop()  {
  	close(c.RBuffer)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
108
109
110
111
112
113
114
  	c.Conn.Close()
  }
  
  func (c *Connection) Quiting()  {
  	c.Server.OnClose(c)
  }
  
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
115
116
117
118
119
120
121
122
123
124
125
126
127
  func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){
  	h := &Head{
  		Length:   int32(common.HEADLEN + len(data)),
  		Cmd:      cmd,
  		ErrCode:  errCode,
  		PreField: 0,
  	}
  	pkg := &MsgPkg{
  		Head: h,
  		Body: data,
  	}
  	buf, err := EncodeMsg(pkg)
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
128
  		logger.Error("SendMsg error: %v", err)
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
129
130
131
  		return
  	}
  	c.WBuffer <- buf
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
132
  }