Skip to content

Commit

Permalink
dc/mqtt use shared lib, update envs accordingly
Browse files Browse the repository at this point in the history
  • Loading branch information
clezag committed Nov 19, 2024
1 parent 123b8f1 commit 42fc2e4
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 86 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/dc-mqtt-client-environment-a22.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ jobs:
tag: ${{ env.DOCKER_TAG }}
pullPolicy: "IfNotPresent"
env:
APP_MQTT_CLIENTID: airQuinoRawTestv2
APP_MQTT_TOPIC: AirQuino/RawData
APP_LOGLEVEL: INFO
APP_PROVIDER: a22/environment
MQTT_CLIENTID: airQuinoRawTestv2
MQTT_TOPIC: AirQuino/RawData
LOG_LEVEL: INFO
PROVIDER: a22/environment
envSecret:
APP_MQTT_PASS: ${{ secrets.MQTT_ENV_A22_PASS}}
APP_MQTT_URI: ${{ secrets.MQTT_ENV_A22_URI}}
MQTT_PASS: ${{ secrets.MQTT_ENV_A22_PASS}}
MQTT_URI: ${{ secrets.MQTT_ENV_A22_URI}}
EOF
# Merge yaml files https://mikefarah.gitbook.io/yq/operators/multiply-merge
Expand Down
14 changes: 7 additions & 7 deletions .github/workflows/dc-mqtt-client-parking-on-bz.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ jobs:
tag: ${{ env.DOCKER_TAG }}
pullPolicy: "IfNotPresent"
env:
APP_LOGLEVEL: INFO
APP_PROVIDER: parking-onstreet/bz-axians
APP_MQTT_URI: mqtts://xsona-broker.axians.it:8883
APP_MQTT_CLIENTID: NOI-testing
APP_MQTT_TOPIC: "application/6f599389-c6b0-4a76-843d-8d493192c4c0/device/+/event/up"
LOG_LEVEL: INFO
PROVIDER: parking-onstreet/bz-axians
MQTT_URI: mqtts://xsona-broker.axians.it:8883
MQTT_CLIENTID: NOI-testing
MQTT_TOPIC: "application/6f599389-c6b0-4a76-843d-8d493192c4c0/device/+/event/up"
envSecret:
APP_MQTT_USER: '${{ secrets.MQTT_PARKING_ON_BZ_USER }}'
APP_MQTT_PASS: '${{ secrets.MQTT_PARKING_ON_BZ_PASS }}'
MQTT_USER: '${{ secrets.MQTT_PARKING_ON_BZ_USER }}'
MQTT_PASS: '${{ secrets.MQTT_PARKING_ON_BZ_PASS }}'
EOF
# Merge yaml files https://mikefarah.gitbook.io/yq/operators/multiply-merge
Expand Down
20 changes: 10 additions & 10 deletions collectors/mqtt-client/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
#
# SPDX-License-Identifier: CC0-1.0

APP_RABBITMQ_URI=amqp://guest:guest@rabbitmq
APP_RABBITMQ_EXCHANGE=ingress
APP_RABBITMQ_CLIENTNAME=dc-mqtt-client-dev
APP_MQTT_USER=
APP_MQTT_PASS=
APP_MQTT_URI=
APP_MQTT_CLIENTID=airQuinoLinearizationDev
APP_MQTT_TOPIC=AirQuino/RawData
APP_LOGLEVEL=INFO
APP_PROVIDER=test/mqtt
MQ_URI=amqp://guest:guest@rabbitmq
MQ_EXCHANGE=ingress
MQ_CLIENT=dc-mqtt-client-dev
MQTT_USER=
MQTT_PASS=
MQTT_URI=
MQTT_CLIENTID=airQuinoLinearizationDev
MQTT_TOPIC=AirQuino/RawData
LOG_LEVEL=INFO
PROVIDER=test/mqtt
20 changes: 10 additions & 10 deletions collectors/mqtt-client/infrastructure/helm/env-a22.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ image:
tag: "latest" # Set this when upgrading chart with --set-value

