package main import ( "bufio" "bytes" "encoding/binary" "fmt" "io" "net" "os" "os/exec" "strings" "time" "github.com/creack/pty" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" ) // 帧类型 1字节 // 帧长度 4字节 // 帧数据 帧长度大小的数据 // 心跳包 02 00 00 00 00 // 注册包 01 00 00 00 00 device_id 00 token 00 desc // 创建SESSION 03 00 00 00 type Config struct { DeviceId string DeviceType string DeviceDesc string AddrServer string } const HEART_BEAT_INTERVAL = time.Second * 5 // 心跳超时时间 const ( MSG_TYPE_UNKOWNM = iota MSG_TYPE_REGISTER MSG_TYPE_HEARTBEAT MSG_TYPE_SESSION_CREATE MSG_TYPE_SESSION_DATA MSG_TYPE_SESSION_DESTORY MSG_TYPE_TUNNEL_CREATE MSG_TYPE_MAX ) type Session struct { sessionId string inBound *os.File dev *Device } type Device struct { id string category string desc string /* description of the device */ conn net.Conn create_time int64 /* connection time */ active time.Time registered bool closed uint32 send chan []byte // Buffered channel of outbound messages. sessions map[string]*Session } func main() { app := &cli.App{ Name: "XZRobot Ops Server", Usage: "The Server Side For xzrobot ops", Version: "1.0.0", Commands: []*cli.Command{ { Name: "run", Usage: "Run Server", Flags: []cli.Flag{ &cli.StringFlag{ Name: "log", Value: "goproxy.log", Usage: "log file path", }, &cli.StringFlag{ Name: "conf", Aliases: []string{"c"}, Value: "./goproxy.conf", Usage: "config file to load", }, &cli.StringFlag{ Name: "addr-dev", Value: ":9011", Usage: "address to listen device", }, &cli.StringFlag{ Name: "addr-user", Value: ":9012", Usage: "address to listen user", }, &cli.StringFlag{ Name: "db", Value: "sqlite://database.db", Usage: "database source", }, }, Action: func(c *cli.Context) error { runClient(c) return nil }, }, }, Action: func(c *cli.Context) error { c.App.Command("run").Run(c) return nil }, } err := app.Run(os.Args) if err != nil { fmt.Println(err) os.Exit(1) } } func reConnect(dev *Device, cfg *Config) { tcpConn, err := createTcpConn(cfg.AddrServer) if err != nil { fmt.Println("TCP Connect Error! " + err.Error()) return } fmt.Println(tcpConn.LocalAddr().String() + " : Client Connected") dev.conn = tcpConn dev.closed = 0 go dev.readLoop() go dev.writeLoop() //registe s := make([][]byte, 3) s[0] = []byte("jmq") //id s[1] = []byte("desc") //desc s[2] = []byte("token") //token dev.WriteMsg(MSG_TYPE_REGISTER, bytes.Join(s, []byte{0})) } func runClient(c *cli.Context) { cfg := &Config{ DeviceId: "jmq", DeviceType: "device", DeviceDesc: "jmq", AddrServer: "dev.xzrobot.com:9011", } for { dev := &Device{ id: cfg.DeviceId, category: cfg.DeviceType, desc: cfg.DeviceDesc, create_time: time.Now().Unix(), active: time.Now(), registered: false, closed: 1, send: make(chan []byte, 1024), sessions: make(map[string]*Session), } // tcpConn, err := createTcpConn(cfg.AddrServer) // if err != nil { // fmt.Println("TCP Connect Error! " + err.Error()) // return // } // fmt.Println(tcpConn.LocalAddr().String() + " : Client Connected") // go dev.readLoop() // go dev.writeLoop() //registe // s := make([][]byte, 3) // s[0] = []byte("jmq") //id // s[1] = []byte("desc") //desc // s[2] = []byte("token") //token // dev.WriteMsg(MSG_TYPE_REGISTER, bytes.Join(s, []byte{0})) t := time.NewTicker(time.Second * 1) for { select { case <-t.C: if dev.closed == 1 { reConnect(dev, cfg) } } } log.Info().Msgf("main loop exit") } } func (dev *Device) WriteMsg(typ int, data []byte) { b := []byte{byte(typ), 0, 0, 0, 0} binary.BigEndian.PutUint32(b[1:], uint32(len(data))) dev.send <- append(b, data...) } func (dev *Device) readLoop() { defer dev.conn.Close() defer log.Debug().Msgf("dev readLoop finished") reader := bufio.NewReader(dev.conn) for { b, err := reader.Peek(3) if err != nil { if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { log.Error().Msg(err.Error()) dev.closed = 1 } return } reader.Discard(3) msg_type := b[0] if msg_type >= MSG_TYPE_MAX { log.Error().Msgf("invalid msg type: %d", msg_type) return } msg_length := binary.BigEndian.Uint16(b[1:]) data := make([]byte, msg_length) _, err = io.ReadFull(reader, data) if err != nil { log.Error().Msg(err.Error()) return } dev.active = time.Now() switch msg_type { case MSG_TYPE_HEARTBEAT: log.Info().Msgf("Receive Heartbeat Time: %d", time.Now().Unix()) case MSG_TYPE_REGISTER: dev.registered = true log.Info().Msgf("Device Registry Success") case MSG_TYPE_SESSION_CREATE: sessionId := string(data[:32]) cmd := exec.Command("bash") ff, err := pty.Start(cmd) if err != nil { fmt.Println("Create Pty Error! " + err.Error()) return } s := &Session{ sessionId: sessionId, inBound: nil, dev: dev, } dev.sessions[sessionId] = s go func(s *Session) { defer ff.Close() reader := bufio.NewReader(ff) for { buffer := []byte{} len, err := reader.Read(buffer) if err != nil { log.Error().Msg(err.Error()) return } dev.WriteMsg(MSG_TYPE_SESSION_DATA, append([]byte(s.sessionId), buffer[:len]...)) } }(s) case MSG_TYPE_SESSION_DATA: sessionId := string(data[:32]) if session, ok := dev.sessions[sessionId]; ok { session.dev.WriteMsg(MSG_TYPE_SESSION_DATA, data) // session.inBound.Write(data[32:]) return } case MSG_TYPE_TUNNEL_CREATE: log.Info().Msgf("Receive Tunnel Create") cmd := exec.Command("bash") ff, err := pty.Start(cmd) if err != nil { fmt.Println("Create Pty Error! " + err.Error()) return } remoteConn, err := createTcpConn("192.168.21.146:10001") if err != nil { fmt.Println("Create remoteAddr Error! " + err.Error()) return } go func() { defer ff.Close() defer remoteConn.Close() _, err := io.Copy(ff, remoteConn) if err != nil { fmt.Println("Copy error! " + err.Error()) return } }() go func() { defer ff.Close() defer remoteConn.Close() _, err := io.Copy(remoteConn, ff) if err != nil { fmt.Println("Copy error! " + err.Error()) return } }() // createTunnel("localhost:10001", "localhost:20001") default: log.Error().Msgf("invalid msg type: %d", msg_type) } } } func (dev *Device) writeLoop() { defer dev.conn.Close() defer log.Debug().Msgf("dev writeLoop finished") ticker := time.NewTicker(time.Second) ninactive := 0 lastHeartbeat := time.Now() for { select { case msg, ok := <-dev.send: if !ok { return } _, err := dev.conn.Write(msg) if err != nil { log.Error().Msg(err.Error()) dev.closed = 1 return } case <-ticker.C: now := time.Now() if now.Sub(dev.active) > HEART_BEAT_INTERVAL*3/2 { if dev.id == "" { return } log.Error().Msgf("Inactive device in long time: %s", dev.id) if ninactive > 3 { log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id) dev.closed = 1 return } ninactive = ninactive + 1 } if now.Sub(lastHeartbeat) > HEART_BEAT_INTERVAL-1 { lastHeartbeat = now if len(dev.send) < 1 { log.Info().Msgf("Transmit Heartbeat Time: %d", time.Now().Unix()) dev.WriteMsg(MSG_TYPE_HEARTBEAT, []byte{}) } } } } } func createTcpConn(addr string) (*net.TCPConn, error) { tcpConn, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err } conn, err := net.DialTCP("tcp", nil, tcpConn) if err != nil { return nil, err } return conn, nil } func createTunnel(localAddr string, remoteAddr string) { localConn, err := createTcpConn(localAddr) if err != nil { fmt.Println("Create LocalConn Error! " + err.Error()) return } remoteConn, err := createTcpConn(remoteAddr) if err != nil { fmt.Println("Create remoteAddr Error! " + err.Error()) return } go func() { defer localConn.Close() defer remoteConn.Close() _, err := io.Copy(localConn, remoteConn) if err != nil { fmt.Println("Copy error! " + err.Error()) return } }() go func() { defer localConn.Close() defer remoteConn.Close() _, err := io.Copy(remoteConn, localConn) if err != nil { fmt.Println("Copy error! " + err.Error()) return } }() }