Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #1254 #18

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1195,9 +1195,10 @@ MqttClient.prototype._cleanUp = function (forced, done) {
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @param {Boolean} noStore - send without put to the store
* @api private
*/
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) {
debug('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || nop
cb = cb || nop
Expand All @@ -1224,6 +1225,11 @@ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
// When sending a packet, reschedule the ping timer
this._shiftPingInterval()

if (noStore) {
sendPacket(this, packet, cb)
return
}

switch (packet.cmd) {
case 'publish':
break
Expand Down Expand Up @@ -1840,7 +1846,7 @@ MqttClient.prototype._onConnect = function (packet) {
}
that._packetIdsDuringStoreProcessing[packet.messageId] = true
if (that.messageIdProvider.register(packet.messageId)) {
that._sendPacket(packet)
that._sendPacket(packet, undefined, undefined, true)
} else {
debug('messageId: %d has already used.', packet.messageId)
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@
"standard": "^16.0.4",
"tape": "^5.5.2",
"terser": "^5.14.2",
"typescript": "^4.5.5"
"typescript": "^4.5.5",
"mqtt-level-store": "^3.1.0"
},
"standard": {
"env": [
Expand Down
73 changes: 73 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const Store = require('./../lib/store')
const assert = require('chai').assert
const ports = require('./helpers/port_list')
const serverBuilder = require('./server_helpers_for_client_tests').serverBuilder
const fs = require('fs')
const levelStore = require('mqtt-level-store')

module.exports = function (server, config) {
const version = config.protocolVersion || 4
Expand Down Expand Up @@ -652,6 +654,77 @@ module.exports = function (server, config) {
})
})

it('should not overtake the messages stored in the level-db-store', function (done) {
var storePath = './temp'
var store = levelStore(storePath)
var client = null
var incomingStore = store.incoming
var outgoingStore = store.outgoing
var publishCount = 0

var server2 = serverBuilder(config.protocol, function (serverClient) {
serverClient.on('connect', function () {
var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
serverClient.connack(connack)
})
serverClient.on('publish', function (packet) {
if (packet.qos !== 0) {
serverClient.puback({messageId: packet.messageId})
}

switch (publishCount++) {
case 0:
assert.strictEqual(packet.payload.toString(), 'payload1')
break
case 1:
assert.strictEqual(packet.payload.toString(), 'payload2')
break
case 2:
assert.strictEqual(packet.payload.toString(), 'payload3')

server2.close()
fs.rmdirSync(storePath, {recursive: true})
done()
break
}
})
})

const clientOptions = {
port: ports.PORTAND72,
host: 'localhost',
clean: false,
clientId: 'cid1',
reconnectPeriod: 0,
incomingStore: incomingStore,
outgoingStore: outgoingStore,
queueQoSZero: true
}

server2.listen(ports.PORTAND72, function () {
client = connect(clientOptions)

client.once('close', function () {
client.once('connect', function () {
client.publish('test', 'payload2', {qos: 1})
client.publish('test', 'payload3', {qos: 1}, function () {
client.end(false)
})
})
// reconecting
client.reconnect(clientOptions)
})

// publish and close
client.publish('test', 'payload1', {
qos: 1,
cbStorePut: function () {
client.end(true)
}
})
})
})

it('should call cb if an outgoing QoS 0 message is not sent', function (done) {
const client = connect({ queueQoSZero: false })
let called = false
Expand Down