env:
APP_RABBITMQ_EXCHANGE: ingress
APP_RABBITMQ_CLIENTNAME: dc-mqtt-client-env-a22
APP_MQTT_USER: airQuino
APP_MQTT_CLIENTID: airQuinoRaw
APP_MQTT_TOPIC: AirQuino/RawData
APP_LOGLEVEL: INFO
APP_PROVIDER: a22/environment
LOG_LEVEL: INFO
MQTT_CLIENTID: airQuinoRaw
MQTT_TOPIC: AirQuino/RawData
MQTT_USER: airQuino
MQ_CLIENT: dc-mqtt-client-env-a22
MQ_EXCHANGE: ingress
PROVIDER: a22/environment
envSecret:
APP_MQTT_PASS:
APP_MQTT_URI:
MQTT_PASS:
MQTT_URI:
envSecretRef:
- name: APP_RABBITMQ_URI
- name: MQ_URI
secret: rabbitmq-svcbind
key: uri
20 changes: 10 additions & 10 deletions collectors/mqtt-client/infrastructure/helm/parking-on-bz.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ image:
tag: "latest" # Set this when upgrading chart with --set-value

env:
APP_RABBITMQ_EXCHANGE: ingress
APP_RABBITMQ_CLIENTNAME: dc-mqtt-client-env-a22
APP_LOGLEVEL: INFO
APP_PROVIDER: parking-onstreet/bz-axians
APP_MQTT_URI: xsona-broker.axians.it:8883
APP_MQTT_CLIENTID: NOI-testing
APP_MQTT_TOPIC:
LOG_LEVEL: INFO
MQTT_CLIENTID: NOI-testing
MQTT_TOPIC:
MQTT_URI: xsona-broker.axians.it:8883
MQ_CLIENT: dc-mqtt-client-env-a22
MQ_EXCHANGE: ingress
PROVIDER: parking-onstreet/bz-axians
envSecret:
APP_MQTT_PASS:
APP_MQTT_USER:
MQTT_PASS:
MQTT_USER:
envSecretRef:
- name: APP_RABBITMQ_URI
- name: MQ_URI
secret: rabbitmq-svcbind
key: uri
13 changes: 7 additions & 6 deletions collectors/mqtt-client/src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,24 @@ go 1.22.0
require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/rabbitmq/amqp091-go v1.10.0
)

require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/noi-techpark/go-opendatahub-ingest v0.0.0-20241119065535-d68a9695ad44 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rabbitmq/amqp091-go v1.9.0 // indirect
)

require (
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.2
github.com/ThreeDotsLabs/watermill v1.4.1
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.3
github.com/gorilla/websocket v1.5.3 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/sync v0.9.0 // indirect
)
14 changes: 14 additions & 0 deletions collectors/mqtt-client/src/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY=
github.com/ThreeDotsLabs/watermill v1.4.1 h1:gjP6yZH+otMPjV0KsV07pl9TeMm9UQV/gqiuiuG5Drs=
github.com/ThreeDotsLabs/watermill v1.4.1/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.2 h1:80CPTETpNm2BYaxK0Ru0Go6MNokhiioYYstxeVWJoJI=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.2/go.mod h1:MCNoh0HUg4w0bY64on9BnhUodHeimz8+vMfXrzyuWN8=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.3 h1:fkhmiBtaLn+rz5lbkPD1h8tXHfKy3gX0vMtGmxNtAsk=
github.com/ThreeDotsLabs/watermill-amqp/v2 v2.1.3/go.mod h1:xy2qXKcJpgrJURRT6YwgRyGL3qIi6/sOHrDI0MO/r5I=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -12,6 +16,8 @@ github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTq
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand All @@ -26,6 +32,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/noi-techpark/go-opendatahub-ingest v0.0.0-20241119065535-d68a9695ad44 h1:rr/a0Lha81kj2sase8SXQrFrWHeFduUehgMAjiZwRC0=
github.com/noi-techpark/go-opendatahub-ingest v0.0.0-20241119065535-d68a9695ad44/go.mod h1:OR71Qot9WXWSNwLjNNuhhdIts1KbDfDzbwv2F/Q6KOk=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -34,6 +42,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -44,8 +54,12 @@ go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo=
golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
53 changes: 16 additions & 37 deletions collectors/mqtt-client/src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,58 +7,41 @@ package main
import (
"encoding/json"
"log/slog"
"os"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/kelseyhightower/envconfig"
"github.com/noi-techpark/go-opendatahub-ingest/dc"
"github.com/noi-techpark/go-opendatahub-ingest/dto"
"github.com/noi-techpark/go-opendatahub-ingest/ms"
"github.com/rabbitmq/amqp091-go"
)

