diff --git a/client/client.code-workspace b/client/client.code-workspace deleted file mode 100644 index 986af1d..0000000 --- a/client/client.code-workspace +++ /dev/null @@ -1,11 +0,0 @@ -{ - "folders": [ - { - "path": "." - }, - { - "path": "../server" - } - ], - "settings": {} -} \ No newline at end of file diff --git a/client/go.mod b/client/go.mod index 5a950fa..d3b5731 100644 --- a/client/go.mod +++ b/client/go.mod @@ -3,8 +3,16 @@ module main go 1.21.5 require ( - github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect - github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/urfave/cli/v2 v2.27.1 // indirect - github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + github.com/creack/pty v1.1.21 + github.com/rs/zerolog v1.31.0 +) + +require ( + github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/urfave/cli/v2 v2.27.1 + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + golang.org/x/sys v0.12.0 // indirect ) diff --git a/client/go.sum b/client/go.sum index 31517c0..93dca2a 100644 --- a/client/go.sum +++ b/client/go.sum @@ -1,8 +1,27 @@ +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.21 h1:1/QdRyBaHHJP61QkWMXlOIBfsgdDeeKfK8SYVUWJKf0= +github.com/creack/pty v1.1.21/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= +github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/urfave/cli/v2 v2.27.1 h1:8xSQ6szndafKVRmfyeUMxkNUJQMjL1F2zmsZ+qHpfho= github.com/urfave/cli/v2 v2.27.1/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/client/main.go b/client/main.go index a8215b8..6dd90a2 100644 --- a/client/main.go +++ b/client/main.go @@ -2,18 +2,24 @@ package main import ( "bufio" + "bytes" "encoding/binary" "fmt" "io" "net" "os" + "os/exec" "strings" "time" + "github.com/creack/pty" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" ) +// 帧类型 1字节 +// 帧长度 4字节 + type Config struct { AddrServer string } @@ -38,8 +44,6 @@ type Device struct { conn net.Conn create_time int64 /* connection time */ active time.Time - uptime uint32 - token string registered bool closed uint32 send chan []byte // Buffered channel of outbound messages. @@ -114,27 +118,41 @@ func runClient(c *cli.Context) { } fmt.Println(tcpConn.LocalAddr().String() + " : Client Connected") - reader := bufio.NewReader(tcpConn) - for { - s, err := reader.ReadString('\n') - if err != nil || err == io.EOF { - break - } else { - //接收到new的指令的时候,新建一个tcp连接 - if s == "new\n" { - - } - if s == "hi" { - //忽略掉hi的请求 - } - } + dev := &Device{ + id: "1234567890", + category: "device", + conn: tcpConn, + create_time: time.Now().Unix(), + active: time.Now(), + registered: false, + closed: 0, + send: make(chan []byte, 100), } + go dev.readLoop() + + //registe + s := make([][]byte, 3) + s[0] = []byte("deviceid") //id + s[1] = []byte(dev.id) //desc + s[2] = []byte(dev.id) //token + dev.WriteMsg(MSG_TYPE_REGISTER, bytes.Join(s, []byte{0})) + + go dev.writeLoop() + select {} } -func readLoop(conn *net.TCPConn) { - defer conn.Close() +func (dev *Device) WriteMsg(typ int, data []byte) { + b := []byte{byte(typ), 0, 0, 0, 0} - reader := bufio.NewReader(conn) + binary.BigEndian.PutUint32(b[1:], uint32(len(data))) + + dev.send <- append(b, data...) +} + +func (dev *Device) readLoop() { + defer dev.conn.Close() + + reader := bufio.NewReader(dev.conn) for { b, err := reader.Peek(5) @@ -162,27 +180,60 @@ func readLoop(conn *net.TCPConn) { return } - // dev.active = time.Now() + dev.active = time.Now() switch msg_type { case MSG_TYPE_HEARTBEAT: - parseHeartbeat(b) + // log.Info().Msgf("Receive Heartbeat Time: %d", time.Now().Unix()) + case MSG_TYPE_REGISTER: + dev.registered = true + log.Info().Msgf("Device Registry Success") + case MSG_TYPE_TUNNEL_CREATE: + log.Info().Msgf("Receive Tunnel Create") + cmd := exec.Command("bash") + ff, err := pty.Start(cmd) + if err != nil { + fmt.Println("Create Pty Error! " + err.Error()) + return + } + + remoteConn, err := createTcpConn("localhost:10001") + if err != nil { + fmt.Println("Create remoteAddr Error! " + err.Error()) + return + } + + go func() { + defer ff.Close() + defer remoteConn.Close() + _, err := io.Copy(ff, remoteConn) + if err != nil { + fmt.Println("Copy error! " + err.Error()) + return + } + }() + + go func() { + defer ff.Close() + defer remoteConn.Close() + _, err := io.Copy(remoteConn, ff) + if err != nil { + fmt.Println("Copy error! " + err.Error()) + return + } + }() + // createTunnel("localhost:10001", "localhost:20001") default: log.Error().Msgf("invalid msg type: %d", msg_type) } } } -func parseHeartbeat(b []byte) { - uptime := binary.BigEndian.Uint32(b[:4]) - log.Info().Msgf("Heartbeat Time: %d", uptime) -} - -func writeLoop(conn *net.TCPConn) { +func (dev *Device) writeLoop() { ticker := time.NewTicker(time.Second) - defer conn.Close() + defer dev.conn.Close() ninactive := 0 lastHeartbeat := time.Now() @@ -218,6 +269,7 @@ func writeLoop(conn *net.TCPConn) { if now.Sub(lastHeartbeat) > HEART_BEAT_INTERVAL-1 { lastHeartbeat = now if len(dev.send) < 1 { + // log.Info().Msgf("Send Heartbeat Time: %d", time.Now().Unix()) dev.WriteMsg(MSG_TYPE_HEARTBEAT, []byte{}) } } @@ -228,12 +280,10 @@ func writeLoop(conn *net.TCPConn) { func createTcpConn(addr string) (*net.TCPConn, error) { tcpConn, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - fmt.Println("TCP Address error ! " + err.Error()) return nil, err } conn, err := net.DialTCP("tcp", nil, tcpConn) if err != nil { - fmt.Println("TCP Connect error ! " + err.Error()) return nil, err } return conn, nil @@ -251,10 +301,9 @@ func createTunnel(localAddr string, remoteAddr string) { return } - defer localConn.Close() - defer remoteConn.Close() - go func() { + defer localConn.Close() + defer remoteConn.Close() _, err := io.Copy(localConn, remoteConn) if err != nil { fmt.Println("Copy error! " + err.Error()) @@ -263,6 +312,8 @@ func createTunnel(localAddr string, remoteAddr string) { }() go func() { + defer localConn.Close() + defer remoteConn.Close() _, err := io.Copy(remoteConn, localConn) if err != nil { fmt.Println("Copy error! " + err.Error()) diff --git a/go_proxy.code-workspace b/go_proxy.code-workspace new file mode 100644 index 0000000..e329506 --- /dev/null +++ b/go_proxy.code-workspace @@ -0,0 +1,10 @@ +{ + "folders": [ + { + "path": "./client" + }, + { + "path": "./server" + } + ] +} \ No newline at end of file diff --git a/server/client.go b/server/client.go index 0d4e7d6..1d3106f 100644 --- a/server/client.go +++ b/server/client.go @@ -65,9 +65,9 @@ func (dev *Device) DeviceCategory() string { // 往设备写数据 func (dev *Device) WriteMsg(typ int, data []byte) { - b := []byte{byte(typ), 0, 0} + b := []byte{byte(typ), 0, 0, 0, 0} - binary.BigEndian.PutUint16(b[1:], uint16(len(data))) + binary.BigEndian.PutUint32(b[1:], uint32(len(data))) dev.send <- append(b, data...) } @@ -88,7 +88,7 @@ func (dev *Device) Close() { // 解析设备信息 func parseDeviceInfo(dev *Device, b []byte) bool { - fields := bytes.Split(b, []byte{0}) + fields := bytes.Split(b, []byte{0}) //通过\0分隔 if len(fields) < 3 { log.Error().Msg("msgTypeRegister: invalid") @@ -151,7 +151,7 @@ func (dev *Device) readLoop() { if !parseDeviceInfo(dev, data) { return } - + dev.WriteMsg(MSG_TYPE_TUNNEL_CREATE, []byte{}) dev.br.register <- dev case MSG_TYPE_SESSION_CREATE: @@ -170,7 +170,6 @@ func (dev *Device) readLoop() { } dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_DATA, data[32:]} - case MSG_TYPE_HEARTBEAT: parseHeartbeat(dev, b) @@ -208,7 +207,7 @@ func (dev *Device) writeLoop() { now := time.Now() if now.Sub(dev.active) > HEART_BEAT_INTERVAL*3/2 { if dev.id == "" { - return + continue // 未注册直接踢出 } log.Error().Msgf("Inactive device in long time: %s", dev.id) @@ -253,6 +252,8 @@ func listenDevice(br *Broker) { dev := &Device{ br: br, conn: conn, + category: "device", + active: time.Now(), create_time: time.Now().Unix(), send: make(chan []byte, 256), } @@ -261,5 +262,4 @@ func listenDevice(br *Broker) { go dev.writeLoop() } }() - } diff --git a/server/main.go b/server/main.go index 682b8fd..b40e4bd 100644 --- a/server/main.go +++ b/server/main.go @@ -22,7 +22,8 @@ func runServer(c *cli.Context) { br := newBroker(cfg) listenDevice(br) - br.run() + go br.run() + select {} } func main() {