package main import ( "bufio" "bytes" "encoding/binary" "io" "net" "strings" "sync/atomic" "time" "github.com/rs/zerolog/log" ) type Client interface { DeviceID() string DeviceCategory() string WriteMsg(typ int, data []byte) Close() } const HEART_BEAT_INTERVAL = time.Second * 5 // 心跳超时时间 const ( MSG_TYPE_UNKOWNM = iota MSG_TYPE_HEARTBEAT MSG_TYPE_REGISTER MSG_TYPE_SESSION_CREATE MSG_TYPE_SESSION_DATA MSG_TYPE_SESSION_DESTORY MSG_TYPE_TUNNEL_CREATE MSG_TYPE_MAX ) type Device struct { br *Broker id string category string desc string /* description of the device */ conn net.Conn create_time int64 /* connection time */ active time.Time uptime uint32 token string registered bool closed uint32 send chan []byte // Buffered channel of outbound messages. } type deviceMessage struct { sid string typ int data []byte } func (dev *Device) DeviceID() string { return dev.id } func (dev *Device) DeviceCategory() string { return dev.category } // 往设备写数据 func (dev *Device) WriteMsg(typ int, data []byte) { b := []byte{byte(typ), 0, 0, 0, 0} binary.BigEndian.PutUint32(b[1:], uint32(len(data))) dev.send <- append(b, data...) } func (dev *Device) Close() { if atomic.LoadUint32(&dev.closed) == 1 { return } atomic.StoreUint32(&dev.closed, 1) log.Debug().Msgf("Device '%s' disconnected", dev.conn.RemoteAddr()) dev.conn.Close() close(dev.send) } // 解析设备信息 func parseDeviceInfo(dev *Device, b []byte) bool { fields := bytes.Split(b, []byte{0}) //通过\0分隔 if len(fields) < 3 { log.Error().Msg("msgTypeRegister: invalid") return false } dev.id = string(fields[0]) dev.desc = string(fields[1]) dev.token = string(fields[2]) return true } // 解析设备心跳 func parseHeartbeat(dev *Device, b []byte) { dev.uptime = binary.BigEndian.Uint32(b[:4]) } func (dev *Device) readLoop() { defer func() { dev.br.unregister <- dev }() reader := bufio.NewReader(dev.conn) for { b, err := reader.Peek(5) if err != nil { if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { log.Error().Msg(err.Error()) } return } reader.Discard(5) msg_type := b[0] if msg_type >= MSG_TYPE_MAX { log.Error().Msgf("invalid msg type: %d", msg_type) return } msg_length := binary.BigEndian.Uint32(b[1:]) data := make([]byte, msg_length) _, err = io.ReadFull(reader, data) if err != nil { log.Error().Msg(err.Error()) return } dev.active = time.Now() switch msg_type { case MSG_TYPE_REGISTER: if msg_length < 2 { log.Error().Msg("msgTypeRegister: invalid") return } if !parseDeviceInfo(dev, data) { return } dev.WriteMsg(MSG_TYPE_TUNNEL_CREATE, []byte{}) dev.br.register <- dev case MSG_TYPE_SESSION_CREATE: if msg_length < 33 { log.Error().Msg("msgTypeLogin: invalid") return } dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_CREATE, data[32:]} case MSG_TYPE_SESSION_DATA: if msg_length < 32 { log.Error().Msg("msgTypeTermData|msgTypeFile: invalid") return } dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_DATA, data[32:]} case MSG_TYPE_HEARTBEAT: parseHeartbeat(dev, b) default: log.Error().Msgf("invalid msg type: %d", msg_type) } } } func (dev *Device) writeLoop() { ticker := time.NewTicker(time.Second) defer func() { ticker.Stop() dev.br.unregister <- dev }() ninactive := 0 lastHeartbeat := time.Now() for { select { case msg, ok := <-dev.send: if !ok { return } _, err := dev.conn.Write(msg) if err != nil { log.Error().Msg(err.Error()) return } case <-ticker.C: now := time.Now() if now.Sub(dev.active) > HEART_BEAT_INTERVAL*3/2 { if dev.id == "" { continue // 未注册直接踢出 } log.Error().Msgf("Inactive device in long time: %s", dev.id) if ninactive > 3 { log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id) return } ninactive = ninactive + 1 } if now.Sub(lastHeartbeat) > HEART_BEAT_INTERVAL-1 { lastHeartbeat = now if len(dev.send) < 1 { dev.WriteMsg(MSG_TYPE_HEARTBEAT, []byte{}) } } } } } func listenDevice(br *Broker) { ln, err := net.Listen("tcp", br.cfg.AddrDev) if err != nil { log.Error().Msgf(err.Error()) } log.Info().Msgf("Listen Device on: %s", br.cfg.AddrDev) go func() { defer ln.Close() for { conn, err := ln.Accept() if err != nil { log.Error().Msg(err.Error()) continue } log.Debug().Msgf("Device '%s' connected", conn.RemoteAddr()) dev := &Device{ br: br, conn: conn, category: "device", active: time.Now(), create_time: time.Now().Unix(), send: make(chan []byte, 256), } go dev.readLoop() go dev.writeLoop() } }() }