Use MQTT with RxJS in Node.js or in the browser with MQTT over WebSocket. Docs
The API is similar to the RxJS internal WebSocket implementation for consistency, but replaces the multiplex operator with topics.
yarn add musquette
# or
npm install musquette
Import via
import { MQTTSubject } from 'musquette'
// or
import { MQTTSubject } from 'musquette/dist/lib/musquette'
It is assumed that the payload is a JSON-parseable string.
import { MQTTSubject } from 'musquette'

// Create the MQTT subject for the broker URL and select a single topic.
const mqtt = new MQTTSubject(`ws://localhost:9001`)
const topic = mqtt.topic(`test/topic`)

// Subscribe with a full observer to handle both messages and errors.
topic.subscribe({
  next: message => console.log(message.test), // "test"
  error: console.error
})

// or, if you are not interested in errors
topic.subscribe(message => console.log(message.test)) // "test"

// publish, when called on a topic, only expects a payload
topic.publish({
  test: 'test'
})
import { MQTTSubject } from 'musquette'

const mqtt = new MQTTSubject(`ws://localhost:9001`)

// next expects an MQTTMessage object that consists at least
// of a topic and a message property
mqtt.next({
  topic: 'test/topic',
  message: {
    test: 'test'
  }
})

// publish on a connection expects two arguments: a topic and a payload
mqtt.publish('test/topic', {
  test: 'test'
})
This is equivalent to the first method, but it does not subscribe to the topic.
import { Subject, Observable, merge } from 'rxjs'
import { mapTo } from 'rxjs/operators'
import { MQTTSubject } from 'musquette/dist/lib/musquette.js'

// Subjects that receive the connection lifecycle notifications.
const connected$ = new Subject()
const disconnecting$ = new Subject()
const disconnected$ = new Subject()

// Log every lifecycle notification; the two disconnect streams are
// mapped to a fixed label so they can be told apart in the output.
merge(
  connected$,
  disconnecting$.pipe(mapTo('disconnecting')),
  disconnected$.pipe(mapTo('disconnected'))
).subscribe(console.log)

const mqtt = new MQTTSubject({
  url: `ws://localhost:9001`,

  // mqtt.js options
  options: {
    keepalive: 3000,
    // "mqttjs_" followed by up to 8 random hexadecimal characters
    clientId: `mqttjs_${Math.random().toString(16).slice(2, 10)}`
  },

  // function that packs the payload that is sent
  // (T) => Buffer
  serializer: value => Buffer.from(JSON.stringify(value)),

  // function that unpacks the payload
  // (Buffer) => T
  deserializer: message => JSON.parse(message.toString()),

  // Observer that is called when connection is established
  connectObserver: connected$,

  // Observer that is called when disconnect is imminent
  disconnectingObserver: disconnecting$,

  // Observer that is notified when connection has ended
  disconnectObserver: disconnected$
})

mqtt.subscribe(console.log)

setTimeout(() => {
  // disconnect
  mqtt.complete()
}, 5000)
npm t: Run test suite
npm start: Run npm run build in watch mode
npm run test:watch: Run test suite in interactive watch mode
npm run test:prod: Run linting and generate coverage
npm run build: Generate bundles and typings, create docs
npm run lint: Lints code
npm run commit: Commit using conventional commit style (husky will tell you to use it if you haven't 😉)
This library includes a copy of the mqtt-wildcard library by Sebastian Raff.
Based on the TypeScript library starter by @alexjoverm.
Contributors (emoji key):
This project follows the all-contributors specification. Contributions of any kind are welcome!