type mqMsg struct {
Provider string `json:"provider"`
Timestamp time.Time `json:"timestamp"`
Rawdata struct {
MsgId uint16
Topic string
Payload string
} `json:"rawdata"`
type Rawdata struct {
MsgId uint16
Topic string
Payload string
}

var cfg struct {
RABBITMQ_URI string
RABBITMQ_Exchange string
RABBITMQ_Clientname string

dc.Env
MQTT_user string
MQTT_pass string
MQTT_uri string
MQTT_clientid string
MQTT_topic string

Provider string

LogLevel string `default:"INFO"`
}

func initLog(lv string) {
level := &slog.LevelVar{}
level.UnmarshalText([]byte(lv))
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
})))
}

func main() {
envconfig.MustProcess("APP", &cfg)
initLog(cfg.LogLevel)
envconfig.MustProcess("", &cfg)
ms.InitLog(cfg.LOG_LEVEL)

slog.Info("Started with config", "cfg", cfg)

rabbit := NewRabbitPublisher(cfg.RABBITMQ_URI, cfg.RABBITMQ_Exchange, cfg.RABBITMQ_Clientname, cfg.Provider)
rabbit := NewRabbitPublisher(cfg.MQ_URI, cfg.MQ_EXCHANGE, cfg.MQ_CLIENT, cfg.PROVIDER)

opts := mqtt.NewClientOptions()
opts.AddBroker(cfg.MQTT_uri)
Expand All @@ -71,14 +54,10 @@ func main() {
c.Subscribe(cfg.MQTT_topic, 1, func(c mqtt.Client, m mqtt.Message) {
// We assume the payload is a string (json probably)
slog.Debug("got MQTT message", "id", m.MessageID(), "topic", m.Topic(), "payload", string(m.Payload()))
msg := mqMsg{
Provider: cfg.Provider,
msg := dto.RawAny{
Provider: cfg.PROVIDER,
Timestamp: time.Now(),
Rawdata: struct {
MsgId uint16
Topic string
Payload string
}{
Rawdata: Rawdata{
MsgId: m.MessageID(),
Topic: m.Topic(),
Payload: string(m.Payload()),
Expand All @@ -96,7 +75,7 @@ func main() {
select {}
}

func NewRabbitPublisher(uri string, exchange string, client string, routingkey string) chan<- mqMsg {
func NewRabbitPublisher(uri string, exchange string, client string, routingkey string) chan<- dto.RawAny {
pubConfig := amqp.NewDurablePubSubConfig(uri, nil)
pubConfig.Connection.AmqpConfig = &amqp091.Config{}
pubConfig.Connection.AmqpConfig.Properties = amqp091.Table{}
Expand All @@ -108,7 +87,7 @@ func NewRabbitPublisher(uri string, exchange string, client string, routingkey s
panic(err)
}

ch := make(chan mqMsg)
ch := make(chan dto.RawAny)

go func() {
for msg := range ch {
Expand Down

0 comments on commit 42fc2e4

Please sign in to comment.