Blame view

src/components/net/conn.go 2.22 KB
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
1
2
3
4
5
6
  package net
  
  import (
  	"bufio"
  	"fmt"
  	"net"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
7
8
  	"pro2d/src/common"
  	"pro2d/src/components/logger"
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  )
  
  type Head struct {
  	Length 		int32
  	Cmd 		int32
  	ErrCode 	int32
  	PreField    int32
  }
  
  
  type Connection struct {
  	net.Conn
  	Id int
  	Server *Server
  
  	scanner *bufio.Scanner
  	writer *bufio.Writer
  
  	WBuffer chan []byte
  	RBuffer chan *MsgPkg
  
  	Quit chan *Connection
  }
  
  type MsgPkg struct {
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
34
  	Head *Head
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
  	Body []byte
  	Conn *Connection
  }
  
  func NewConn(id int, conn net.Conn, s *Server) *Connection {
  	return &Connection{
  		Id: id,
  		Conn: conn,
  		Server: s,
  
  		scanner: bufio.NewScanner(conn),
  		writer: bufio.NewWriter(conn),
  		WBuffer: make(chan []byte),
  		RBuffer: make(chan *MsgPkg),
  		Quit: make(chan *Connection),
  	}
  }
  
  func (c *Connection) write()  {
  	defer c.Quiting()
  
  	for msg := range c.WBuffer {
  		if _, err := c.writer.Write(msg); err != nil {
  			fmt.Println("write fail err: " + err.Error())
  			return
  		}
  		if err := c.writer.Flush(); err != nil {
  			fmt.Println("write Flush fail err: " + err.Error())
  			return
  		}
  	}
  }
  
  func (c *Connection) read() {
  	defer c.Quiting()
5d9cf01c   zhangqijia   plugin 热更
70
  	c.scanner.Split(ParseMsg)
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
71
  
5d9cf01c   zhangqijia   plugin 热更
72
73
74
  	for c.scanner.Scan() {
  		req, err := DecodeMsg(c.scanner.Bytes())
  		if err != nil {
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
75
76
  			return
  		}
5d9cf01c   zhangqijia   plugin 热更
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
  
  		req.Conn = c
  		//得到需要处理此条连接的workerID
  		workerID := c.Id % c.Server.SConf.WorkerPoolSize
  		//将请求消息发送给任务队列
  		c.Server.TaskQueue[workerID] <- req
  
  		//备注,可以在当前协程处理当条请求(如下, 实现很简单,已经删除),也可以丢到协程池里处理任务(如上),还未对比效果。
  		//c.Server.OnRecv(req)
  	}
  
  	if err := c.scanner.Err(); err != nil {
  		fmt.Printf("scanner.err: %s\n", err.Error())
  		c.Quiting()
  		return
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
  	}
  }
  
  func (c *Connection) Start()  {
  	go c.write()
  	go c.read()
  }
  
  func (c *Connection) Stop()  {
  	close(c.RBuffer)
  	close(c.WBuffer)
  	c.Conn.Close()
  }
  
  func (c *Connection) Quiting()  {
  	c.Server.OnClose(c)
  }
  
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
110
111
112
113
114
115
116
117
118
119
120
121
122
  func (c *Connection) SendMsgByCode(errCode int32, cmd int32, data []byte){
  	h := &Head{
  		Length:   int32(common.HEADLEN + len(data)),
  		Cmd:      cmd,
  		ErrCode:  errCode,
  		PreField: 0,
  	}
  	pkg := &MsgPkg{
  		Head: h,
  		Body: data,
  	}
  	buf, err := EncodeMsg(pkg)
  	if err != nil {
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
123
  		logger.Error("SendMsg error: %v", err)
9644352a   zhangqijia   登录服改为http,游戏服改为长连...
124
125
126
  		return
  	}
  	c.WBuffer <- buf
fee11bff   zhangqijia   客户端无法使用grpc热更,不用g...
127
  }