// Source: goproxy/server/broker.go (160 lines, 3.4 KiB, Go)
// 2024-01-09 13:00:48 +00:00

package main
import (
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
)
// Session pairs a user connection with the device it is attached to.
type Session struct {
	// dev is the device side of the session, user is the client that
	// requested it.
	dev, user Client

	// confirmed is 0 when the session is created and is set to 1 once the
	// device answers with MSG_TYPE_SESSION_CREATE. It is read from a timer
	// goroutine, so it must only be accessed through sync/atomic.
	confirmed uint32
}
// Broker holds the state shared by all connections: the table of registered
// devices, the table of live sessions, and the channels through which
// connection handlers hand events to the run loop.
type Broker struct {
	cfg *Config

	// devices maps a device ID to its registered connection.
	devices map[string]Client
	// sessions maps a session ID to the user/device pair it links.
	sessions map[string]*Session

	// register and unregister carry clients that are joining or leaving.
	register, unregister chan Client
	// deviceMessageChan carries session messages coming from devices.
	deviceMessageChan chan deviceMessage
	// userMessageChan chan userMessage
}
// newBroker returns a Broker bound to cfg with empty device and session
// tables and buffered event channels. The caller is expected to start run
// to begin processing events.
func newBroker(cfg *Config) *Broker {
	// All event queues share the same buffer size.
	const backlog = 1000

	br := new(Broker)
	br.cfg = cfg
	br.devices = make(map[string]Client)
	br.sessions = make(map[string]*Session)
	br.register = make(chan Client, backlog)
	br.unregister = make(chan Client, backlog)
	br.deviceMessageChan = make(chan deviceMessage, backlog)
	// br.userMessageChan = make(chan userMessage, backlog)
	return br
}
// run is the broker's event loop. It never returns: it blocks on the
// register, unregister and deviceMessageChan channels and handles one event
// at a time.
//
//   - register: a client whose category is "device" is added to the device
//     table (or rejected when its ID is already taken); any other client is
//     treated as a user and gets a new session with the device it names.
//   - unregister: the client is closed; a device takes all of its sessions
//     down with it, a user only its own session(s).
//   - deviceMessageChan: a device either confirms a session or sends data
//     that is forwarded to the session's user.
//
// Within this file the devices and sessions maps are only accessed here,
// which is why they are used without a lock.
func (br *Broker) run() {
	for {
		select {
		case c := <-br.register:
			devid := c.DeviceID()

			if c.DeviceCategory() == "device" {
				dev := c.(*Device)

				// The reply is one status byte (0 = accepted, 1 = rejected)
				// followed by a short text message.
				status := byte(0)
				msg := "OK"

				if _, taken := br.devices[devid]; taken {
					log.Error().Msg("Device ID conflicting: " + devid)
					msg = "ID conflicting"
					status = 1
				} else {
					dev.registered = true
					br.devices[devid] = c
					log.Info().Msgf("Device '%s' registered", devid)
				}

				c.WriteMsg(MSG_TYPE_REGISTER, append([]byte{status}, msg...))

				if status > 0 {
					// Delay the close so the rejection reply can go out first.
					time.AfterFunc(100*time.Millisecond, func() {
						dev.Close()
					})
				}
			} else {
				dev, ok := br.devices[devid]
				if !ok {
					// NOTE(review): the user connection is left open here;
					// confirm whether it should be closed or told why.
					log.Error().Msgf("Not found the device '%s'", devid)
					break // leaves the select, not the for loop
				}

				sid := GenUniqueID("sid")
				s := &Session{dev: dev, user: c}

				// Drop the user if the device has not confirmed the session
				// within three seconds.
				time.AfterFunc(3*time.Second, func() {
					if atomic.LoadUint32(&s.confirmed) == 0 {
						c.Close()
					}
				})

				br.sessions[sid] = s
				dev.WriteMsg(MSG_TYPE_SESSION_CREATE, []byte(sid))
				log.Info().Msg("New session: " + sid)
			}

		case c := <-br.unregister:
			devid := c.DeviceID()
			c.Close()

			if c.DeviceCategory() == "device" {
				if !c.(*Device).registered {
					// Rejected at registration, so it never entered the
					// device table and owns no sessions.
					break // leaves the select, not the for loop
				}

				delete(br.devices, devid)

				// Deleting map entries while ranging over the map is safe in Go.
				for sid, s := range br.sessions {
					if s.dev == c {
						s.user.Close()
						delete(br.sessions, sid)
						log.Info().Msg("Delete session: " + sid)
					}
				}

				log.Info().Msgf("Device '%s' unregistered", devid)
			} else {
				// Sessions are keyed by a generated session ID, so the
				// user's session has to be found by identity. The previous
				// code asserted the client to *Device and used its device ID
				// as the key, which cannot match a session ID and panics for
				// any client that is not a *Device.
				for sid, s := range br.sessions {
					if s.user != c {
						continue
					}
					delete(br.sessions, sid)
					if dev, ok := br.devices[devid]; ok {
						dev.WriteMsg(MSG_TYPE_SESSION_DESTORY, []byte(sid))
					}
					log.Info().Msg("Delete session: " + sid)
				}
			}

		case msg := <-br.deviceMessageChan:
			s, ok := br.sessions[msg.sid]
			if !ok {
				// Message for an unknown or already closed session: drop it.
				break
			}

			switch msg.typ {
			case MSG_TYPE_SESSION_CREATE:
				// The device acknowledged the session; this disarms the
				// three-second timer started at session creation.
				atomic.StoreUint32(&s.confirmed, 1)
				// u := s.user.(*user)
				// u.sid = msg.sid
				// userLoginAck(loginErrorNone, s.user)
			case MSG_TYPE_SESSION_DATA:
				s.user.WriteMsg(websocket.BinaryMessage, msg.data)
			}

			// case msg := <-br.userMessageChan:
			// 	if s, ok := br.sessions[msg.sid]; ok {
			// 		if dev, ok := br.devices[s.dev.DeviceID()]; ok {
			// 			data := msg.data
			// 			if msg.typ == websocket.BinaryMessage {
			// 				dev.WriteMsg(MSG_TYPE_SESSION_DATA, append([]byte(msg.sid), data[1:]...))
			// 			}
			// 		}
			// 	}
		}
	}
}