Skip to content

Commit

Permalink
Upgrade Level to 9.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousme committed Feb 21, 2025
1 parent bd81caa commit ddc1496
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 60 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"aedes-persistence": "^9.1.1",
"concat-stream": "^2.0.0",
"faucet": "0.0.1",
"level": "^8.0.0",
"level": "^9.0.0",
"mqemitter": "^4.5.0",
"pre-commit": "^1.2.2",
"release-it": "^15.0.0",
Expand Down
134 changes: 75 additions & 59 deletions persistence.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const Packet = require('aedes-packet')
const msgpack = require('msgpack-lite')
const { EventEmitter } = require('events')
const { Readable } = require('stream')
const { EventEmitter } = require('node:events')
const { Readable } = require('node:stream')
const QlobberSub = require('qlobber/aedes/qlobber-sub')
const { QlobberTrue } = require('qlobber')

Expand All @@ -21,6 +21,7 @@ const WILL = 'will:'
const encodingOption = {
valueEncoding: 'buffer'
}
const LEVEL_NOT_FOUND = 'LEVEL_NOT_FOUND'

async function * decodedDbValues (db, start) {
const opts = Object.assign({
Expand Down Expand Up @@ -136,6 +137,12 @@ function toSubKey (sub) {
return `${subByClientKey(sub.clientId)}:${sub.topic}`
}

function callCb (cb, err, value) {
if (typeof cb === 'function') {
cb(err, value)
}
}

class LevelPersistence extends EventEmitter {
// private class members start with #
#db
Expand All @@ -148,34 +155,38 @@ class LevelPersistence extends EventEmitter {
this.#trie = new QlobberSub(QlobberOpts)
this.#ready = false

const that = this

loadSubscriptions(this.#db, this.#trie)
.then(() => {
that.#ready = true
that.emit('ready')
this.#ready = true
this.emit('ready')
})
.catch((err) => {
that.emit('error', err)
this.emit('error', err)
})
}

#dbGet (key, cb) {
this.#db.get(key, encodingOption, (err, blob) => {
cb(err, (!err) ? msgpack.decode(blob) : null)
})
async #dbGet (key, cb) {
const blob = await this.#db.get(key, encodingOption)
if (blob !== undefined) {
cb(undefined, msgpack.decode(blob))
return
}
cb(LEVEL_NOT_FOUND, null)
}

#dbPut (key, value, cb) {
this.#db.put(key, msgpack.encode(value), encodingOption, cb)
async #dbPut (key, value, cb) {
await this.#db.put(key, msgpack.encode(value), encodingOption)
callCb(cb)
}

#dbDel (key, cb) {
this.#db.del(key, cb)
async #dbDel (key, cb) {
await this.#db.del(key)
callCb(cb)
}

#dbBatch (opArray, cb) {
this.#db.batch(opArray, encodingOption, cb)
async #dbBatch (opArray, cb) {
await this.#db.batch(opArray, encodingOption)
callCb(cb)
}

