// Listing metadata: 266 lines, 4.7 KiB, Go.

package main
import (
	"bufio"
	"bytes"
	"encoding/binary"
	"io"
	"net"
	"strings"
	"sync/atomic"
	"time"

	"github.com/rs/zerolog/log"
)
// Client is the set of operations exposed by a connected peer. Device
// provides all four methods in this file.
type Client interface {
	// DeviceID returns the identifier of the device behind this client.
	DeviceID() string
	// DeviceCategory returns the category of the device behind this client.
	DeviceCategory() string

	// WriteMsg sends a message of type typ carrying data to the peer.
	WriteMsg(typ int, data []byte)
	// Close tears the client down.
	Close()
}
// HEART_BEAT_INTERVAL is the heartbeat timeout. writeLoop uses it both to pace
// the heartbeats it sends and as the base of its inactivity threshold.
const HEART_BEAT_INTERVAL = 5 * time.Second
// Message types carried in the first byte of every frame exchanged with a
// device. The values are consecutive, starting at zero. The identifiers
// (including their spelling) are kept as-is because other code may refer to
// them.
const (
	MSG_TYPE_UNKOWNM         = iota // 0: unknown / unset
	MSG_TYPE_HEARTBEAT              // 1: heartbeat, sent in both directions
	MSG_TYPE_REGISTER               // 2: device registration
	MSG_TYPE_SESSION_CREATE         // 3: session creation
	MSG_TYPE_SESSION_DATA           // 4: session payload
	MSG_TYPE_SESSION_DESTORY        // 5: session teardown (not handled by readLoop)
	MSG_TYPE_TUNNEL_CREATE          // 6: tunnel creation (not handled by readLoop)
	MSG_TYPE_MAX                    // 7: exclusive upper bound used to validate incoming types
)
type Device struct {
|
||
|
br *Broker
|
||
|
id string
|
||
|
category string
|
||
|
desc string /* description of the device */
|
||
|
conn net.Conn
|
||
|
create_time int64 /* connection time */
|
||
|
active time.Time
|
||
|
uptime uint32
|
||
|
token string
|
||
|
registered bool
|
||
|
closed uint32
|
||
|
send chan []byte // Buffered channel of outbound messages.
|
||
|
}
// deviceMessage is a session-scoped message that readLoop hands to the broker.
// Field order matters: readLoop builds values with unkeyed literals.
type deviceMessage struct {
	sid  string // session ID: the first 32 bytes of the frame payload
	typ  int    // one of the MSG_TYPE_* constants
	data []byte // payload bytes that follow the session ID
}
func (dev *Device) DeviceID() string {
|
||
|
return dev.id
|
||
|
}
func (dev *Device) DeviceCategory() string {
|
||
|
return dev.category
|
||
|
}
// 往设备写数据
|
||
|
func (dev *Device) WriteMsg(typ int, data []byte) {
|
||
|
b := []byte{byte(typ), 0, 0}
|
||
|
|
||
|
binary.BigEndian.PutUint16(b[1:], uint16(len(data)))
|
||
|
|
||
|
dev.send <- append(b, data...)
|
||
|
}
func (dev *Device) Close() {
|
||
|
if atomic.LoadUint32(&dev.closed) == 1 {
|
||
|
return
|
||
|
}
|
||
|
atomic.StoreUint32(&dev.closed, 1)
|
||
|
|
||
|
log.Debug().Msgf("Device '%s' disconnected", dev.conn.RemoteAddr())
|
||
|
|
||
|
dev.conn.Close()
|
||
|
|
||
|
close(dev.send)
|
||
|
}
// 解析设备信息
|
||
|
func parseDeviceInfo(dev *Device, b []byte) bool {
|
||
|
|
||
|
fields := bytes.Split(b, []byte{0})
|
||
|
|
||
|
if len(fields) < 3 {
|
||
|
log.Error().Msg("msgTypeRegister: invalid")
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
dev.id = string(fields[0])
|
||
|
dev.desc = string(fields[1])
|
||
|
dev.token = string(fields[2])
|
||
|
|
||
|
return true
|
||
|
}
// 解析设备心跳
|
||
|
func parseHeartbeat(dev *Device, b []byte) {
|
||
|
dev.uptime = binary.BigEndian.Uint32(b[:4])
|
||
|
}
func (dev *Device) readLoop() {
|
||
|
defer func() {
|
||
|
dev.br.unregister <- dev
|
||
|
}()
|
||
|
|
||
|
reader := bufio.NewReader(dev.conn)
|
||
|
|
||
|
for {
|
||
|
b, err := reader.Peek(5)
|
||
|
if err != nil {
|
||
|
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
|
||
|
log.Error().Msg(err.Error())
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
reader.Discard(5)
|
||
|
|
||
|
msg_type := b[0]
|
||
|
|
||
|
if msg_type >= MSG_TYPE_MAX {
|
||
|
log.Error().Msgf("invalid msg type: %d", msg_type)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
msg_length := binary.BigEndian.Uint32(b[1:])
|
||
|
data := make([]byte, msg_length)
|
||
|
_, err = io.ReadFull(reader, data)
|
||
|
if err != nil {
|
||
|
log.Error().Msg(err.Error())
|
||
|
return
|
||
|
}
|
||
|
|
||
|
dev.active = time.Now()
|
||
|
|
||
|
switch msg_type {
|
||
|
case MSG_TYPE_REGISTER:
|
||
|
if msg_length < 2 {
|
||
|
log.Error().Msg("msgTypeRegister: invalid")
|
||
|
return
|
||
|
}
|
||
|
if !parseDeviceInfo(dev, data) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
dev.br.register <- dev
|
||
|
|
||
|
case MSG_TYPE_SESSION_CREATE:
|
||
|
|
||
|
if msg_length < 33 {
|
||
|
log.Error().Msg("msgTypeLogin: invalid")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_CREATE, data[32:]}
|
||
|
|
||
|
case MSG_TYPE_SESSION_DATA:
|
||
|
if msg_length < 32 {
|
||
|
log.Error().Msg("msgTypeTermData|msgTypeFile: invalid")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
dev.br.deviceMessageChan <- deviceMessage{string(data[:32]), MSG_TYPE_SESSION_DATA, data[32:]}
|
||
|
|
||
|
case MSG_TYPE_HEARTBEAT:
|
||
|
parseHeartbeat(dev, b)
|
||
|
|
||
|
default:
|
||
|
log.Error().Msgf("invalid msg type: %d", msg_type)
|
||
|
}
|
||
|
}
|
||
|
}
func (dev *Device) writeLoop() {
|
||
|
ticker := time.NewTicker(time.Second)
|
||
|
|
||
|
defer func() {
|
||
|
ticker.Stop()
|
||
|
dev.br.unregister <- dev
|
||
|
}()
|
||
|
|
||
|
ninactive := 0
|
||
|
lastHeartbeat := time.Now()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case msg, ok := <-dev.send:
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
_, err := dev.conn.Write(msg)
|
||
|
if err != nil {
|
||
|
log.Error().Msg(err.Error())
|
||
|
return
|
||
|
}
|
||
|
|
||
|
case <-ticker.C:
|
||
|
now := time.Now()
|
||
|
if now.Sub(dev.active) > HEART_BEAT_INTERVAL*3/2 {
|
||
|
if dev.id == "" {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
log.Error().Msgf("Inactive device in long time: %s", dev.id)
|
||
|
if ninactive > 3 {
|
||
|
log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id)
|
||
|
return
|
||
|
}
|
||
|
ninactive = ninactive + 1
|
||
|
}
|
||
|
|
||
|
if now.Sub(lastHeartbeat) > HEART_BEAT_INTERVAL-1 {
|
||
|
lastHeartbeat = now
|
||
|
if len(dev.send) < 1 {
|
||
|
dev.WriteMsg(MSG_TYPE_HEARTBEAT, []byte{})
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
func listenDevice(br *Broker) {
|
||
|
|
||
|
ln, err := net.Listen("tcp", br.cfg.AddrDev)
|
||
|
if err != nil {
|
||
|
log.Error().Msgf(err.Error())
|
||
|
}
|
||
|
|
||
|
log.Info().Msgf("Listen Device on: %s", br.cfg.AddrDev)
|
||
|
|
||
|
go func() {
|
||
|
defer ln.Close()
|
||
|
|
||
|
for {
|
||
|
conn, err := ln.Accept()
|
||
|
if err != nil {
|
||
|
log.Error().Msg(err.Error())
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
log.Debug().Msgf("Device '%s' connected", conn.RemoteAddr())
|
||
|
|
||
|
dev := &Device{
|
||
|
br: br,
|
||
|
conn: conn,
|
||
|
create_time: time.Now().Unix(),
|
||
|
send: make(chan []byte, 256),
|
||
|
}
|
||
|
|
||
|
go dev.readLoop()
|
||
|
go dev.writeLoop()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
}