Skip to content

Commit

Permalink
Retained messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Jan 26, 2017
1 parent 691d45f commit 16269e6
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ run-tests: kill-containers run-containers
@make run-test coverage
@make kill-containers

run-test:
run-test unit:
@ginkgo -r --cover .

test-coverage-run:
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ There is a small gotcha to sending hierarchical topics, the slashes must be esca

Sends the MQTT message `{"message":"hello","number":1}` to the topic `topic`

### Retained Messages

Arkadiko supports sending retained messages. In order to specify that the message being published should be retained you just need to send a querystring parameter of `retained=true`, like:

`echo '{"message": "hello", "number": 1}' | curl -d @- localhost:8890/sendmqtt/topic?retained=true`

Any other value passed to retained (`retained=false, retained=else, retained=`) will be treated as false.

### Testing

Run `make test`
Expand Down
14 changes: 12 additions & 2 deletions api/sendmqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func SendMqttHandler(app *App) func(c echo.Context) error {
zap.String("handler", "SendMqttHandler"),
)

retainedValue := c.QueryParam("retained")
retained := true
if retainedValue != "true" {
retained = false
}

body := c.Request().Body
b, err := ioutil.ReadAll(body)
if err != nil {
Expand Down Expand Up @@ -54,7 +60,7 @@ func SendMqttHandler(app *App) func(c echo.Context) error {
return FailWith(400, err.Error(), c)
}
lg = lg.With(zap.String("topic", topic), zap.String("payload", string(b)))
workingString := fmt.Sprintf(`{"topic": "%s", "payload": %v}`, topic, string(b))
workingString := fmt.Sprintf(`{"topic": "%s", "retained": %t, "payload": %v}`, topic, retained, string(b))

log.I(lg, "sending message on topic")

Expand All @@ -63,7 +69,11 @@ func SendMqttHandler(app *App) func(c echo.Context) error {

err = WithSegment("mqtt", c, func() error {
beforeMqttTime = time.Now()
err = app.MqttClient.SendMessage(topic, string(b))
if retained {
err = app.MqttClient.SendRetainedMessage(topic, string(b))
} else {
err = app.MqttClient.SendMessage(topic, string(b))
}
afterMqttTime = time.Now()
return err
})
Expand Down
112 changes: 87 additions & 25 deletions api/sendmqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,112 @@ package api_test

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
uuid "github.com/satori/go.uuid"
"github.com/topfreegames/arkadiko/api"
. "github.com/topfreegames/arkadiko/testing"
)

var _ = Describe("Send to MQTT Handler", func() {
Describe("Specs", func() {
It("Should respond with 200 for a valid message", func() {
a := GetDefaultTestApp()
testJSON := map[string]interface{}{
"message": "hello",
}
response := `{"topic": "test", "payload": {"message":"hello"}}`
status, body := PostJSON(a, "/sendmqtt/test", testJSON)
Describe("Regular Message", func() {
It("Should respond with 200 for a valid message", func() {
a := GetDefaultTestApp()
testJSON := map[string]interface{}{
"message": "hello",
}
response := `{"topic": "test", "retained": false, "payload": {"message":"hello"}}`
status, body := PostJSON(a, "/sendmqtt/test", testJSON)

Expect(status).To(Equal(http.StatusOK))
Expect(body).To(Equal(response))
})
Expect(status).To(Equal(http.StatusOK))
Expect(body).To(Equal(response))
})

It("Should respond with 200 for a valid message with hierarchical topic", func() {
a := GetDefaultTestApp()
testJSON := map[string]interface{}{
"message": "hello",
}
response := `{"topic": "test/topic", "payload": {"message":"hello"}}`
url := "/sendmqtt/test/topic"
status, body := PostJSON(a, url, testJSON)
It("Should respond with 200 for a valid message with hierarchical topic", func() {
a := GetDefaultTestApp()
testJSON := map[string]interface{}{
"message": "hello",
}
response := `{"topic": "test/topic", "retained": false, "payload": {"message":"hello"}}`
url := "/sendmqtt/test/topic"
status, body := PostJSON(a, url, testJSON)

Expect(status).To(Equal(http.StatusOK))
Expect(body).To(Equal(response))
})

Expect(status).To(Equal(http.StatusOK))
Expect(body).To(Equal(response))
It("Should respond with 400 if malformed JSON", func() {
a := GetDefaultTestApp()
testJSON := `{"message": "hello"}}`
status, _ := PostBody(a, "/sendmqtt/test/topic", testJSON)

Expect(status).To(Equal(400))
})
})
Describe("Retained Message", func() {
It("Should respond with 200 for a valid message", func() {
a := GetDefaultTestApp()
client := a.MqttClient
testJSON := map[string]interface{}{
"message": "hello",
}
topic := uuid.NewV4().String()
expectedMsg := `{"message":"hello"}`
response := fmt.Sprintf(
`{"topic": "%s", "retained": true, "payload": %s}`,
topic,
expectedMsg,
)
url := fmt.Sprintf("/sendmqtt/%s?retained=true", topic)
status, body := PostJSON(a, url, testJSON)

Expect(status).To(Equal(http.StatusOK))
Expect(body).To(Equal(response))

var msg mqtt.Message
var onMessageHandler = func(client mqtt.Client, message mqtt.Message) {
msg = message
}
client.MqttClient.Subscribe(topic, 2, onMessageHandler)

//Have to wait so the goroutine can call our handler
time.Sleep(50 * time.Millisecond)

It("Should respond with 400 if malformed JSON", func() {
a := GetDefaultTestApp()
testJSON := `{"message": "hello"}}`
status, _ := PostBody(a, "/sendmqtt/test/topic", testJSON)
Expect(msg).NotTo(BeNil())
Expect(msg.Retained()).To(BeTrue())
Expect(string(msg.Payload())).To(Equal(expectedMsg))
})

Expect(status).To(Equal(400))
It("Should respond with 200 for a valid message with hierarchical topic", func() {
a := GetDefaultTestApp()
testJSON := map[string]interface{}{
"message": "hello",
}
response := `{"topic": "test/topic", "retained": true, "payload": {"message":"hello"}}`
url := "/sendmqtt/test/topic?retained=true"
status, body := PostJSON(a, url, testJSON)

Expect(status).To(Equal(http.StatusOK))
Expect(body).To(Equal(response))
})

It("Should respond with 400 if malformed JSON", func() {
a := GetDefaultTestApp()
testJSON := `{"message": "hello"}}`
status, _ := PostBody(a, "/sendmqtt/test/topic?retained=true", testJSON)

Expect(status).To(Equal(400))
})
})
})

Describe("Perf", func() {
HTTPMeasure("send to MQTT", func(data map[string]interface{}) {
testJSON := map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion api/unauthorizeuser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var _ = Describe("Unauthorize Handler", func() {
app := data["app"].(*api.App)
status, body := PostBody(app, "/unauthorize_user", payload)
Expect(status).To(Equal(http.StatusOK), string(body))
}, 0.01)
}, 0.05)
})

})
37 changes: 31 additions & 6 deletions mqttclient/mqttclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,38 @@ func GetMqttClient(configPath string, onConnectHandler mqtt.OnConnectHandler, l

// SendMessage sends the message with the given payload to topic
func (mc *MqttClient) SendMessage(topic string, message string) error {
if token := mc.MqttClient.Publish(topic, 2, false, message); token.Wait() && token.Error() != nil {
return mc.publishMessage(topic, message, false)
}

// SendRetainedMessage sends the message with the given payload to topic
func (mc *MqttClient) SendRetainedMessage(topic string, message string) error {
return mc.publishMessage(topic, message, true)
}

func (mc *MqttClient) publishMessage(topic string, message string, retained bool) error {
if token := mc.MqttClient.Publish(topic, 2, retained, message); token.Wait() && token.Error() != nil {
mc.Logger.Error(fmt.Sprintf("%v", token.Error()))
return token.Error()
}
return nil
}

//WaitForConnection to mqtt server
func (mc *MqttClient) WaitForConnection(timeout int) error {
start := time.Now()
timedOut := func() bool {
return time.Now().Sub(start) > time.Duration(timeout)*time.Millisecond
}
for !mc.MqttClient.IsConnected() || timedOut() {
time.Sleep(1 * time.Millisecond)
}

if timedOut() {
return fmt.Errorf("Connection to MQTT timed out.")
}
return nil
}

func (mc *MqttClient) configure(l zap.Logger) {
mc.Logger = l

Expand Down Expand Up @@ -94,17 +119,17 @@ func (mc *MqttClient) start(onConnectHandler mqtt.OnConnectHandler) {
mc.Logger.Info("Initializing mqtt client", zap.String("host", mc.MqttServerHost),
zap.Int("port", mc.MqttServerPort), zap.String("ca_cert_file", mc.Config.GetString("mqttserver.ca_cert_file")))

useTls := mc.Config.GetBool("mqttserver.usetls")
useTLS := mc.Config.GetBool("mqttserver.usetls")

protocol := "tcp"
if useTls {
if useTLS {
protocol = "ssl"
}

clientId := fmt.Sprintf("arkadiko-%s", uuid.NewV4().String())
opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("%s://%s:%d", protocol, mc.MqttServerHost, mc.MqttServerPort)).SetClientID(clientId)
clientID := fmt.Sprintf("arkadiko-%s", uuid.NewV4().String())
opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("%s://%s:%d", protocol, mc.MqttServerHost, mc.MqttServerPort)).SetClientID(clientID)

if useTls {
if useTLS {
mc.Logger.Info("using tls", zap.Bool("insecure_skip_verify", mc.Config.GetBool("mqttserver.insecure_tls")))
certpool := x509.NewCertPool()
if mc.Config.GetString("mqttserver.ca_cert_file") != "" {
Expand Down
40 changes: 36 additions & 4 deletions mqttclient/mqttclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/eclipse/paho.mqtt.golang"
uuid "github.com/satori/go.uuid"
"github.com/topfreegames/arkadiko/mqttclient"
"github.com/uber-go/zap"

Expand All @@ -36,12 +37,43 @@ var _ = Describe("MQTT Client", func() {
mc := mqttclient.GetMqttClient("../config/test.yml", onConnectHandler, logger)

Expect(mc.ConfigPath).To(Equal("../config/test.yml"))
for !connected {
time.Sleep(100 * time.Millisecond)
}
err := mc.SendMessage("test", `{"message": "hello"}`)

err := mc.WaitForConnection(100)
Expect(err).NotTo(HaveOccurred())

err = mc.SendMessage("test", `{"message": "hello"}`)
Expect(err).To(BeNil())
})

It("It should send retained message", func() {
mc := mqttclient.GetMqttClient("../config/test.yml", nil, logger)

Expect(mc.ConfigPath).To(Equal("../config/test.yml"))

err := mc.WaitForConnection(100)
Expect(err).NotTo(HaveOccurred())

topic := uuid.NewV4().String()
expectedMsg := `{"message": "hello"}`

err = mc.SendRetainedMessage(topic, expectedMsg)
Expect(err).NotTo(HaveOccurred())
//TODO: REALLY need to wait 50ms?
time.Sleep(50 * time.Millisecond)

var msg mqtt.Message
var onMessageHandler = func(client mqtt.Client, message mqtt.Message) {
msg = message
}
mc.MqttClient.Subscribe(topic, 2, onMessageHandler)

//Have to wait so the goroutine can call our handler
time.Sleep(1 * time.Millisecond)

Expect(msg).NotTo(BeNil())
Expect(msg.Retained()).To(BeTrue())
Expect(string(msg.Payload())).To(Equal(expectedMsg))
})
})

Describe("Perf", func() {
Expand Down

0 comments on commit 16269e6

Please sign in to comment.