Skip to content

Commit

Permalink
add clientID in log for debug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyuyan committed Nov 2, 2017
1 parent 208a7cf commit 6a89b62
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 32 deletions.
7 changes: 4 additions & 3 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
connack.SessionPresent = msg.CleanSession
err = connack.Write(conn)
if err != nil {
log.Error("send connack error, ", err)
log.Error("send connack error, ", err, " clientID = ", msg.ClientIdentifier)
return
}

Expand Down Expand Up @@ -295,10 +295,11 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
c.mp = msgPool
old, exist = b.clients.Load(cid)
if exist {
log.Warn("client exist, close old...")
log.Warn("client exist, close old...", " clientID = ", c.info.clientID)
ol, ok := old.(*client)
if ok {
ol.Close()
msg := &Message{client: c, packet: DisconnectdPacket}
ol.mp.queue <- msg
}
}
b.clients.Store(cid, c)
Expand Down
46 changes: 17 additions & 29 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *client) readLoop() {
}
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
log.Error("read packet error: ", err, " clientID = ", c.info.clientID)
break
}
// log.Info("recv buf: ", packet)
Expand All @@ -124,45 +124,33 @@ func ProcessMessage(msg *Message) {
if ca == nil {
return
}
log.Debug("Recv message: ", ca.String(), " clientID = ", c.info.clientID)
switch ca.(type) {
case *packets.ConnackPacket:
// log.Info("Recv conack message..........")

case *packets.ConnectPacket:
// log.Info("Recv connect message..........")
case *packets.PublishPacket:
// log.Info("Recv publish message..........")
packet := ca.(*packets.PublishPacket)
c.ProcessPublish(packet)
case *packets.PubackPacket:
//log.Info("Recv publish ack message..........")
case *packets.PubrecPacket:
//log.Info("Recv publish rec message..........")
case *packets.PubrelPacket:
//log.Info("Recv publish rel message..........")
case *packets.PubcompPacket:
//log.Info("Recv publish ack message..........")
case *packets.SubscribePacket:
// log.Info("Recv subscribe message.....")
packet := ca.(*packets.SubscribePacket)
c.ProcessSubscribe(packet)
case *packets.SubackPacket:
// log.Info("Recv suback message.....")
case *packets.UnsubscribePacket:
// log.Info("Recv unsubscribe message.....")
packet := ca.(*packets.UnsubscribePacket)
c.ProcessUnSubscribe(packet)
case *packets.UnsubackPacket:
//log.Info("Recv unsuback message.....")
case *packets.PingreqPacket:
// log.Info("Recv PINGREQ message..........")
c.ProcessPing()
case *packets.PingrespPacket:
//log.Info("Recv PINGRESP message..........")
case *packets.DisconnectPacket:
// log.Info("Recv DISCONNECT message.......")
c.Close()
default:
log.Info("Recv Unknow message.......")
log.Info("Recv Unknow message.......", " clientID = ", c.info.clientID)
}
}

Expand All @@ -173,7 +161,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {

topic := packet.TopicName
if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", topic)
log.Error("Pub Topics Auth failed, ", topic, " clientID = ", c.info.clientID)
return
}

Expand All @@ -184,21 +172,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil {
log.Error("send puback error, ", err)
log.Error("send puback error, ", err, " clientID = ", c.info.clientID)
return
}
c.ProcessPublishMessage(packet)
case QosExactlyOnce:
return
default:
log.Error("publish with unknown qos")
log.Error("publish with unknown qos", " clientID = ", c.info.clientID)
return
}
if packet.Retain {
if b := c.broker; b != nil {
err := b.rl.Insert(topic, packet)
if err != nil {
log.Error("Insert Retain Message error: ", err)
log.Error("Insert Retain Message error: ", err, " clientID = ", c.info.clientID)
}
}
}
Expand Down Expand Up @@ -232,7 +220,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", err)
log.Error("process message for psub error, ", err, " clientID = ", c.info.clientID)
}
}
}
Expand All @@ -258,7 +246,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("send publish error, ", err)
log.Error("send publish error, ", err, " clientID = ", c.info.clientID)
}
}

Expand Down Expand Up @@ -312,7 +300,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
t := topic
//check topic auth for client
if !c.CheckTopicAuth(SUB, topic) {
log.Error("Sub topic Auth failed: ", topic)
log.Error("Sub topic Auth failed: ", topic, " clientID = ", c.info.clientID)
retcodes = append(retcodes, QosFailure)
continue
}
Expand Down Expand Up @@ -359,7 +347,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
}
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err)
log.Error("Insert subscription error: ", err, " clientID = ", c.info.clientID)
retcodes = append(retcodes, QosFailure)
} else {
retcodes = append(retcodes, qoss[i])
Expand All @@ -369,7 +357,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {

err := c.WriterPacket(suback)
if err != nil {
log.Error("send suback error, ", err)
log.Error("send suback error, ", err, " clientID = ", c.info.clientID)
return
}
//broadcast subscribe message
Expand All @@ -381,7 +369,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
for _, t := range topics {
packets := b.rl.Match(t)
for _, packet := range packets {
log.Info("process retain message: ", packet)
log.Info("process retain message: ", packet, " clientID = ", c.info.clientID)
if packet != nil {
c.WriterPacket(packet)
}
Expand Down Expand Up @@ -432,7 +420,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {

err := c.WriterPacket(unsuback)
if err != nil {
log.Error("send unsuback error, ", err)
log.Error("send unsuback error, ", err, " clientID = ", c.info.clientID)
return
}
// //process ubsubscribe message
Expand Down Expand Up @@ -461,7 +449,7 @@ func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp)
if err != nil {
log.Error("send PingResponse error, ", err)
log.Error("send PingResponse error, ", err, " clientID = ", c.info.clientID)
return
}
}
Expand Down Expand Up @@ -489,7 +477,7 @@ func (c *client) Close() {
for _, sub := range subs {
err := b.sl.Remove(sub)
if err != nil {
log.Error("closed client but remove sublist error, ", err)
log.Error("closed client but remove sublist error, ", err, " clientID = ", c.info.clientID)
}
}
if c.typ == CLIENT {
Expand Down

0 comments on commit 6a89b62

Please sign in to comment.