diff --git a/client/main.go b/client/main.go index 6e5ee40..3c3f9c3 100644 --- a/client/main.go +++ b/client/main.go @@ -26,6 +26,8 @@ import ( // 创建SESSION 03 00 00 00 type Config struct { DeviceId string + DeviceType string + DeviceDesc string AddrServer string } @@ -73,13 +75,13 @@ func main() { Flags: []cli.Flag{ &cli.StringFlag{ Name: "log", - Value: "log.txt", + Value: "goproxy.log", Usage: "log file path", }, &cli.StringFlag{ Name: "conf", Aliases: []string{"c"}, - Value: "./rttys.conf", + Value: "./goproxy.conf", Usage: "config file to load", }, &cli.StringFlag{ @@ -117,13 +119,7 @@ func main() { } } -func runClient(c *cli.Context) { - - cfg := &Config{ - DeviceId: "jmq", - AddrServer: "dev.xzrobot.com:9011", - } - +func reConnect(dev *Device, cfg *Config) { tcpConn, err := createTcpConn(cfg.AddrServer) if err != nil { fmt.Println("TCP Connect Error! " + err.Error()) @@ -131,17 +127,8 @@ func runClient(c *cli.Context) { } fmt.Println(tcpConn.LocalAddr().String() + " : Client Connected") - dev := &Device{ - id: cfg.DeviceId, - category: "device", - conn: tcpConn, - create_time: time.Now().Unix(), - active: time.Now(), - registered: false, - closed: 0, - send: make(chan []byte, 100), - sessions: make(map[string]*Session), - } + dev.conn = tcpConn + dev.closed = 0 go dev.readLoop() go dev.writeLoop() @@ -151,8 +138,61 @@ func runClient(c *cli.Context) { s[1] = []byte("desc") //desc s[2] = []byte("token") //token dev.WriteMsg(MSG_TYPE_REGISTER, bytes.Join(s, []byte{0})) +} + +func runClient(c *cli.Context) { + + cfg := &Config{ + DeviceId: "jmq", + DeviceType: "device", + DeviceDesc: "jmq", + AddrServer: "dev.xzrobot.com:9011", + } + + for { + + dev := &Device{ + id: cfg.DeviceId, + category: cfg.DeviceType, + desc: cfg.DeviceDesc, + create_time: time.Now().Unix(), + active: time.Now(), + registered: false, + closed: 1, + send: make(chan []byte, 1024), + sessions: make(map[string]*Session), + } + + // tcpConn, err := createTcpConn(cfg.AddrServer) + // if err != nil { + // fmt.Println("TCP Connect Error! " + err.Error()) + // return + // } + // fmt.Println(tcpConn.LocalAddr().String() + " : Client Connected") + + // go dev.readLoop() + // go dev.writeLoop() + + //registe + // s := make([][]byte, 3) + // s[0] = []byte("jmq") //id + // s[1] = []byte("desc") //desc + // s[2] = []byte("token") //token + // dev.WriteMsg(MSG_TYPE_REGISTER, bytes.Join(s, []byte{0})) + + t := time.NewTicker(time.Second * 1) + for { + select { + case <-t.C: + if dev.closed == 1 { + reConnect(dev, cfg) + } + } + } + + log.Info().Msgf("main loop exit") + } - select {} } func (dev *Device) WriteMsg(typ int, data []byte) { @@ -172,6 +212,7 @@ func (dev *Device) readLoop() { if err != nil { if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { log.Error().Msg(err.Error()) + dev.closed = 1 } return } @@ -280,13 +321,13 @@ func (dev *Device) readLoop() { log.Error().Msgf("invalid msg type: %d", msg_type) } } - } func (dev *Device) writeLoop() { - ticker := time.NewTicker(time.Second) - defer dev.conn.Close() + defer log.Debug().Msgf("dev writeLoop finished") + + ticker := time.NewTicker(time.Second) ninactive := 0 lastHeartbeat := time.Now() @@ -301,6 +342,7 @@ func (dev *Device) writeLoop() { _, err := dev.conn.Write(msg) if err != nil { log.Error().Msg(err.Error()) + dev.closed = 1 return } @@ -314,6 +356,7 @@ func (dev *Device) writeLoop() { log.Error().Msgf("Inactive device in long time: %s", dev.id) if ninactive > 3 { log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id) + dev.closed = 1 return } ninactive = ninactive + 1 @@ -322,7 +365,7 @@ func (dev *Device) writeLoop() { if now.Sub(lastHeartbeat) > HEART_BEAT_INTERVAL-1 { lastHeartbeat = now if len(dev.send) < 1 { - // log.Info().Msgf("Send Heartbeat Time: %d", time.Now().Unix()) + log.Info().Msgf("Transmit Heartbeat Time: %d", time.Now().Unix()) dev.WriteMsg(MSG_TYPE_HEARTBEAT, []byte{}) } }