From 6948ba4731b46e83a20d44976fb649553de000f3 Mon Sep 17 00:00:00 2001
From: Daniel Lando
Date: Mon, 21 Aug 2023 16:40:37 +0200
Subject: [PATCH] feat!: clusters improvements (#109)

---
 .github/workflows/ci.yml  | 13 +++---
 README.md                 |  3 +-
 UPGRADE.md                |  7 +++
 docker/docker-compose.yml | 22 +++++++++
 migrations.js             | 46 +++++++++++++++++++
 persistence.js            | 95 +++++++++++++++++++++++++++++++++------
 test-clusters.js          | 63 ++++++++++++++++++++++++++
 7 files changed, 228 insertions(+), 21 deletions(-)
 create mode 100644 docker/docker-compose.yml
 create mode 100644 migrations.js
 create mode 100644 test-clusters.js

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c15a760..9333103 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -9,12 +9,6 @@ on:
 jobs:
   test:
     runs-on: ubuntu-latest
-    services:
-      redis:
-        image: redis
-        ports:
-          - 6379:6379
-        options: --entrypoint redis-server
     strategy:
       matrix:
         node-version: [14.x, 16.x, 18.x]
@@ -22,6 +16,10 @@ jobs:
 
     steps:
       - uses: actions/checkout@v3
+      - name: Start Redis
+        working-directory: ./docker
+        run: docker-compose up -d
+
       - name: Use Node.js
         uses: actions/setup-node@v3
         with:
@@ -32,3 +30,6 @@ jobs:
       - name: Run tests
        run: |
          npm run test
+      - name: Run tests - clusters
+        run: |
+          node test-clusters.js
diff --git a/README.md b/README.md
index 4b891aa..afee9d8 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,8 @@ aedesPersistenceRedis({
   }, {
     port: 6380,
     host: '127.0.0.1'
-  }])
+  }]),
+  cluster: true
 })
 ```
diff --git a/UPGRADE.md b/UPGRADE.md
index 88011c6..b8cdafe 100644
--- a/UPGRADE.md
+++ b/UPGRADE.md
@@ -1,6 +1,13 @@
 # Upgrade
 
 ## x.x.x to 9.x.x
+
 The database schema has changed between 8.x.x and 9.x.x. Start with a clean database if you migrate from x.x.x to 9.x.x.
+
+## x.x.x to 10.x.x
+
+The database schema has changed between 9.x.x and 10.x.x **IF YOU ARE USING CLUSTERS**.
+
+Start with a clean database **IF YOU ARE USING CLUSTERS** when you migrate from x.x.x to 10.x.x, or use the `from9to10` function in `migrations.js`.
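For reference, the `from9to10` helper that UPGRADE.md points at (added below in `migrations.js`) takes an ioredis-compatible connection and a Node-style callback. A minimal sketch of driving it, not part of the patch, assuming the old 9.x.x data lives on a single Redis instance at the default address:

```js
const Redis = require('ioredis')
const { from9to10 } = require('./migrations')

// Connection to the database that still holds the 9.x.x `retained` hash.
const db = new Redis({ host: '127.0.0.1', port: 6379 })

// Moves every field of the `retained` hash to its own `retained:<topic>` key.
from9to10(db, function (err) {
  if (err) {
    console.error('migration failed:', err)
  } else {
    console.log('migration completed')
  }
  db.quit()
})
```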
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
new file mode 100644
index 0000000..8c1a80b
--- /dev/null
+++ b/docker/docker-compose.yml
@@ -0,0 +1,22 @@
+version: '3'
+
+services:
+  redis-default:
+    image: redis:6.2.5-alpine
+    command: redis-server --port 6379 --appendonly yes
+    network_mode: "host"
+
+  redis-cluster:
+    image: gsccheng/redis-cluster
+    ports:
+      - '6378:7000'
+      - '6380:7001'
+      - '6381:7002'
+      - '6382:7003'
+      - '6383:7004'
+      - '6384:7005'
+    environment:
+      SENTINEL: 'true'
+      INITIAL_PORT: 7000
+      MASTERS: 3
+      SLAVES_PER_MASTER: 1
\ No newline at end of file
diff --git a/migrations.js b/migrations.js
new file mode 100644
index 0000000..70c4533
--- /dev/null
+++ b/migrations.js
@@ -0,0 +1,46 @@
+async function from9to10 (db, cb) {
+  // move retained messages from hash to keys
+  const RETAINEDKEY = 'retained'
+  function retainedKey (topic) {
+    return `${RETAINEDKEY}:${encodeURIComponent(topic)}`
+  }
+
+  // get all topics
+  db.hkeys(RETAINEDKEY, function (err, topics) {
+    if (err) {
+      return cb(err)
+    }
+
+    Promise.all(topics.map(t => {
+      return new Promise((resolve, reject) => {
+        // get packet payload
+        db.hgetBuffer(RETAINEDKEY, t, function (err, payload) {
+          if (err) {
+            return reject(err)
+          }
+          // set packet with new format
+          db.set(retainedKey(t), payload, function (err) {
+            if (err) {
+              return reject(err)
+            }
+            // remove old packet
+            db.hdel(RETAINEDKEY, t, function (err) {
+              if (err) {
+                return reject(err)
+              }
+              resolve()
+            })
+          })
+        })
+      })
+    })).then(() => {
+      cb(null)
+    }).catch(err => {
+      cb(err)
+    })
+  })
+}
+
+module.exports = {
+  from9to10
+}
diff --git a/persistence.js b/persistence.js
index 40aa4ed..b527c88 100644
--- a/persistence.js
+++ b/persistence.js
@@ -19,6 +19,7 @@ const CLIENTSKEY = 'clients'
 const WILLSKEY = 'will'
 const WILLKEY = 'will:'
 const RETAINEDKEY = 'retained'
+const ALL_RETAINEDKEYS = `${RETAINEDKEY}:*`
 const OUTGOINGKEY = 'outgoing:'
 const OUTGOINGIDKEY = 'outgoing-id:'
 const INCOMINGKEY = 'incoming:'
@@ -52,6 +53,10 @@ function packetKey (brokerId, brokerCounter) {
   return `${PACKETKEY}${brokerId}:${brokerCounter}`
 }
 
+function retainedKey (topic) {
+  return `${RETAINEDKEY}:${encodeURIComponent(topic)}`
+}
+
 function packetCountKey (brokerId, brokerCounter) {
   return `${PACKETKEY}${brokerId}:${brokerCounter}:offlineCount`
 }
@@ -64,16 +69,30 @@ class RedisPersistence extends CachedPersistence {
 
     this.messageIdCache = HLRU(100000)
 
-    if (opts.cluster) {
+    if (opts.cluster && Array.isArray(opts.cluster)) {
       this._db = new Redis.Cluster(opts.cluster)
     } else {
       this._db = opts.conn || new Redis(opts)
     }
 
-    this._getRetainedChunkBound = this._getRetainedChunk.bind(this)
+    this.hasClusters = !!opts.cluster
+    this._getRetainedChunkBound = (this.hasClusters ? this._getRetainedChunkCluster : this._getRetainedChunk).bind(this)
+    this._getRetainedKeysBound = (this.hasClusters ? this._getRetainedKeysCluster : this._getRetainedKeys).bind(this)
   }
 
-  storeRetained (packet, cb) {
+  /**
+   * When using clusters we store retained messages under a compound key instead of a hash
+   * to spread the load across the cluster nodes. See issue #85.
+   */
+  _storeRetainedCluster (packet, cb) {
+    if (packet.payload.length === 0) {
+      this._db.del(retainedKey(packet.topic), cb)
+    } else {
+      this._db.set(retainedKey(packet.topic), msgpack.encode(packet), cb)
+    }
+  }
+
+  _storeRetained (packet, cb) {
     if (packet.payload.length === 0) {
       this._db.hdel(RETAINEDKEY, packet.topic, cb)
     } else {
@@ -81,8 +100,39 @@
     }
   }
 
-  _getRetainedChunk (chunk, enc, cb) {
-    this._db.hgetBuffer(RETAINEDKEY, chunk, cb)
+  storeRetained (packet, cb) {
+    if (this.hasClusters) {
+      this._storeRetainedCluster(packet, cb)
+    } else {
+      this._storeRetained(packet, cb)
+    }
+  }
+
+  _getRetainedChunkCluster (topic, enc, cb) {
+    this._db.getBuffer(retainedKey(topic), cb)
+  }
+
+  _getRetainedChunk (topic, enc, cb) {
+    this._db.hgetBuffer(RETAINEDKEY, topic, cb)
+  }
+
+  _getRetainedKeysCluster (cb) {
+    // Get the retained keys from all the masters:
+    const masters = this._db.nodes('master')
+    Promise.all(
+      masters
+        .map((node) => node.keys(ALL_RETAINEDKEYS))
+    ).then((keys) => {
+      // keys: [['key1', 'key2'], ['key3', 'key4']]
+      // flatten the array
+      cb(null, keys.reduce((acc, val) => acc.concat(val), []))
+    }).catch((err) => {
+      cb(err)
+    })
+  }
+
+  _getRetainedKeys (cb) {
+    this._db.hkeys(RETAINEDKEY, cb)
   }
 
   createRetainedStreamCombi (patterns) {
@@ -95,11 +145,11 @@
 
     const stream = through.obj(that._getRetainedChunkBound)
 
-    this._db.hkeys(RETAINEDKEY, function getKeys (err, keys) {
+    this._getRetainedKeysBound(function getKeys (err, keys) {
       if (err) {
         stream.emit('error', err)
       } else {
-        matchRetained(stream, keys, qlobber)
+        matchRetained(stream, keys, qlobber, that.hasClusters)
       }
     })
@@ -269,7 +319,15 @@
 
     const encoded = msgpack.encode(new Packet(packet))
 
-    this._db.mset(pktKey, encoded, countKey, subs.length, finish)
+    if (this.hasClusters) {
+      // do not use `mset` here: multi-key commands fail in clusters when keys hash to different slots
+      outstanding += 1
+      this._db.set(pktKey, encoded, finish)
+      this._db.set(countKey, subs.length, finish)
+    } else {
+      this._db.mset(pktKey, encoded, countKey, subs.length, finish)
+    }
+
     if (ttl > 0) {
       outstanding += 2
       this._db.expire(pktKey, ttl, finish)
@@ -319,6 +377,7 @@
     }
 
     let count = 0
+    let expected = 3
     let errored = false
 
     // TODO can be cached in case of wildcard deliveries
@@ -354,7 +413,14 @@
         return cb(err)
       }
       if (remained === 0) {
-        that._db.del(pktKey, countKey, finish)
+        if (that.hasClusters) {
+          expected++
+          // do not remove multiple keys at once: multi-key del fails in clusters when keys hash to different slots
+          that._db.del(pktKey, finish)
+          that._db.del(countKey, finish)
+        } else {
+          that._db.del(pktKey, countKey, finish)
+        }
       } else {
         finish()
       }
@@ -366,7 +432,7 @@
         errored = err
         return cb(err)
       }
-      if (count === 3 && !errored) {
+      if (count === expected && !errored) {
         cb(err, origPacket)
       }
     }
@@ -525,10 +591,11 @@
   }
 }
 
-function matchRetained (stream, keys, qlobber) {
-  for (const key of keys) {
-    if (qlobber.test(key)) {
-      stream.write(key)
+function matchRetained (stream, topics, qlobber, hasClusters) {
+  for (let t of topics) {
+    t = hasClusters ? decodeURIComponent(t.split(':')[1]) : t
+    if (qlobber.test(t)) {
+      stream.write(t)
     }
   }
   stream.end()
diff --git a/test-clusters.js b/test-clusters.js
new file mode 100644
index 0000000..5cac38e
--- /dev/null
+++ b/test-clusters.js
@@ -0,0 +1,63 @@
+const test = require('tape').test
+const persistence = require('./persistence')
+const Redis = require('ioredis')
+const mqemitterRedis = require('mqemitter-redis')
+const abs = require('aedes-cached-persistence/abstract')
+
+function unref () {
+  this.connector.stream.unref()
+}
+
+const nodes = [
+  { host: 'localhost', port: 6378 },
+  { host: 'localhost', port: 6380 },
+  { host: 'localhost', port: 6381 },
+  { host: 'localhost', port: 6382 },
+  { host: 'localhost', port: 6383 },
+  { host: 'localhost', port: 6384 }
+]
+
+const db = new Redis.Cluster(nodes)
+
+db.on('error', e => {
+  console.trace(e)
+})
+
+db.on('ready', function () {
+  abs({
+    test,
+    buildEmitter () {
+      const emitter = mqemitterRedis()
+      emitter.subConn.on('connect', unref)
+      emitter.pubConn.on('connect', unref)
+
+      return emitter
+    },
+    persistence (cb) {
+      const masters = db.nodes('master')
+      Promise.all(masters.map(function (node) {
+        return node.flushdb().catch(err => {
+          console.error('flushRedisKeys-error:', err)
+        })
+      })).then(() => {
+        const conn = new Redis.Cluster(nodes)
+
+        conn.on('error', e => {
+          console.trace(e)
+        })
+
+        conn.on('ready', function () {
+          cb(null, persistence({
+            conn,
+            cluster: true
+          }))
+        })
+      })
+    },
+    waitForReady: true
+  })
+
+  test.onFinish(() => {
+    process.exit(0)
+  })
+})
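To put the patch in context, this is roughly how the new `cluster` option is consumed from a broker. A minimal sketch, not part of the patch: it assumes the cluster from `docker/docker-compose.yml` above is running and that `aedes` is installed alongside this persistence. Passing `cluster` as an array of nodes makes the constructor build the `Redis.Cluster` connection itself (the `Array.isArray(opts.cluster)` branch), while test-clusters.js shows the other supported form, an existing `conn` plus `cluster: true`:

```js
const aedes = require('aedes')
const { createServer } = require('net')
const persistence = require('aedes-persistence-redis')

// Node list mirrors the host ports published by docker/docker-compose.yml.
const broker = aedes({
  persistence: persistence({
    // An array here triggers `new Redis.Cluster(opts.cluster)` in the
    // constructor and sets `hasClusters`, so retained messages are
    // stored under per-topic `retained:<topic>` keys instead of one hash.
    cluster: [
      { host: 'localhost', port: 6378 },
      { host: 'localhost', port: 6380 },
      { host: 'localhost', port: 6381 }
    ]
  })
})

createServer(broker.handle).listen(1883, function () {
  console.log('Aedes broker listening on port 1883')
})
```

The single-key `set`/`del` calls introduced by the patch sidestep Redis Cluster's cross-slot restriction: a multi-key command such as `mset` is rejected with a CROSSSLOT error when its keys hash to different slots, and `packet:<id>` and `packet:<id>:offlineCount` are not guaranteed to share one.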