feat!: clusters improvements (#109)
robertsLando authored Aug 21, 2023
1 parent 66ac0ad commit 6948ba4
Showing 7 changed files with 228 additions and 21 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/ci.yml
@@ -9,19 +9,17 @@ on:
jobs:
test:
runs-on: ubuntu-latest
-    services:
-      redis:
-        image: redis
-        ports:
-          - 6379:6379
-        options: --entrypoint redis-server
strategy:
matrix:
node-version: [14.x, 16.x, 18.x]

steps:
- uses: actions/checkout@v3

+      - name: Start Redis
+        working-directory: ./docker
+        run: docker-compose up -d

- name: Use Node.js
uses: actions/setup-node@v3
with:
@@ -32,3 +30,6 @@ jobs:
- name: Run tests
run: |
npm run test
+      - name: Run tests - clusters
+        run: |
+          node test-clusters.js
3 changes: 2 additions & 1 deletion README.md
@@ -62,7 +62,8 @@ aedesPersistenceRedis({
}, {
port: 6380,
host: '127.0.0.1'
-  }])
+  }]),
+  cluster: true
})
```
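For reference, a minimal sketch of the two ways this commit lets you enable cluster mode (the option names come from `persistence.js` below; module name, hosts and ports are illustrative):

```js
const aedesPersistenceRedis = require('aedes-persistence-redis')
const Redis = require('ioredis')

// 1) Pass the cluster nodes directly; the persistence creates
//    the Redis.Cluster connection itself:
const persistenceA = aedesPersistenceRedis({
  cluster: [
    { host: '127.0.0.1', port: 6379 },
    { host: '127.0.0.1', port: 6380 }
  ]
})

// 2) Pass an existing cluster connection and flag cluster mode,
//    as test-clusters.js does:
const persistenceB = aedesPersistenceRedis({
  conn: new Redis.Cluster([
    { host: '127.0.0.1', port: 6379 },
    { host: '127.0.0.1', port: 6380 }
  ]),
  cluster: true
})
```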

7 changes: 7 additions & 0 deletions UPGRADE.md
@@ -1,6 +1,13 @@
# Upgrade

## x.x.x to 9.x.x

The database schema has changed between 8.x.x and 9.x.x.

+Start with a clean database if you migrate from x.x.x to 9.x.x.
+
+## x.x.x to 10.x.x
+
+The database schema has changed between 9.x.x and 10.x.x **IF YOU ARE USING CLUSTERS**.
+
+Start with a clean database **IF YOU ARE USING CLUSTERS** when migrating from x.x.x to 10.x.x, or use the `from9to10` function in `migrations.js`.
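A minimal sketch of how the migration could be run (the `from9to10` export comes from `migrations.js` below; the connection details are an assumption):

```js
const Redis = require('ioredis')
const { from9to10 } = require('./migrations')

// connect to the Redis instance holding the 9.x data
const db = new Redis({ host: '127.0.0.1', port: 6379 })

// moves every retained message out of the old `retained` hash
// into its own `retained:<encoded topic>` key
from9to10(db, function (err) {
  if (err) {
    console.error('migration failed:', err)
  } else {
    console.log('retained messages migrated')
  }
  db.quit()
})
```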
22 changes: 22 additions & 0 deletions docker/docker-compose.yml
@@ -0,0 +1,22 @@
version: '3'

services:
redis-default:
image: redis:6.2.5-alpine
command: redis-server --port 6379 --appendonly yes
network_mode: "host"

redis-cluster:
image: gsccheng/redis-cluster
ports:
- '6378:7000'
- '6380:7001'
- '6381:7002'
- '6382:7003'
- '6383:7004'
- '6384:7005'
environment:
SENTINEL: 'true'
      INITIAL_PORT: 7000
      MASTERS: 3
SLAVES_PER_MASTER: 1
46 changes: 46 additions & 0 deletions migrations.js
@@ -0,0 +1,46 @@
async function from9to10 (db, cb) {
// move retained messages from hash to keys
const RETAINEDKEY = 'retained'
function retainedKey (topic) {
return `${RETAINEDKEY}:${encodeURIComponent(topic)}`
}

// get all topics
db.hkeys(RETAINEDKEY, function (err, topics) {
if (err) {
return cb(err)
}

Promise.all(topics.map(t => {
return new Promise((resolve, reject) => {
// get packet payload
db.hgetBuffer(RETAINEDKEY, t, function (err, payload) {
if (err) {
return reject(err)
}
// set packet with new format
db.set(retainedKey(t), payload, function (err) {
if (err) {
return reject(err)
}
// remove old packet
db.hdel(RETAINEDKEY, t, function (err) {
if (err) {
return reject(err)
}
resolve()
})
})
})
})
})).then(() => {
cb(null)
}).catch(err => {
cb(err)
})
})
}

module.exports = {
from9to10
}
95 changes: 81 additions & 14 deletions persistence.js
@@ -19,6 +19,7 @@ const CLIENTSKEY = 'clients'
const WILLSKEY = 'will'
const WILLKEY = 'will:'
const RETAINEDKEY = 'retained'
+const ALL_RETAINEDKEYS = `${RETAINEDKEY}:*`
const OUTGOINGKEY = 'outgoing:'
const OUTGOINGIDKEY = 'outgoing-id:'
const INCOMINGKEY = 'incoming:'
@@ -52,6 +53,10 @@ function packetKey (brokerId, brokerCounter) {
return `${PACKETKEY}${brokerId}:${brokerCounter}`
}

+function retainedKey (topic) {
+  return `${RETAINEDKEY}:${encodeURIComponent(topic)}`
+}

function packetCountKey (brokerId, brokerCounter) {
return `${PACKETKEY}${brokerId}:${brokerCounter}:offlineCount`
}
@@ -64,25 +69,70 @@ class RedisPersistence extends CachedPersistence {

this.messageIdCache = HLRU(100000)

-    if (opts.cluster) {
+    if (opts.cluster && Array.isArray(opts.cluster)) {
this._db = new Redis.Cluster(opts.cluster)
} else {
this._db = opts.conn || new Redis(opts)
}

-    this._getRetainedChunkBound = this._getRetainedChunk.bind(this)
+    this.hasClusters = !!opts.cluster
+    this._getRetainedChunkBound = (this.hasClusters ? this._getRetainedChunkCluster : this._getRetainedChunk).bind(this)
+    this._getRetainedKeysBound = (this.hasClusters ? this._getRetainedKeysCluster : this._getRetainedKeys).bind(this)
}

-  storeRetained (packet, cb) {
+  /**
+   * When using clusters we store retained messages under a compound key
+   * instead of a hash, to spread the load across the cluster. See issue #85.
+   */
+  _storeRetainedCluster (packet, cb) {
+    if (packet.payload.length === 0) {
+      this._db.del(retainedKey(packet.topic), cb)
+    } else {
+      this._db.set(retainedKey(packet.topic), msgpack.encode(packet), cb)
+    }
+  }
+
+  _storeRetained (packet, cb) {
if (packet.payload.length === 0) {
this._db.hdel(RETAINEDKEY, packet.topic, cb)
} else {
this._db.hset(RETAINEDKEY, packet.topic, msgpack.encode(packet), cb)
}
}

-  _getRetainedChunk (chunk, enc, cb) {
-    this._db.hgetBuffer(RETAINEDKEY, chunk, cb)
+  storeRetained (packet, cb) {
+    if (this.hasClusters) {
+      this._storeRetainedCluster(packet, cb)
+    } else {
+      this._storeRetained(packet, cb)
+    }
+  }

+  _getRetainedChunkCluster (topic, enc, cb) {
+    this._db.getBuffer(retainedKey(topic), cb)
+  }
+
+  _getRetainedChunk (topic, enc, cb) {
+    this._db.hgetBuffer(RETAINEDKEY, topic, cb)
}

+  _getRetainedKeysCluster (cb) {
+    // KEYS only scans the node it runs on, so query every
+    // master and merge the results:
+    const masters = this._db.nodes('master')
+    Promise.all(
+      masters
+        .map((node) => node.keys(ALL_RETAINEDKEYS))
+    ).then((keys) => {
+      // keys: [['key1', 'key2'], ['key3', 'key4']]
+      // flatten the array
+      cb(null, keys.reduce((acc, val) => acc.concat(val), []))
+    }).catch((err) => {
+      cb(err)
+    })
+  }
+
+  _getRetainedKeys (cb) {
+    this._db.hkeys(RETAINEDKEY, cb)
+  }

createRetainedStreamCombi (patterns) {
@@ -95,11 +145,11 @@ class RedisPersistence extends CachedPersistence {

const stream = through.obj(that._getRetainedChunkBound)

-    this._db.hkeys(RETAINEDKEY, function getKeys (err, keys) {
+    this._getRetainedKeysBound(function getKeys (err, keys) {
if (err) {
stream.emit('error', err)
} else {
-        matchRetained(stream, keys, qlobber)
+        matchRetained(stream, keys, qlobber, that.hasClusters)
}
})

@@ -269,7 +319,15 @@ class RedisPersistence extends CachedPersistence {

const encoded = msgpack.encode(new Packet(packet))

-    this._db.mset(pktKey, encoded, countKey, subs.length, finish)
+    if (this.hasClusters) {
+      // do not use `mset` here: the keys may hash to different slots
+      // and multi-key commands fail in cluster mode (CROSSSLOT error)
+      outstanding += 1
+      this._db.set(pktKey, encoded, finish)
+      this._db.set(countKey, subs.length, finish)
+    } else {
+      this._db.mset(pktKey, encoded, countKey, subs.length, finish)
+    }

if (ttl > 0) {
outstanding += 2
this._db.expire(pktKey, ttl, finish)
@@ -319,6 +377,7 @@ class RedisPersistence extends CachedPersistence {
}

let count = 0
+    let expected = 3
let errored = false

// TODO can be cached in case of wildcard deliveries
@@ -354,7 +413,14 @@ class RedisPersistence extends CachedPersistence {
return cb(err)
}
if (remained === 0) {
-          that._db.del(pktKey, countKey, finish)
+          if (that.hasClusters) {
+            expected++
+            // do not remove multiple keys at once: the keys may hash to
+            // different slots and multi-key deletes fail in cluster mode
+            that._db.del(pktKey, finish)
+            that._db.del(countKey, finish)
+          } else {
+            that._db.del(pktKey, countKey, finish)
+          }
} else {
finish()
}
@@ -366,7 +432,7 @@ class RedisPersistence extends CachedPersistence {
errored = err
return cb(err)
}
-      if (count === 3 && !errored) {
+      if (count === expected && !errored) {
cb(err, origPacket)
}
}
@@ -525,10 +591,11 @@ class RedisPersistence extends CachedPersistence {
}
}

-function matchRetained (stream, keys, qlobber) {
-  for (const key of keys) {
-    if (qlobber.test(key)) {
-      stream.write(key)
+function matchRetained (stream, topics, qlobber, hasClusters) {
+  for (let t of topics) {
+    // cluster entries are keys like `retained:<encoded topic>`,
+    // so strip the prefix and decode the topic before matching
+    t = hasClusters ? decodeURIComponent(t.split(':')[1]) : t
+    if (qlobber.test(t)) {
+      stream.write(t)
}
}
stream.end()
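A note on the key round-trip that `matchRetained` relies on in cluster mode: `encodeURIComponent` escapes `:` (as `%3A`), so the first `:` in a `retained:*` key always separates the prefix from the encoded topic. A small sketch (the topic name is illustrative):

```js
const RETAINEDKEY = 'retained'

function retainedKey (topic) {
  return `${RETAINEDKEY}:${encodeURIComponent(topic)}`
}

const key = retainedKey('sensors/room:1/temp')
console.log(key) // retained:sensors%2Froom%3A1%2Ftemp

// the reverse of what matchRetained does for cluster keys
const topic = decodeURIComponent(key.split(':')[1])
console.log(topic) // sensors/room:1/temp
```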
63 changes: 63 additions & 0 deletions test-clusters.js
@@ -0,0 +1,63 @@
const test = require('tape').test
const persistence = require('./persistence')
const Redis = require('ioredis')
const mqemitterRedis = require('mqemitter-redis')
const abs = require('aedes-cached-persistence/abstract')

function unref () {
this.connector.stream.unref()
}

const nodes = [
{ host: 'localhost', port: 6378 },
{ host: 'localhost', port: 6380 },
{ host: 'localhost', port: 6381 },
{ host: 'localhost', port: 6382 },
{ host: 'localhost', port: 6383 },
{ host: 'localhost', port: 6384 }
]

const db = new Redis.Cluster(nodes)

db.on('error', e => {
console.trace(e)
})

db.on('ready', function () {
abs({
test,
buildEmitter () {
const emitter = mqemitterRedis()
emitter.subConn.on('connect', unref)
emitter.pubConn.on('connect', unref)

return emitter
},
persistence (cb) {
      const masters = db.nodes('master')
      Promise.all(masters.map(function (node) {
return node.flushdb().catch(err => {
console.error('flushRedisKeys-error:', err)
})
})).then(() => {
const conn = new Redis.Cluster(nodes)

conn.on('error', e => {
console.trace(e)
})

conn.on('ready', function () {
cb(null, persistence({
conn,
cluster: true
}))
})
})
},
waitForReady: true
})

test.onFinish(() => {
process.exit(0)
})
})
