From 3b95cf56451dafc0a8bd161a2dfc29f4143244bf Mon Sep 17 00:00:00 2001 From: MQjehovah <1421706030@qq.com> Date: Mon, 22 Jan 2024 09:52:50 +0800 Subject: [PATCH] fix --- README.md | 16 +++++++++ client/main.go | 79 ++++++++++++++++++++++++++++++++++------- go_proxy.code-workspace | 3 ++ server/client.go | 5 +-- 4 files changed, 88 insertions(+), 15 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd7a8e4 --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ + +```mermaid +sequenceDiagram + participant Device + participant Broker + participant Web + Device->>Broker: 建立TCP连接 + Broker->>Broker: 5s未注册,主动断开 + Device->>Broker: 注册报文 + Broker->>Device: 注册回复 + Device->>Broker: 心跳报文 + Broker->>Device: 心跳报文 + Web->>Broker: 创建Session + Broker->>Device: 创建Session + +``` diff --git a/client/main.go b/client/main.go index 3954474..6e5ee40 100644 --- a/client/main.go +++ b/client/main.go @@ -19,8 +19,13 @@ import ( // 帧类型 1字节 // 帧长度 4字节 +// 帧数据 帧长度大小的数据 +// 心跳包 02 00 00 00 00 +// 注册包 01 00 00 00 00 device_id 00 token 00 desc +// 创建SESSION 03 00 00 00 type Config struct { + DeviceId string AddrServer string } @@ -28,8 +33,8 @@ const HEART_BEAT_INTERVAL = time.Second * 5 // 心跳超时时间 const ( MSG_TYPE_UNKOWNM = iota - MSG_TYPE_HEARTBEAT MSG_TYPE_REGISTER + MSG_TYPE_HEARTBEAT MSG_TYPE_SESSION_CREATE MSG_TYPE_SESSION_DATA MSG_TYPE_SESSION_DESTORY @@ -37,6 +42,12 @@ const ( MSG_TYPE_MAX ) +type Session struct { + sessionId string + inBound *os.File + dev *Device +} + type Device struct { id string category string @@ -47,6 +58,7 @@ type Device struct { registered bool closed uint32 send chan []byte // Buffered channel of outbound messages. + sessions map[string]*Session } func main() { @@ -108,7 +120,8 @@ func main() { func runClient(c *cli.Context) { cfg := &Config{ - AddrServer: "localhost:9011", + DeviceId: "jmq", + AddrServer: "dev.xzrobot.com:9011", } tcpConn, err := createTcpConn(cfg.AddrServer) @@ -119,7 +132,7 @@ func runClient(c *cli.Context) { fmt.Println(tcpConn.LocalAddr().String() + " : Client Connected") dev := &Device{ - id: "1234567890", + id: cfg.DeviceId, category: "device", conn: tcpConn, create_time: time.Now().Unix(), @@ -127,35 +140,35 @@ func runClient(c *cli.Context) { registered: false, closed: 0, send: make(chan []byte, 100), + sessions: make(map[string]*Session), } go dev.readLoop() + go dev.writeLoop() //registe s := make([][]byte, 3) - s[0] = []byte("deviceid") //id - s[1] = []byte(dev.id) //desc - s[2] = []byte(dev.id) //token + s[0] = []byte("jmq") //id + s[1] = []byte("desc") //desc + s[2] = []byte("token") //token dev.WriteMsg(MSG_TYPE_REGISTER, bytes.Join(s, []byte{0})) - go dev.writeLoop() select {} } func (dev *Device) WriteMsg(typ int, data []byte) { b := []byte{byte(typ), 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], uint32(len(data))) - dev.send <- append(b, data...) } func (dev *Device) readLoop() { defer dev.conn.Close() + defer log.Debug().Msgf("dev readLoop finished") reader := bufio.NewReader(dev.conn) for { - b, err := reader.Peek(5) + b, err := reader.Peek(3) if err != nil { if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { log.Error().Msg(err.Error()) @@ -163,7 +176,7 @@ func (dev *Device) readLoop() { return } - reader.Discard(5) + reader.Discard(3) msg_type := b[0] @@ -172,7 +185,7 @@ func (dev *Device) readLoop() { return } - msg_length := binary.BigEndian.Uint32(b[1:]) + msg_length := binary.BigEndian.Uint16(b[1:]) data := make([]byte, msg_length) _, err = io.ReadFull(reader, data) if err != nil { @@ -184,11 +197,50 @@ func (dev *Device) readLoop() { switch msg_type { case MSG_TYPE_HEARTBEAT: - // log.Info().Msgf("Receive Heartbeat Time: %d", time.Now().Unix()) + log.Info().Msgf("Receive Heartbeat Time: %d", time.Now().Unix()) case MSG_TYPE_REGISTER: dev.registered = true log.Info().Msgf("Device Registry Success") + case MSG_TYPE_SESSION_CREATE: + sessionId := string(data[:32]) + + cmd := exec.Command("bash") + ff, err := pty.Start(cmd) + if err != nil { + fmt.Println("Create Pty Error! " + err.Error()) + return + } + + s := &Session{ + sessionId: sessionId, + inBound: nil, + dev: dev, + } + + dev.sessions[sessionId] = s + + go func(s *Session) { + defer ff.Close() + reader := bufio.NewReader(ff) + for { + buffer := []byte{} + len, err := reader.Read(buffer) + if err != nil { + log.Error().Msg(err.Error()) + return + } + dev.WriteMsg(MSG_TYPE_SESSION_DATA, append([]byte(s.sessionId), buffer[:len]...)) + } + }(s) + + case MSG_TYPE_SESSION_DATA: + sessionId := string(data[:32]) + if session, ok := dev.sessions[sessionId]; ok { + session.dev.WriteMsg(MSG_TYPE_SESSION_DATA, data) + // session.inBound.Write(data[32:]) + return + } case MSG_TYPE_TUNNEL_CREATE: log.Info().Msgf("Receive Tunnel Create") cmd := exec.Command("bash") @@ -228,6 +280,7 @@ func (dev *Device) readLoop() { log.Error().Msgf("invalid msg type: %d", msg_type) } } + } func (dev *Device) writeLoop() { diff --git a/go_proxy.code-workspace b/go_proxy.code-workspace index e329506..7a465fe 100644 --- a/go_proxy.code-workspace +++ b/go_proxy.code-workspace @@ -5,6 +5,9 @@ }, { "path": "./server" + }, + { + "path": "." } ] } \ No newline at end of file diff --git a/server/client.go b/server/client.go index 1d3106f..6883ae3 100644 --- a/server/client.go +++ b/server/client.go @@ -25,8 +25,8 @@ const HEART_BEAT_INTERVAL = time.Second * 5 // 心跳超时时间 const ( MSG_TYPE_UNKOWNM = iota - MSG_TYPE_HEARTBEAT MSG_TYPE_REGISTER + MSG_TYPE_HEARTBEAT MSG_TYPE_SESSION_CREATE MSG_TYPE_SESSION_DATA MSG_TYPE_SESSION_DESTORY @@ -149,9 +149,10 @@ func (dev *Device) readLoop() { return } if !parseDeviceInfo(dev, data) { + log.Error().Msg("parseDeviceInfo Failed") return } - dev.WriteMsg(MSG_TYPE_TUNNEL_CREATE, []byte{}) + // dev.WriteMsg(MSG_TYPE_TUNNEL_CREATE, []byte{}) dev.br.register <- dev case MSG_TYPE_SESSION_CREATE: