From 54b3f1336ab7a89a93a2a12bae7ba99867afd465 Mon Sep 17 00:00:00 2001 From: zqj <582132116@qq.com> Date: Wed, 23 Mar 2022 10:57:58 +0800 Subject: [PATCH] add connector interface --- cmd/test/client.go | 47 +++++++---------------------------------------- common/components/conn.go | 12 ++++++------ common/components/connector.go | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ common/components/icompontents.go | 9 ++++++++- common/components/server.go | 2 +- common/conf.go | 7 +++++++ conf/conf.yaml | 7 ++++++- 7 files changed, 135 insertions(+), 49 deletions(-) create mode 100644 common/components/connector.go diff --git a/cmd/test/client.go b/cmd/test/client.go index 02377a5..a1276ad 100644 --- a/cmd/test/client.go +++ b/cmd/test/client.go @@ -1,11 +1,8 @@ package main import ( - "bufio" - "bytes" - "encoding/binary" "github.com/golang/protobuf/proto" - "net" + "pro2d/common" "pro2d/common/components" "pro2d/common/logger" "pro2d/pb" @@ -27,49 +24,19 @@ func main() { } l, _ :=proto.Marshal(loginReq) - b := components.PBMessage{ - Head: head, - Body: l, + options := []components.ConnectorOption{ + components.WithCtorCount(common.GlobalConf.TestClient.Count), + components.WithCtorSplitter(components.NewPBSplitter()), } - head.Length = uint32(16 + len(b.Body)) - buf := &bytes.Buffer{} - err := binary.Write(buf, binary.BigEndian, head) - if err != nil { - logger.Error("err: %v, head: %v", err, head) - return - } - logger.Debug("head: %v", head) - err = binary.Write(buf, binary.BigEndian, b.Body) - if err != nil { - logger.Error("err: %v, msg: %v", err, b.Body) - return - } - - client, err := net.Dial("tcp", "localhost:8850") - if err != nil { + client := components.NewConnector(common.GlobalConf.TestClient.Ip, common.GlobalConf.TestClient.Port, options...) + if err := client.Connect(); err != nil { logger.Error(err) return } - rd := bufio.NewReadWriter(bufio.NewReader(client), bufio.NewWriter(client)) for { - b1 := make([]byte, 1024) - n, err := rd.Write(buf.Bytes()) - if err != nil { - logger.Error(err) - return - } - rd.Flush() - logger.Debug("write:n: %d, msg: %s", n, buf.Bytes()) - - n, err = rd.Read(b1) - if err != nil { - logger.Error(err) - return - } - logger.Debug("recv: %s, n: %d\n", b1, n) + client.Send(head.Cmd, l) time.Sleep(1*time.Second) } - } \ No newline at end of file diff --git a/common/components/conn.go b/common/components/conn.go index 19f7a18..9cd3135 100644 --- a/common/components/conn.go +++ b/common/components/conn.go @@ -14,7 +14,7 @@ import ( type Connection struct { IConnection net.Conn - Server IServer + splitter ISplitter Id uint32 scanner *bufio.Scanner @@ -36,7 +36,7 @@ var connectionPool = &sync.Pool{ New: func() interface{} { return new(Connection)}, } -func NewConn(id int, conn net.Conn, s IServer) *Connection { +func NewConn(id int, conn net.Conn, splitter ISplitter) IConnection { c := connectionPool.Get().(*Connection) closed := atomic.LoadUint32(&c.Status) if closed != 0 { @@ -46,7 +46,7 @@ func NewConn(id int, conn net.Conn, s IServer) *Connection { atomic.StoreUint32(&c.Id, uint32(id)) c.Conn = conn - c.Server = s + c.splitter = splitter c.scanner = bufio.NewScanner(conn) c.writer = bufio.NewWriter(conn) @@ -118,7 +118,7 @@ func (c *Connection) Stop() { } func (c *Connection) Send(errCode int32, cmd uint32, data []byte) error { - buf, err := c.Server.GetSplitter().Pack(cmd, data, errCode, 0) + buf, err := c.splitter.Pack(cmd, data, errCode, 0) if err != nil { return err } @@ -171,10 +171,10 @@ func (c *Connection) read() { c.Stop() }() - c.scanner.Split(c.Server.GetSplitter().ParseMsg) + c.scanner.Split(c.splitter.ParseMsg) for c.scanner.Scan() { - req, err := c.Server.GetSplitter().UnPack(c.scanner.Bytes()) + req, err := c.splitter.UnPack(c.scanner.Bytes()) if err != nil { return } diff --git a/common/components/connector.go b/common/components/connector.go new file mode 100644 index 0000000..860eac7 --- /dev/null +++ b/common/components/connector.go @@ -0,0 +1,100 @@ +package components + +import ( + "fmt" + "net" + "pro2d/common/logger" +) + +type ConnectorOption func(*Connector) + +func WithCtorSplitter(splitter ISplitter) ConnectorOption { + return func(connector *Connector) { + connector.splitter = splitter + } +} + +func WithCtorCount(count int) ConnectorOption { + return func(connector *Connector) { + connector.sum = count + } +} + +type Connector struct { + IConnector + IServer + splitter ISplitter + ip string + port int + sum int + + Conns IConnManage + ids uint32 +} + +func NewConnector(ip string, port int, options ...ConnectorOption) IConnector { + c := &Connector{ + ids: 0, + ip: ip, + port: port, + Conns: NewConnManage(), + } + for _, option := range options { + option(c) + } + return c +} + +func (c *Connector) Connect() error { + if c.sum == 0 { + c.sum = 1 + } + for i := 0; i < c.sum; i++ { + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d",c.ip, c.port)) + if err != nil { + return err + } + c.ids++ + cli := NewConn(int(c.ids), conn, c.splitter) + cli.SetConnectionCallback(c.OnConnect) + cli.SetMessageCallback(c.OnMessage) + cli.SetCloseCallback(c.OnClose) + cli.SetTimerCallback(c.OnTimer) + + cli.Start() + } + + return nil +} + +func (c *Connector) DisConnect(){ + c.Conns.StopAllConns() +} + +func (c *Connector) Send( cmd uint32, b []byte) { + c.Conns.Range(func(key interface{}, value interface{}) bool { + conn := value.(IConnection) + conn.Send(0, cmd ,b) + return true + }) +} + +func (c *Connector) GetSplitter() ISplitter { + return c.splitter +} + +func (c *Connector) OnConnect(conn IConnection){ + c.Conns.AddConn(conn.GetID(), conn) +} + +func (c *Connector) OnMessage(msg IMessage){ + logger.Debug("recv msg cmd: %d, conn: %d data: %s", msg.GetHeader().GetMsgID(), msg.GetSession().GetID(), msg.GetData()) +} + +func (c *Connector) OnClose(conn IConnection){ + logger.Debug("onclose id: %d", conn.GetID()) +} + +func (c *Connector) OnTimer(conn IConnection){ + logger.Debug("ontimer id: %d", conn.GetID()) +} \ No newline at end of file diff --git a/common/components/icompontents.go b/common/components/icompontents.go index 00cf8e3..9672276 100644 --- a/common/components/icompontents.go +++ b/common/components/icompontents.go @@ -13,7 +13,6 @@ type ( } //网络包 IMessage interface { - IHead GetHeader() IHead //获取消息头 SetHeader(header IHead) //设置消息头 @@ -71,6 +70,14 @@ type ( SetTimerCallback(TimerCallback) } + //Connector + IConnector interface { + Connect() error + DisConnect() + + Send(cmd uint32, b []byte) + } + //httpserver IHttp interface { Start() error diff --git a/common/components/server.go b/common/components/server.go index 70d74ea..683453c 100644 --- a/common/components/server.go +++ b/common/components/server.go @@ -130,7 +130,7 @@ func (s *Server) Start() error { } id++ - client := NewConn(id, conn, s) + client := NewConn(id, conn, s.splitter) s.newConnection(client) } } diff --git a/common/conf.go b/common/conf.go index 711428a..7d72915 100644 --- a/common/conf.go +++ b/common/conf.go @@ -71,6 +71,12 @@ type LogConf struct { LogConn *LogConn `yaml:"Conn" json:"Conn"` } +type TestClient struct { + Ip string `yaml:"ip"` + Port int`yaml:"port"` + Count int `yaml:"count"` +} + type ServerConf struct { ID string `yaml:"id"` Name string `yaml:"name"` @@ -80,6 +86,7 @@ type ServerConf struct { GameConf *SConf `yaml:"server_game"` RedisConf *RedisConf `yaml:"redis"` LogConf *LogConf `yaml:"logconf" json:"logconf"` + TestClient *TestClient `yaml:"test_client"` Etcd *Etcd `yaml:"etcd"` } diff --git a/conf/conf.yaml b/conf/conf.yaml index e1aeb06..2ca90d5 100644 --- a/conf/conf.yaml +++ b/conf/conf.yaml @@ -20,7 +20,7 @@ server_account: id: "1" name: "account" ip: "192.168.0.206" - port: 8858 + port: 8080 pool_size: 1 debugport: 6062 mongo: @@ -39,6 +39,11 @@ server_game: <<: *default-mongo dbname: "game" +test_client: + ip: "127.0.0.1" + port: 8850 + count: 10 + logconf: TimeFormat: "2006-01-02 15:04:05" Console: -- libgit2 0.21.2