diff --git a/broker/broker.go b/broker/broker.go index cf772c9..8d51929 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,13 +2,13 @@ package broker import ( "crypto/tls" + encJson "encoding/json" "errors" "fmt" "net" "net/http" "sync" "time" - encJson "encoding/json" "github.com/fhmq/hmq/broker/lib/sessions" "github.com/fhmq/hmq/broker/lib/topics" @@ -205,8 +205,8 @@ func (b *Broker) StartWebsocketListening() { func (b *Broker) wsHandler(ws *websocket.Conn) { // io.Copy(ws, ws) ws.PayloadType = websocket.BinaryFrame - err:=b.handleConnection(CLIENT, ws) - if err!=nil{ + err := b.handleConnection(CLIENT, ws) + if err != nil { ws.Close() } } @@ -259,9 +259,9 @@ func (b *Broker) StartClientListening(Tls bool) { } tmpDelay = ACCEPT_MIN_SLEEP - go func(){ - err :=b.handleConnection(CLIENT, conn) - if err!=nil{ + go func() { + err := b.handleConnection(CLIENT, conn) + if err != nil { conn.Close() } }() @@ -301,9 +301,9 @@ func (b *Broker) StartClusterListening() { } tmpDelay = ACCEPT_MIN_SLEEP - go func(){ - err :=b.handleConnection(ROUTER, conn) - if err!=nil{ + go func() { + err := b.handleConnection(ROUTER, conn) + if err != nil { conn.Close() } }() @@ -322,11 +322,11 @@ func (b *Broker) DisConnClientByClientId(clientId string) { conn.Close() } -func (b *Broker) handleConnection(typ int, conn net.Conn) error{ +func (b *Broker) handleConnection(typ int, conn net.Conn) error { //process connect packet packet, err := packets.ReadPacket(conn) if err != nil { - return errors.New(fmt.Sprintln("read connect packet error:%v",err)) + return errors.New(fmt.Sprintf("read connect packet error:%v", err)) } if packet == nil { return errors.New("received nil packet") @@ -344,21 +344,21 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{ if connack.ReturnCode != packets.Accepted { if err := connack.Write(conn); err != nil { - return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) + return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn) } - return errors.New(fmt.Sprintln("connect packet validate failed with connack.ReturnCode%v",connack.ReturnCode)) + return fmt.Errorf("connect packet validate failed with connack.ReturnCode%v", connack.ReturnCode) } if typ == CLIENT && !b.CheckConnectAuth(msg.ClientIdentifier, msg.Username, string(msg.Password)) { connack.ReturnCode = packets.ErrRefusedNotAuthorised if err := connack.Write(conn); err != nil { - return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) + return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn) } - return errors.New(fmt.Sprintln("connect packet CheckConnectAuth failed with connack.ReturnCode%v",connack.ReturnCode)) + return fmt.Errorf("connect packet CheckConnectAuth failed with connack.ReturnCode%v", connack.ReturnCode) } if err := connack.Write(conn); err != nil { - return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) + return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn) } willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) @@ -389,7 +389,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{ c.init() if err := b.getSession(c, msg, connack); err != nil { - return errors.New(fmt.Sprintln("get session error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn)) + return fmt.Errorf("get session error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn) } cid := c.info.clientID @@ -413,13 +413,13 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{ pubPack.TopicName = info.willMsg.TopicName pubPack.Payload = info.willMsg.Payload } - + pubInfo := Info{ - ClientID: info.clientID, - Username: info.username, - Password: info.password, + ClientID: info.clientID, + Username: info.username, + Password: info.password, Keepalive: info.keepalive, - WillMsg: pubPack, + WillMsg: pubPack, } b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime) @@ -456,7 +456,7 @@ func (b *Broker) ConnectToDiscovery() { log.Error("Error trying to connect to route", zap.Error(err)) log.Debug("Connect to route timeout, retry...") - if 0 == tempDelay { + if tempDelay == 0 { tempDelay = 1 * time.Second } else { tempDelay *= 2 @@ -528,7 +528,7 @@ func (b *Broker) connectRouter(id, addr string) { log.Debug("Connect to route timeout, retry...") - if 0 == timeDelay { + if timeDelay == 0 { timeDelay = 1 * time.Second } else { timeDelay *= 2 @@ -714,11 +714,11 @@ func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string) { } type OnlineOfflineMsg struct { - ClientID string `json:"clientID"` - Online bool `json:"online"` - Timestamp string `json:"timestamp"` - ClientInfo Info `json:"info"` - LastMsgTime int64 `json:"lastMsg"` + ClientID string `json:"clientID"` + Online bool `json:"online"` + Timestamp string `json:"timestamp"` + ClientInfo Info `json:"info"` + LastMsgTime int64 `json:"lastMsg"` } func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64) { @@ -727,10 +727,10 @@ func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64 packet.Qos = 0 msg := OnlineOfflineMsg{ - ClientID: info.ClientID, - Online: online, - Timestamp: time.Now().UTC().Format(time.RFC3339), - ClientInfo: info, + ClientID: info.ClientID, + Online: online, + Timestamp: time.Now().UTC().Format(time.RFC3339), + ClientInfo: info, LastMsgTime: lastMsg, } diff --git a/broker/client.go b/broker/client.go index 223efc9..c04e4fb 100644 --- a/broker/client.go +++ b/broker/client.go @@ -79,7 +79,7 @@ type client struct { mqueue *queue.Queue retryTimer *time.Timer retryTimerLock sync.Mutex - lastMsgTime int64 + lastMsgTime int64 } type InflightStatus uint8 @@ -113,16 +113,16 @@ type info struct { } type PubPacket struct { - TopicName string `json:"topicName"` - Payload []byte `json:"payload"` + TopicName string `json:"topicName"` + Payload []byte `json:"payload"` } type Info struct { - ClientID string `json:"clientId"` - Username string `json:"username"` - Password []byte `json:"password"` - Keepalive uint16 `json:"keepalive"` - WillMsg PubPacket `json:"willMsg"` + ClientID string `json:"clientId"` + Username string `json:"username"` + Password []byte `json:"password"` + Keepalive uint16 `json:"keepalive"` + WillMsg PubPacket `json:"willMsg"` } type route struct { @@ -136,7 +136,7 @@ var ( ) func (c *client) init() { - c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged + c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged c.status = Connected c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String()) remoteAddr := c.conn.RemoteAddr() @@ -867,11 +867,11 @@ func (c *client) Close() { } pubInfo := Info{ - ClientID: c.info.clientID, - Username: c.info.username, - Password: c.info.password, + ClientID: c.info.clientID, + Username: c.info.username, + Password: c.info.password, Keepalive: c.info.keepalive, - WillMsg: pubPack, + WillMsg: pubPack, } //offline notification b.OnlineOfflineNotification(pubInfo, false, c.lastMsgTime)