diff --git a/hooks/storage/badger/badger.go b/hooks/storage/badger/badger.go index b6f95648..c18bb987 100644 --- a/hooks/storage/badger/badger.go +++ b/hooks/storage/badger/badger.go @@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { TopicName: pk.TopicName, Payload: pk.Payload, Created: pk.Created, + Client: cl.ID, Origin: pk.Origin, Properties: storage.MessageProperties{ PayloadFormat: props.PayloadFormat, @@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese in := &storage.Message{ ID: inflightKey(cl, pk), T: storage.InflightKey, + Client: cl.ID, Origin: pk.Origin, PacketID: pk.PacketID, FixedHeader: pk.FixedHeader, diff --git a/hooks/storage/bolt/bolt.go b/hooks/storage/bolt/bolt.go index cf9dae42..67366deb 100644 --- a/hooks/storage/bolt/bolt.go +++ b/hooks/storage/bolt/bolt.go @@ -260,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { TopicName: pk.TopicName, Payload: pk.Payload, Created: pk.Created, + Client: cl.ID, Origin: pk.Origin, Properties: storage.MessageProperties{ PayloadFormat: props.PayloadFormat, @@ -287,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese in := &storage.Message{ ID: inflightKey(cl, pk), T: storage.InflightKey, + Client: cl.ID, Origin: pk.Origin, FixedHeader: pk.FixedHeader, TopicName: pk.TopicName, diff --git a/hooks/storage/pebble/pebble.go b/hooks/storage/pebble/pebble.go index 9dbf2108..83b9622a 100644 --- a/hooks/storage/pebble/pebble.go +++ b/hooks/storage/pebble/pebble.go @@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { TopicName: pk.TopicName, Payload: pk.Payload, Created: pk.Created, + Client: cl.ID, Origin: pk.Origin, Properties: storage.MessageProperties{ PayloadFormat: props.PayloadFormat, @@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese in := &storage.Message{ ID: inflightKey(cl, pk), T: storage.InflightKey, + Client: cl.ID, Origin: pk.Origin, PacketID: pk.PacketID, FixedHeader: pk.FixedHeader, diff --git a/hooks/storage/redis/redis.go b/hooks/storage/redis/redis.go index 10246630..77153697 100644 --- a/hooks/storage/redis/redis.go +++ b/hooks/storage/redis/redis.go @@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { TopicName: pk.TopicName, Payload: pk.Payload, Created: pk.Created, + Client: cl.ID, Origin: pk.Origin, Properties: storage.MessageProperties{ PayloadFormat: props.PayloadFormat, @@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese in := &storage.Message{ ID: inflightKey(cl, pk), T: storage.InflightKey, + Client: cl.ID, Origin: pk.Origin, FixedHeader: pk.FixedHeader, TopicName: pk.TopicName, diff --git a/hooks/storage/storage.go b/hooks/storage/storage.go index c8b7d5df..4afda265 100644 --- a/hooks/storage/storage.go +++ b/hooks/storage/storage.go @@ -89,6 +89,7 @@ type Message struct { Payload []byte `json:"payload"` // the message payload (if retained) T string `json:"t,omitempty"` // the data type ID string `json:"id,omitempty" storm:"id"` // the storage key + Client string `json:"client,omitempty"` // the client id the message is for Origin string `json:"origin,omitempty"` // the id of the client who sent the message TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained) FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message diff --git a/server.go b/server.go index ffffc5ca..0c2edd79 100644 --- a/server.go +++ b/server.go @@ -1672,7 +1672,7 @@ func (s *Server) loadClients(v []storage.Client) { // loadInflight restores inflight messages from the datastore. func (s *Server) loadInflight(v []storage.Message) { for _, msg := range v { - if client, ok := s.Clients.Get(msg.Origin); ok { + if client, ok := s.Clients.Get(msg.Client); ok { client.State.Inflight.Set(msg.ToPacket()) } } diff --git a/server_test.go b/server_test.go index b1cbf66d..ab0830a4 100644 --- a/server_test.go +++ b/server_test.go @@ -3416,10 +3416,10 @@ func TestServerLoadInflightMessages(t *testing.T) { require.Equal(t, 3, s.Clients.Len()) v := []storage.Message{ - {Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"}, - {Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"}, - {Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"}, - {Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"}, + {Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"}, + {Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"}, + {Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"}, + {Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"}, } s.loadInflight(v)