storeRetained (packet, cb) {
Expand Down Expand Up @@ -261,11 +272,10 @@ class LevelPersistence extends EventEmitter {
}

cleanSubscriptions (client, cb) {
const that = this
this.subscriptionsByClient(client, (err, subs) => {
if (err || !subs) return cb(err, client)

that.removeSubscriptions(
this.removeSubscriptions(
client,
subs.map(sub => sub.topic),
(err) => {
Expand Down Expand Up @@ -300,32 +310,33 @@ class LevelPersistence extends EventEmitter {
}

outgoingUpdate (client, packet, cb) {
const that = this
if (packet.brokerId) {
that.#updateWithBrokerData(client, packet, cb)
this.#updateWithBrokerData(client, packet, cb)
} else {
this.#augmentWithBrokerData(client, packet, (err) => {
if (err) return cb(err, client, packet)
that.#updateWithBrokerData(client, packet, cb)
this.#updateWithBrokerData(client, packet, cb)
})
}
}

outgoingClearMessageId (client, packet, cb) {
const that = this
const key = outgoingByIdKey(client.id, packet.messageId)
this.#dbGet(key, (err, packet) => {
if (err?.notFound) {
return cb(null)
} else if (err) {
return cb(err)
if (err === LEVEL_NOT_FOUND) {
cb(null)
return
}
if (err) {
cb(err)
return
}

const prekey = outgoingKey(client.id, packet.brokerId, packet.brokerCounter)
const batch = that.#db.batch()
const batch = this.#db.batch()
batch.del(key)
batch.del(prekey)
batch.write((err) => {
batch.write().then(err => {
cb(err, packet)
})
})
Expand All @@ -348,13 +359,15 @@ class LevelPersistence extends EventEmitter {
incomingGetPacket (client, packet, cb) {
const key = incomingKey(client.id, packet.messageId)
this.#dbGet(key, (err, packet) => {
if (err && err.notFound) {
if (err === LEVEL_NOT_FOUND) {
cb(new Error('no such packet'), client)
} else if (err) {
return
}
if (err) {
cb(err, client)
} else {
cb(null, packet, client)
return
}
cb(null, packet, client)
})
}

Expand All @@ -377,22 +390,22 @@ class LevelPersistence extends EventEmitter {
getWill (client, cb) {
const key = willKey(client.id)
this.#dbGet(key, (err, will) => {
if (err && err.notFound) {
if (err === LEVEL_NOT_FOUND) {
cb(null, null, client)
} else {
cb(err, will, client)
return
}
cb(err, will, client)
})
}

delWill (client, cb) {
const key = willKey(client.id)
const that = this
this.#dbGet(key, (err, will) => {
if (err) {
return cb(err, null, client)
cb(err, null, client)
return
}
that.#dbDel(key, (err) => {
this.#dbDel(key, (err) => {
cb(err, will, client)
})
})
Expand All @@ -409,40 +422,43 @@ class LevelPersistence extends EventEmitter {
#updateWithBrokerData (client, packet, cb) {
const prekey = outgoingKey(client.id, packet.brokerId, packet.brokerCounter)
const postkey = outgoingByIdKey(client.id, packet.messageId)
const that = this

this.#dbGet(prekey, (err, decoded) => {
if (err && err.notFound) {
if (err === LEVEL_NOT_FOUND) {
cb(new Error('no such packet'), client, packet)
return
} else if (err) {
}
if (err) {
cb(err, client, packet)
return
}

if (decoded.messageId > 0) {
that.#dbDel(outgoingByIdKey(client.id, decoded.messageId))
this.#dbDel(outgoingByIdKey(client.id, decoded.messageId))
}

that.#dbPut(postkey, packet, (err) => {
this.#dbPut(postkey, packet, (err) => {
if (err) {
cb(err, client, packet)
} else {
that.#dbPut(prekey, packet, (err) => {
cb(err, client, packet)
})
return
}
this.#dbPut(prekey, packet, (err) => {
cb(err, client, packet)
})
})
})
}

#augmentWithBrokerData (client, packet, cb) {
const postkey = outgoingByIdKey(client.id, packet.messageId)
this.#dbGet(postkey, (err, decoded) => {
if (err && err.notFound) {
return cb(new Error('no such packet'))
} else if (err) {
return cb(err)
if (err === LEVEL_NOT_FOUND) {
cb(new Error('no such packet'))
return
}
if (err) {
cb(err)
return
}

packet.brokerId = decoded.brokerId
Expand All @@ -451,8 +467,9 @@ class LevelPersistence extends EventEmitter {
})
}

destroy (cb) {
this.#db.close(cb)
async destroy (cb) {
await this.#db.close()
callCb(cb)
}
}

Expand All @@ -466,11 +483,10 @@ function addSubToTrie (trie, sub) {
if (match.qos === sub.qos) {
add = false
break
} else {
trie.remove(match.topic, match)
if (sub.qos === 0) {
add = false
}
}
trie.remove(match.topic, match)
if (sub.qos === 0) {
add = false
}
}
}
Expand Down

0 comments on commit ddc1496

Please sign in to comment.