-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.go
88 lines (81 loc) · 2.13 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main
import (
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func DefineSourceMqttClient(
broker string,
port string,
clientId string,
username string,
password string,
caCert string,
clientCert string,
clientKey string,
targetClient mqtt.Client,
targetTopic string,
) mqtt.Client {
opts := buildClientOptions(broker, port, clientId, username, password, caCert, clientCert, clientKey)
opts.AddBroker(fmt.Sprintf("ssl://%s:%s", broker, port))
opts.SetDefaultPublishHandler(PublishToTargetBrokerOnMessageReceived(targetClient, targetTopic))
return mqtt.NewClient(opts)
}
func DefineTargetMqttClient(
broker string,
port string,
clientId string,
username string,
password string,
caCert string,
clientCert string,
clientKey string,
) mqtt.Client {
opts := buildClientOptions(broker, port, clientId, username, password, caCert, clientCert, clientKey)
return mqtt.NewClient(opts)
}
func ConnectClient(client mqtt.Client) {
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Printf("Connect lost: %v", token.Error())
fmt.Println("Reconnecting...")
for {
token := client.Connect()
if token.Wait() && token.Error() == nil {
fmt.Println("Successfully reconnected")
break
} else {
fmt.Printf("Failed to reconnect: %v. Retrying in 5 seconds...\n", token.Error())
time.Sleep(5 * time.Second)
}
}
}
}
func Subscribe(client mqtt.Client, topic string) {
token := client.Subscribe(topic, 1, nil)
if token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
fmt.Println("Subscribed to topic: %s", topic)
}
func buildClientOptions(
broker string,
port string,
clientId string,
username string,
password string,
caCert string,
clientCert string,
clientKey string,
) *mqtt.ClientOptions {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("ssl://%s:%s", broker, port))
tlsConfig := NewTlsConfig(caCert, clientCert, clientKey)
opts.SetTLSConfig(tlsConfig)
opts.SetClientID(clientId)
opts.SetUsername(username)
opts.SetPassword(password)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
return opts
}