diff --git a/broker/broker.go b/broker/broker.go index a7321c64..0571ade7 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -253,7 +253,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { connack.SessionPresent = msg.CleanSession err = connack.Write(conn) if err != nil { - log.Error("send connack error, ", err) + log.Error("send connack error, ", err, " clientID = ", msg.ClientIdentifier) return } @@ -295,10 +295,11 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { c.mp = msgPool old, exist = b.clients.Load(cid) if exist { - log.Warn("client exist, close old...") + log.Warn("client exist, close old...", " clientID = ", c.info.clientID) ol, ok := old.(*client) if ok { - ol.Close() + msg := &Message{client: c, packet: DisconnectdPacket} + ol.mp.queue <- msg } } b.clients.Store(cid, c) diff --git a/broker/client.go b/broker/client.go index c9966bbd..532f6043 100644 --- a/broker/client.go +++ b/broker/client.go @@ -102,7 +102,7 @@ func (c *client) readLoop() { } packet, err := packets.ReadPacket(nc) if err != nil { - log.Error("read packet error: ", err) + log.Error("read packet error: ", err, " clientID = ", c.info.clientID) break } // log.Info("recv buf: ", packet) @@ -124,45 +124,33 @@ func ProcessMessage(msg *Message) { if ca == nil { return } + log.Debug("Recv message: ", ca.String(), " clientID = ", c.info.clientID) switch ca.(type) { case *packets.ConnackPacket: - // log.Info("Recv conack message..........") + case *packets.ConnectPacket: - // log.Info("Recv connect message..........") case *packets.PublishPacket: - // log.Info("Recv publish message..........") packet := ca.(*packets.PublishPacket) c.ProcessPublish(packet) case *packets.PubackPacket: - //log.Info("Recv publish ack message..........") case *packets.PubrecPacket: - //log.Info("Recv publish rec message..........") case *packets.PubrelPacket: - //log.Info("Recv publish rel message..........") case *packets.PubcompPacket: - //log.Info("Recv publish ack message..........") case *packets.SubscribePacket: - // log.Info("Recv subscribe message.....") packet := ca.(*packets.SubscribePacket) c.ProcessSubscribe(packet) case *packets.SubackPacket: - // log.Info("Recv suback message.....") case *packets.UnsubscribePacket: - // log.Info("Recv unsubscribe message.....") packet := ca.(*packets.UnsubscribePacket) c.ProcessUnSubscribe(packet) case *packets.UnsubackPacket: - //log.Info("Recv unsuback message.....") case *packets.PingreqPacket: - // log.Info("Recv PINGREQ message..........") c.ProcessPing() case *packets.PingrespPacket: - //log.Info("Recv PINGRESP message..........") case *packets.DisconnectPacket: - // log.Info("Recv DISCONNECT message.......") c.Close() default: - log.Info("Recv Unknow message.......") + log.Info("Recv Unknow message.......", " clientID = ", c.info.clientID) } } @@ -173,7 +161,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { topic := packet.TopicName if !c.CheckTopicAuth(PUB, topic) { - log.Error("Pub Topics Auth failed, ", topic) + log.Error("Pub Topics Auth failed, ", topic, " clientID = ", c.info.clientID) return } @@ -184,21 +172,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback.MessageID = packet.MessageID if err := c.WriterPacket(puback); err != nil { - log.Error("send puback error, ", err) + log.Error("send puback error, ", err, " clientID = ", c.info.clientID) return } c.ProcessPublishMessage(packet) case QosExactlyOnce: return default: - log.Error("publish with unknown qos") + log.Error("publish with unknown qos", " clientID = ", c.info.clientID) return } if packet.Retain { if b := c.broker; b != nil { err := b.rl.Insert(topic, packet) if err != nil { - log.Error("Insert Retain Message error: ", err) + log.Error("Insert Retain Message error: ", err, " clientID = ", c.info.clientID) } } } @@ -232,7 +220,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("process message for psub error, ", err) + log.Error("process message for psub error, ", err, " clientID = ", c.info.clientID) } } } @@ -258,7 +246,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("send publish error, ", err) + log.Error("send publish error, ", err, " clientID = ", c.info.clientID) } } @@ -312,7 +300,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { t := topic //check topic auth for client if !c.CheckTopicAuth(SUB, topic) { - log.Error("Sub topic Auth failed: ", topic) + log.Error("Sub topic Auth failed: ", topic, " clientID = ", c.info.clientID) retcodes = append(retcodes, QosFailure) continue } @@ -359,7 +347,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { } err := b.sl.Insert(sub) if err != nil { - log.Error("Insert subscription error: ", err) + log.Error("Insert subscription error: ", err, " clientID = ", c.info.clientID) retcodes = append(retcodes, QosFailure) } else { retcodes = append(retcodes, qoss[i]) @@ -369,7 +357,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { err := c.WriterPacket(suback) if err != nil { - log.Error("send suback error, ", err) + log.Error("send suback error, ", err, " clientID = ", c.info.clientID) return } //broadcast subscribe message @@ -381,7 +369,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { for _, t := range topics { packets := b.rl.Match(t) for _, packet := range packets { - log.Info("process retain message: ", packet) + log.Info("process retain message: ", packet, " clientID = ", c.info.clientID) if packet != nil { c.WriterPacket(packet) } @@ -432,7 +420,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { err := c.WriterPacket(unsuback) if err != nil { - log.Error("send unsuback error, ", err) + log.Error("send unsuback error, ", err, " clientID = ", c.info.clientID) return } // //process ubsubscribe message @@ -461,7 +449,7 @@ func (c *client) ProcessPing() { resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) err := c.WriterPacket(resp) if err != nil { - log.Error("send PingResponse error, ", err) + log.Error("send PingResponse error, ", err, " clientID = ", c.info.clientID) return } } @@ -489,7 +477,7 @@ func (c *client) Close() { for _, sub := range subs { err := b.sl.Remove(sub) if err != nil { - log.Error("closed client but remove sublist error, ", err) + log.Error("closed client but remove sublist error, ", err, " clientID = ", c.info.clientID) } } if c.typ == CLIENT {