160 lines
3.4 KiB
Go
160 lines
3.4 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/gorilla/websocket"
|
||
|
"github.com/rs/zerolog/log"
|
||
|
)
|
||
|
|
||
|
type Session struct {
|
||
|
dev Client
|
||
|
user Client
|
||
|
confirmed uint32
|
||
|
}
|
||
|
|
||
|
type Broker struct {
|
||
|
cfg *Config
|
||
|
devices map[string]Client
|
||
|
sessions map[string]*Session
|
||
|
register chan Client
|
||
|
unregister chan Client
|
||
|
deviceMessageChan chan deviceMessage
|
||
|
// userMessageChan chan userMessage
|
||
|
}
|
||
|
|
||
|
func newBroker(cfg *Config) *Broker {
|
||
|
return &Broker{
|
||
|
cfg: cfg,
|
||
|
devices: make(map[string]Client),
|
||
|
sessions: make(map[string]*Session),
|
||
|
register: make(chan Client, 1000),
|
||
|
unregister: make(chan Client, 1000),
|
||
|
deviceMessageChan: make(chan deviceMessage, 1000),
|
||
|
// userMessageChan: make(chan userMessage, 1000),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (br *Broker) run() {
|
||
|
for {
|
||
|
select {
|
||
|
case c := <-br.register:
|
||
|
devid := c.DeviceID()
|
||
|
|
||
|
if c.DeviceCategory() == "device" {
|
||
|
dev := c.(*Device)
|
||
|
err := byte(0)
|
||
|
msg := "OK"
|
||
|
|
||
|
if _, ok := br.devices[devid]; ok {
|
||
|
log.Error().Msg("Device ID conflicting: " + devid)
|
||
|
msg = "ID conflicting"
|
||
|
err = 1
|
||
|
} else {
|
||
|
dev.registered = true
|
||
|
br.devices[devid] = c
|
||
|
log.Info().Msgf("Device '%s' registered", devid)
|
||
|
}
|
||
|
|
||
|
c.WriteMsg(MSG_TYPE_REGISTER, append([]byte{err}, msg...))
|
||
|
|
||
|
if err > 0 {
|
||
|
// ensure the last packet was sent
|
||
|
time.AfterFunc(time.Millisecond*100, func() {
|
||
|
dev.Close()
|
||
|
})
|
||
|
}
|
||
|
} else {
|
||
|
if dev, ok := br.devices[devid]; ok {
|
||
|
sid := GenUniqueID("sid")
|
||
|
|
||
|
s := &Session{
|
||
|
dev: dev,
|
||
|
user: c,
|
||
|
}
|
||
|
|
||
|
time.AfterFunc(time.Second*3, func() {
|
||
|
if atomic.LoadUint32(&s.confirmed) == 0 {
|
||
|
c.Close()
|
||
|
}
|
||
|
})
|
||
|
|
||
|
br.sessions[sid] = s
|
||
|
|
||
|
dev.WriteMsg(MSG_TYPE_SESSION_CREATE, []byte(sid))
|
||
|
log.Info().Msg("New session: " + sid)
|
||
|
} else {
|
||
|
log.Error().Msgf("Not found the device '%s'", devid)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case c := <-br.unregister:
|
||
|
devid := c.DeviceID()
|
||
|
|
||
|
c.Close()
|
||
|
|
||
|
if c.DeviceCategory() == "device" {
|
||
|
if !c.(*Device).registered {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
delete(br.devices, devid)
|
||
|
|
||
|
for sid, s := range br.sessions {
|
||
|
if s.dev == c {
|
||
|
s.user.Close()
|
||
|
delete(br.sessions, sid)
|
||
|
log.Info().Msg("Delete session: " + sid)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
log.Info().Msgf("Device '%s' unregistered", devid)
|
||
|
} else {
|
||
|
sid := c.(*Device).DeviceID()
|
||
|
|
||
|
if _, ok := br.sessions[sid]; ok {
|
||
|
delete(br.sessions, sid)
|
||
|
|
||
|
if dev, ok := br.devices[devid]; ok {
|
||
|
dev.WriteMsg(MSG_TYPE_SESSION_DESTORY, []byte(sid))
|
||
|
}
|
||
|
|
||
|
log.Info().Msg("Delete session: " + sid)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case msg := <-br.deviceMessageChan:
|
||
|
if msg.typ == MSG_TYPE_SESSION_CREATE {
|
||
|
if s, ok := br.sessions[msg.sid]; ok {
|
||
|
|
||
|
atomic.StoreUint32(&s.confirmed, 1)
|
||
|
|
||
|
// u := s.user.(*user)
|
||
|
// u.sid = msg.sid
|
||
|
|
||
|
// userLoginAck(loginErrorNone, s.user)
|
||
|
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if msg.typ == MSG_TYPE_SESSION_DATA {
|
||
|
if s, ok := br.sessions[msg.sid]; ok {
|
||
|
s.user.WriteMsg(websocket.BinaryMessage, msg.data)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// case msg := <-br.userMessageChan:
|
||
|
// if s, ok := br.sessions[msg.sid]; ok {
|
||
|
// if dev, ok := br.devices[s.dev.DeviceID()]; ok {
|
||
|
// data := msg.data
|
||
|
|
||
|
// if msg.typ == websocket.BinaryMessage {
|
||
|
// dev.WriteMsg(MSG_TYPE_SESSION_DATA, append([]byte(msg.sid), data[1:]...))
|
||
|
// }
|
||
|
// }
|
||
|
// }
|
||
|
}
|
||
|
}
|
||
|
}
|