forked from absmach/mgate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple.go
95 lines (78 loc) · 2.71 KB
/
simple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package simple
import (
"context"
"errors"
"log/slog"
"github.com/absmach/mgate/pkg/session"
)
var errSessionMissing = errors.New("session is missing")
var _ session.Handler = (*Handler)(nil)
// Handler implements mqtt.Handler interface
type Handler struct {
logger *slog.Logger
}
// New creates new Event entity
func New(logger *slog.Logger) *Handler {
return &Handler{
logger: logger,
}
}
// AuthConnect is called on device connection,
// prior forwarding to the MQTT broker
func (h *Handler) AuthConnect(ctx context.Context) error {
return h.logAction(ctx, "AuthConnect", nil, nil)
}
// AuthPublish is called on device publish,
// prior forwarding to the MQTT broker
func (h *Handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error {
return h.logAction(ctx, "AuthPublish", &[]string{*topic}, payload)
}
// AuthSubscribe is called on device publish,
// prior forwarding to the MQTT broker
func (h *Handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
return h.logAction(ctx, "AuthSubscribe", topics, nil)
}
// Connect - after client successfully connected
func (h *Handler) Connect(ctx context.Context) error {
return h.logAction(ctx, "Connect", nil, nil)
}
// Publish - after client successfully published
func (h *Handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
return h.logAction(ctx, "Publish", &[]string{*topic}, payload)
}
// Subscribe - after client successfully subscribed
func (h *Handler) Subscribe(ctx context.Context, topics *[]string) error {
return h.logAction(ctx, "Subscribe", topics, nil)
}
// Unsubscribe - after client unsubscribed
func (h *Handler) Unsubscribe(ctx context.Context, topics *[]string) error {
return h.logAction(ctx, "Unsubscribe", topics, nil)
}
// Disconnect on connection lost
func (h *Handler) Disconnect(ctx context.Context) error {
return h.logAction(ctx, "Disconnect", nil, nil)
}
func (h *Handler) logAction(ctx context.Context, action string, topics *[]string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
args := []interface{}{
slog.Group("session", slog.String("id", s.ID), slog.String("username", s.Username)),
}
if s.Cert.Subject.CommonName != "" {
args = append(args, slog.Group("cert", slog.String("cn", s.Cert.Subject.CommonName)))
}
if topics != nil {
args = append(args, slog.Any("topics", *topics))
}
if payload != nil {
args = append(args, slog.Any("payload", *payload))
}
if !ok {
args = append(args, slog.Any("error", errSessionMissing))
h.logger.Error(action+"() failed to complete", args...)
return errSessionMissing
}
h.logger.Info(action+"() completed successfully", args...)
return nil
}