package main import ( "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/rs/zerolog/log" ) type Session struct { dev Client user Client confirmed uint32 } type Broker struct { cfg *Config devices map[string]Client sessions map[string]*Session register chan Client unregister chan Client deviceMessageChan chan deviceMessage // userMessageChan chan userMessage } func newBroker(cfg *Config) *Broker { return &Broker{ cfg: cfg, devices: make(map[string]Client), sessions: make(map[string]*Session), register: make(chan Client, 1000), unregister: make(chan Client, 1000), deviceMessageChan: make(chan deviceMessage, 1000), // userMessageChan: make(chan userMessage, 1000), } } func (br *Broker) run() { for { select { case c := <-br.register: devid := c.DeviceID() if c.DeviceCategory() == "device" { dev := c.(*Device) err := byte(0) msg := "OK" if _, ok := br.devices[devid]; ok { log.Error().Msg("Device ID conflicting: " + devid) msg = "ID conflicting" err = 1 } else { dev.registered = true br.devices[devid] = c log.Info().Msgf("Device '%s' registered", devid) } c.WriteMsg(MSG_TYPE_REGISTER, append([]byte{err}, msg...)) if err > 0 { // ensure the last packet was sent time.AfterFunc(time.Millisecond*100, func() { dev.Close() }) } } else { if dev, ok := br.devices[devid]; ok { sid := GenUniqueID("sid") s := &Session{ dev: dev, user: c, } time.AfterFunc(time.Second*3, func() { if atomic.LoadUint32(&s.confirmed) == 0 { c.Close() } }) br.sessions[sid] = s dev.WriteMsg(MSG_TYPE_SESSION_CREATE, []byte(sid)) log.Info().Msg("New session: " + sid) } else { log.Error().Msgf("Not found the device '%s'", devid) } } case c := <-br.unregister: devid := c.DeviceID() c.Close() if c.DeviceCategory() == "device" { if !c.(*Device).registered { break } delete(br.devices, devid) for sid, s := range br.sessions { if s.dev == c { s.user.Close() delete(br.sessions, sid) log.Info().Msg("Delete session: " + sid) } } log.Info().Msgf("Device '%s' unregistered", devid) } else { sid := c.(*Device).DeviceID() if _, ok := br.sessions[sid]; ok { delete(br.sessions, sid) if dev, ok := br.devices[devid]; ok { dev.WriteMsg(MSG_TYPE_SESSION_DESTORY, []byte(sid)) } log.Info().Msg("Delete session: " + sid) } } case msg := <-br.deviceMessageChan: if msg.typ == MSG_TYPE_SESSION_CREATE { if s, ok := br.sessions[msg.sid]; ok { atomic.StoreUint32(&s.confirmed, 1) // u := s.user.(*user) // u.sid = msg.sid // userLoginAck(loginErrorNone, s.user) } } if msg.typ == MSG_TYPE_SESSION_DATA { if s, ok := br.sessions[msg.sid]; ok { s.user.WriteMsg(websocket.BinaryMessage, msg.data) } } // case msg := <-br.userMessageChan: // if s, ok := br.sessions[msg.sid]; ok { // if dev, ok := br.devices[s.dev.DeviceID()]; ok { // data := msg.data // if msg.typ == websocket.BinaryMessage { // dev.WriteMsg(MSG_TYPE_SESSION_DATA, append([]byte(msg.sid), data[1:]...)) // } // } // } } } }