Commit 54b3f1336ab7a89a93a2a12bae7ba99867afd465

Authored by zhangqijia
1 parent b3d79719

add connector interface

cmd/test/client.go
1 package main 1 package main
2 2
3 import ( 3 import (
4 - "bufio"  
5 - "bytes"  
6 - "encoding/binary"  
7 "github.com/golang/protobuf/proto" 4 "github.com/golang/protobuf/proto"
8 - "net" 5 + "pro2d/common"
9 "pro2d/common/components" 6 "pro2d/common/components"
10 "pro2d/common/logger" 7 "pro2d/common/logger"
11 "pro2d/pb" 8 "pro2d/pb"
@@ -27,49 +24,19 @@ func main() { @@ -27,49 +24,19 @@ func main() {
27 } 24 }
28 l, _ :=proto.Marshal(loginReq) 25 l, _ :=proto.Marshal(loginReq)
29 26
30 - b := components.PBMessage{  
31 - Head: head,  
32 - Body: l, 27 + options := []components.ConnectorOption{
  28 + components.WithCtorCount(common.GlobalConf.TestClient.Count),
  29 + components.WithCtorSplitter(components.NewPBSplitter()),
33 } 30 }
34 - head.Length = uint32(16 + len(b.Body))  
35 - buf := &bytes.Buffer{}  
36 - err := binary.Write(buf, binary.BigEndian, head)  
37 - if err != nil {  
38 - logger.Error("err: %v, head: %v", err, head)  
39 - return  
40 - }  
41 - logger.Debug("head: %v", head)  
42 31
43 - err = binary.Write(buf, binary.BigEndian, b.Body)  
44 - if err != nil {  
45 - logger.Error("err: %v, msg: %v", err, b.Body)  
46 - return  
47 - }  
48 -  
49 - client, err := net.Dial("tcp", "localhost:8850")  
50 - if err != nil { 32 + client := components.NewConnector(common.GlobalConf.TestClient.Ip, common.GlobalConf.TestClient.Port, options...)
  33 + if err := client.Connect(); err != nil {
51 logger.Error(err) 34 logger.Error(err)
52 return 35 return
53 } 36 }
54 37
55 - rd := bufio.NewReadWriter(bufio.NewReader(client), bufio.NewWriter(client))  
56 for { 38 for {
57 - b1 := make([]byte, 1024)  
58 - n, err := rd.Write(buf.Bytes())  
59 - if err != nil {  
60 - logger.Error(err)  
61 - return  
62 - }  
63 - rd.Flush()  
64 - logger.Debug("write:n: %d, msg: %s", n, buf.Bytes())  
65 -  
66 - n, err = rd.Read(b1)  
67 - if err != nil {  
68 - logger.Error(err)  
69 - return  
70 - }  
71 - logger.Debug("recv: %s, n: %d\n", b1, n) 39 + client.Send(head.Cmd, l)
72 time.Sleep(1*time.Second) 40 time.Sleep(1*time.Second)
73 } 41 }
74 -  
75 } 42 }
76 \ No newline at end of file 43 \ No newline at end of file
common/components/conn.go
@@ -14,7 +14,7 @@ import ( @@ -14,7 +14,7 @@ import (
14 type Connection struct { 14 type Connection struct {
15 IConnection 15 IConnection
16 net.Conn 16 net.Conn
17 - Server IServer 17 + splitter ISplitter
18 Id uint32 18 Id uint32
19 19
20 scanner *bufio.Scanner 20 scanner *bufio.Scanner
@@ -36,7 +36,7 @@ var connectionPool = &sync.Pool{ @@ -36,7 +36,7 @@ var connectionPool = &sync.Pool{
36 New: func() interface{} { return new(Connection)}, 36 New: func() interface{} { return new(Connection)},
37 } 37 }
38 38
39 -func NewConn(id int, conn net.Conn, s IServer) *Connection { 39 +func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection {
40 c := connectionPool.Get().(*Connection) 40 c := connectionPool.Get().(*Connection)
41 closed := atomic.LoadUint32(&c.Status) 41 closed := atomic.LoadUint32(&c.Status)
42 if closed != 0 { 42 if closed != 0 {
@@ -46,7 +46,7 @@ func NewConn(id int, conn net.Conn, s IServer) *Connection { @@ -46,7 +46,7 @@ func NewConn(id int, conn net.Conn, s IServer) *Connection {
46 46
47 atomic.StoreUint32(&c.Id, uint32(id)) 47 atomic.StoreUint32(&c.Id, uint32(id))
48 c.Conn = conn 48 c.Conn = conn
49 - c.Server = s 49 + c.splitter = splitter
50 50
51 c.scanner = bufio.NewScanner(conn) 51 c.scanner = bufio.NewScanner(conn)
52 c.writer = bufio.NewWriter(conn) 52 c.writer = bufio.NewWriter(conn)
@@ -118,7 +118,7 @@ func (c *Connection) Stop() { @@ -118,7 +118,7 @@ func (c *Connection) Stop() {
118 } 118 }
119 119
120 func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error { 120 func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error {
121 - buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0) 121 + buf, err := c.splitter.Pack(cmd, data, errCode, 0)
122 if err != nil { 122 if err != nil {
123 return err 123 return err
124 } 124 }
@@ -171,10 +171,10 @@ func (c *Connection) read() { @@ -171,10 +171,10 @@ func (c *Connection) read() {
171 c.Stop() 171 c.Stop()
172 }() 172 }()
173 173
174 - c.scanner.Split(c.Server.GetSplitter().ParseMsg) 174 + c.scanner.Split(c.splitter.ParseMsg)
175 175
176 for c.scanner.Scan() { 176 for c.scanner.Scan() {
177 - req, err := c.Server.GetSplitter().UnPack(c.scanner.Bytes()) 177 + req, err := c.splitter.UnPack(c.scanner.Bytes())
178 if err != nil { 178 if err != nil {
179 return 179 return
180 } 180 }
common/components/connector.go 0 → 100644
@@ -0,0 +1,100 @@ @@ -0,0 +1,100 @@
  1 +package components
  2 +
  3 +import (
  4 + "fmt"
  5 + "net"
  6 + "pro2d/common/logger"
  7 +)
  8 +
  9 +type ConnectorOption func(*Connector)
  10 +
  11 +func WithCtorSplitter(splitter ISplitter) ConnectorOption {
  12 + return func(connector *Connector) {
  13 + connector.splitter = splitter
  14 + }
  15 +}
  16 +
  17 +func WithCtorCount(count int) ConnectorOption {
  18 + return func(connector *Connector) {
  19 + connector.sum = count
  20 + }
  21 +}
  22 +
  23 +type Connector struct {
  24 + IConnector
  25 + IServer
  26 + splitter ISplitter
  27 + ip string
  28 + port int
  29 + sum int
  30 +
  31 + Conns IConnManage
  32 + ids uint32
  33 +}
  34 +
  35 +func NewConnector(ip string, port int, options ...ConnectorOption) IConnector {
  36 + c := &Connector{
  37 + ids: 0,
  38 + ip: ip,
  39 + port: port,
  40 + Conns: NewConnManage(),
  41 + }
  42 + for _, option := range options {
  43 + option(c)
  44 + }
  45 + return c
  46 +}
  47 +
  48 +func (c *Connector) Connect() error {
  49 + if c.sum == 0 {
  50 + c.sum = 1
  51 + }
  52 + for i := 0; i < c.sum; i++ {
  53 + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d",c.ip, c.port))
  54 + if err != nil {
  55 + return err
  56 + }
  57 + c.ids++
  58 + cli := NewConn(int(c.ids), conn, c.splitter)
  59 + cli.SetConnectionCallback(c.OnConnect)
  60 + cli.SetMessageCallback(c.OnMessage)
  61 + cli.SetCloseCallback(c.OnClose)
  62 + cli.SetTimerCallback(c.OnTimer)
  63 +
  64 + cli.Start()
  65 + }
  66 +
  67 + return nil
  68 +}
  69 +
  70 +func (c *Connector) DisConnect(){
  71 + c.Conns.StopAllConns()
  72 +}
  73 +
  74 +func (c *Connector) Send( cmd uint32, b []byte) {
  75 + c.Conns.Range(func(key interface{}, value interface{}) bool {
  76 + conn := value.(IConnection)
  77 + conn.Send(0, cmd ,b)
  78 + return true
  79 + })
  80 +}
  81 +
  82 +func (c *Connector) GetSplitter() ISplitter {
  83 + return c.splitter
  84 +}
  85 +
  86 +func (c *Connector) OnConnect(conn IConnection){
  87 + c.Conns.AddConn(conn.GetID(), conn)
  88 +}
  89 +
  90 +func (c *Connector) OnMessage(msg IMessage){
  91 + logger.Debug("recv msg cmd: %d, conn: %d data: %s", msg.GetHeader().GetMsgID(), msg.GetSession().GetID(), msg.GetData())
  92 +}
  93 +
  94 +func (c *Connector) OnClose(conn IConnection){
  95 + logger.Debug("onclose id: %d", conn.GetID())
  96 +}
  97 +
  98 +func (c *Connector) OnTimer(conn IConnection){
  99 + logger.Debug("ontimer id: %d", conn.GetID())
  100 +}
0 \ No newline at end of file 101 \ No newline at end of file
common/components/icompontents.go
@@ -13,7 +13,6 @@ type ( @@ -13,7 +13,6 @@ type (
13 } 13 }
14 //网络包 14 //网络包
15 IMessage interface { 15 IMessage interface {
16 - IHead  
17 GetHeader() IHead //获取消息头 16 GetHeader() IHead //获取消息头
18 SetHeader(header IHead) //设置消息头 17 SetHeader(header IHead) //设置消息头
19 18
@@ -71,6 +70,14 @@ type ( @@ -71,6 +70,14 @@ type (
71 SetTimerCallback(TimerCallback) 70 SetTimerCallback(TimerCallback)
72 } 71 }
73 72
  73 + //Connector
  74 + IConnector interface {
  75 + Connect() error
  76 + DisConnect()
  77 +
  78 + Send(cmd uint32, b []byte)
  79 + }
  80 +
74 //httpserver 81 //httpserver
75 IHttp interface { 82 IHttp interface {
76 Start() error 83 Start() error
common/components/server.go
@@ -130,7 +130,7 @@ func (s *Server) Start() error { @@ -130,7 +130,7 @@ func (s *Server) Start() error {
130 } 130 }
131 131
132 id++ 132 id++
133 - client := NewConn(id, conn, s) 133 + client := NewConn(id, conn, s.splitter)
134 s.newConnection(client) 134 s.newConnection(client)
135 } 135 }
136 } 136 }
@@ -71,6 +71,12 @@ type LogConf struct { @@ -71,6 +71,12 @@ type LogConf struct {
71 LogConn *LogConn `yaml:"Conn" json:"Conn"` 71 LogConn *LogConn `yaml:"Conn" json:"Conn"`
72 } 72 }
73 73
  74 +type TestClient struct {
  75 + Ip string `yaml:"ip"`
  76 + Port int`yaml:"port"`
  77 + Count int `yaml:"count"`
  78 +}
  79 +
74 type ServerConf struct { 80 type ServerConf struct {
75 ID string `yaml:"id"` 81 ID string `yaml:"id"`
76 Name string `yaml:"name"` 82 Name string `yaml:"name"`
@@ -80,6 +86,7 @@ type ServerConf struct { @@ -80,6 +86,7 @@ type ServerConf struct {
80 GameConf *SConf `yaml:"server_game"` 86 GameConf *SConf `yaml:"server_game"`
81 RedisConf *RedisConf `yaml:"redis"` 87 RedisConf *RedisConf `yaml:"redis"`
82 LogConf *LogConf `yaml:"logconf" json:"logconf"` 88 LogConf *LogConf `yaml:"logconf" json:"logconf"`
  89 + TestClient *TestClient `yaml:"test_client"`
83 Etcd *Etcd `yaml:"etcd"` 90 Etcd *Etcd `yaml:"etcd"`
84 } 91 }
85 92
@@ -20,7 +20,7 @@ server_account: @@ -20,7 +20,7 @@ server_account:
20 id: "1" 20 id: "1"
21 name: "account" 21 name: "account"
22 ip: "192.168.0.206" 22 ip: "192.168.0.206"
23 - port: 8858 23 + port: 8080
24 pool_size: 1 24 pool_size: 1
25 debugport: 6062 25 debugport: 6062
26 mongo: 26 mongo:
@@ -39,6 +39,11 @@ server_game: @@ -39,6 +39,11 @@ server_game:
39 <<: *default-mongo 39 <<: *default-mongo
40 dbname: "game" 40 dbname: "game"
41 41
  42 +test_client:
  43 + ip: "127.0.0.1"
  44 + port: 8850
  45 + count: 10
  46 +
42 logconf: 47 logconf:
43 TimeFormat: "2006-01-02 15:04:05" 48 TimeFormat: "2006-01-02 15:04:05"
44 Console: 49 Console: