267 lines
4.9 KiB
Go
267 lines
4.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"encoding/binary"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type Client interface {
|
|
DeviceID() string
|
|
DeviceCategory() string
|
|
|
|
WriteMsg(typ int, data []byte)
|
|
Close()
|
|
}
|
|
|
|
const HEART_BEAT_INTERVAL = time.Second * 5 // 心跳超时时间
|
|
|
|
const (
|
|
MSG_TYPE_UNKOWNM = iota
|
|
MSG_TYPE_REGISTER
|
|
MSG_TYPE_HEARTBEAT
|
|
MSG_TYPE_SESSION_CREATE
|
|
MSG_TYPE_SESSION_DATA
|
|
MSG_TYPE_SESSION_DESTORY
|
|
MSG_TYPE_TUNNEL_CREATE
|
|
MSG_TYPE_MAX
|
|
)
|
|
|
|
type Device struct {
|
|
br *Broker
|
|
id string
|
|
category string
|
|
desc string /* description of the device */
|
|
conn net.Conn
|
|
create_time int64 /* connection time */
|
|
active time.Time
|
|
uptime uint32
|
|
token string
|
|
registered bool
|
|
closed uint32
|
|
send chan []byte // Buffered channel of outbound messages.
|
|
}
|
|
|
|
type deviceMessage struct {
|
|
sid string
|
|
typ int
|
|
data []byte
|
|
}
|
|
|
|
func (dev *Device) DeviceID() string {
|
|
return dev.id
|
|
}
|
|
|
|
func (dev *Device) DeviceCategory() string {
|
|
return dev.category
|
|
}
|
|
|
|
// 往设备写数据
|
|
func (dev *Device) WriteMsg(typ int, data []byte) {
|
|
b := []byte{byte(typ), 0, 0, 0, 0}
|
|
|
|
binary.BigEndian.PutUint32(b[1:], uint32(len(data)))
|
|
|
|
dev.send <- append(b, data...)
|
|
}
|
|
|
|
func (dev *Device) Close() {
|
|
if atomic.LoadUint32(&dev.closed) == 1 {
|
|
return
|
|
}
|
|
atomic.StoreUint32(&dev.closed, 1)
|
|
|
|
log.Debug().Msgf("Device '%s' disconnected", dev.conn.RemoteAddr())
|
|
|
|
dev.conn.Close()
|
|
|
|
close(dev.send)
|
|
}
|
|
|
|
// 解析设备信息
|
|
func parseDeviceInfo(dev *Device, b []byte) bool {
|
|
|
|
fields := bytes.Split(b, []byte{0}) //通过\0分隔
|
|
|
|
if len(fields) < 3 {
|
|
log.Error().Msg("msgTypeRegister: invalid")
|
|
return false
|
|
}
|
|
|
|
dev.id = string(fields[0])
|
|
dev.desc = string(fields[1])
|
|
dev.token = string(fields[2])
|
|
|
|
return true
|
|
}
|
|
|
|
// 解析设备心跳
|
|
func parseHeartbeat(dev *Device, b []byte) {
|
|
dev.uptime = binary.BigEndian.Uint32(b[:4])
|
|
}
|
|
|
|
func (dev *Device) readLoop() {
|
|
defer func() {
|
|
dev.br.unregister <- dev
|
|
}()
|
|
|
|
reader := bufio.NewReader(dev.conn)
|
|
|
|
for {
|
|
b, err := reader.Peek(5)
|
|
if err != nil {
|
|
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
|
|
log.Error().Msg(err.Error())
|
|
}
|
|
return
|
|
}
|
|
|
|
reader.Discard(5)
|
|
|
|
msg_type := b[0]
|
|
|
|
if msg_type >= MSG_TYPE_MAX {
|
|
log.Error().Msgf("invalid msg type: %d", msg_type)
|
|
return
|
|
}
|
|
|
|
msg_length := binary.BigEndian.Uint32(b[1:])
|
|
data := make([]byte, msg_length)
|
|
_, err = io.ReadFull(reader, data)
|
|
if err != nil {
|
|
log.Error().Msg(err.Error())
|
|
return
|
|
}
|
|
|
|
dev.active = time.Now()
|
|
|
|
switch msg_type {
|
|
case MSG_TYPE_REGISTER:
|
|
if msg_length < 2 {
|
|
log.Error().Msg("msgTypeRegister: invalid")
|
|
return
|
|
}
|
|
if !parseDeviceInfo(dev, data) {
|
|
log.Error().Msg("parseDeviceInfo Failed")
|
|
return
|
|
}
|
|
// dev.WriteMsg(MSG_TYPE_TUNNEL_CREATE, []byte{})
|
|
dev.br.register <- dev
|
|
|
|
case MSG_TYPE_SESSION_CREATE:
|
|
|
|
if msg_length < 33 {
|
|
log.Error().Msg("msgTypeLogin: invalid")
|
|
return
|
|
}
|
|
|
|
dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_CREATE, data[32:]}
|
|
|
|
case MSG_TYPE_SESSION_DATA:
|
|
if msg_length < 32 {
|
|
log.Error().Msg("msgTypeTermData|msgTypeFile: invalid")
|
|
return
|
|
}
|
|
|
|
dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_DATA, data[32:]}
|
|
case MSG_TYPE_HEARTBEAT:
|
|
parseHeartbeat(dev, b)
|
|
|
|
default:
|
|
log.Error().Msgf("invalid msg type: %d", msg_type)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (dev *Device) writeLoop() {
|
|
ticker := time.NewTicker(time.Second)
|
|
|
|
defer func() {
|
|
ticker.Stop()
|
|
dev.br.unregister <- dev
|
|
}()
|
|
|
|
ninactive := 0
|
|
lastHeartbeat := time.Now()
|
|
|
|
for {
|
|
select {
|
|
case msg, ok := <-dev.send:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
_, err := dev.conn.Write(msg)
|
|
if err != nil {
|
|
log.Error().Msg(err.Error())
|
|
return
|
|
}
|
|
|
|
case <-ticker.C:
|
|
now := time.Now()
|
|
if now.Sub(dev.active) > HEART_BEAT_INTERVAL*3/2 {
|
|
if dev.id == "" {
|
|
continue // 未注册直接踢出
|
|
}
|
|
|
|
log.Error().Msgf("Inactive device in long time: %s", dev.id)
|
|
if ninactive > 3 {
|
|
log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id)
|
|
return
|
|
}
|
|
ninactive = ninactive + 1
|
|
}
|
|
|
|
if now.Sub(lastHeartbeat) > HEART_BEAT_INTERVAL-1 {
|
|
lastHeartbeat = now
|
|
if len(dev.send) < 1 {
|
|
dev.WriteMsg(MSG_TYPE_HEARTBEAT, []byte{})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func listenDevice(br *Broker) {
|
|
|
|
ln, err := net.Listen("tcp", br.cfg.AddrDev)
|
|
if err != nil {
|
|
log.Error().Msgf(err.Error())
|
|
}
|
|
|
|
log.Info().Msgf("Listen Device on: %s", br.cfg.AddrDev)
|
|
|
|
go func() {
|
|
defer ln.Close()
|
|
|
|
for {
|
|
conn, err := ln.Accept()
|
|
if err != nil {
|
|
log.Error().Msg(err.Error())
|
|
continue
|
|
}
|
|
|
|
log.Debug().Msgf("Device '%s' connected", conn.RemoteAddr())
|
|
|
|
dev := &Device{
|
|
br: br,
|
|
conn: conn,
|
|
category: "device",
|
|
active: time.Now(),
|
|
create_time: time.Now().Unix(),
|
|
send: make(chan []byte, 256),
|
|
}
|
|
|
|
go dev.readLoop()
|
|
go dev.writeLoop()
|
|
}
|
|
}()
|
|
}
|