diff --git a/README.md b/README.md index ac6394461f1..38615ee519e 100644 --- a/README.md +++ b/README.md @@ -235,6 +235,24 @@ of sending a `QUIT` command to the server, the client can simply close the netwo client.destroy(); ``` +### Client Side Caching + +Node Redis v5 adds support for [Client Side Caching](https://redis.io/docs/manual/client-side-caching/), which enables clients to cache query results locally. The Redis server will notify the client when cached results are no longer valid. + +```typescript +// Enable client side caching with RESP3 +const client = createClient({ + RESP: 3, + clientSideCache: { + ttl: 0, // Time-to-live (0 = no expiration) + maxEntries: 0, // Maximum entries (0 = unlimited) + evictPolicy: "LRU" // Eviction policy: "LRU" or "FIFO" + } +}); +``` + +See the [V5 documentation](./docs/v5.md#client-side-caching) for more details and advanced usage. + ### Auto-Pipelining Node Redis will automatically pipeline requests that are made during the same "tick". diff --git a/docs/v5.md b/docs/v5.md index a3d0ab68389..1784ae5bd74 100644 --- a/docs/v5.md +++ b/docs/v5.md @@ -89,3 +89,100 @@ await multi.exec(); // Array await multi.exec<'typed'>(); // [string] await multi.execTyped(); // [string] ``` + +# Client Side Caching + +Node Redis v5 adds support for [Client Side Caching](https://redis.io/docs/manual/client-side-caching/), which enables clients to cache query results locally. The server will notify the client when cached results are no longer valid. + +Client Side Caching is only supported with RESP3. + +## Usage + +There are two ways to implement client side caching: + +### Anonymous Cache + +```javascript +const client = createClient({ + RESP: 3, + clientSideCache: { + ttl: 0, // Time-to-live in milliseconds (0 = no expiration) + maxEntries: 0, // Maximum entries to store (0 = unlimited) + evictPolicy: "LRU" // Eviction policy: "LRU" or "FIFO" + } +}); +``` + +In this instance, the cache is managed internally by the client. + +### Controllable Cache + +```javascript +import { BasicClientSideCache } from 'redis'; + +const cache = new BasicClientSideCache({ + ttl: 0, + maxEntries: 0, + evictPolicy: "LRU" +}); + +const client = createClient({ + RESP: 3, + clientSideCache: cache +}); +``` + +With this approach, you have direct access to the cache object for more control: + +```javascript +// Manually invalidate keys +cache.invalidate(key); + +// Clear the entire cache +cache.clear(); + +// Get cache metrics +// `cache.stats()` returns a `CacheStats` object with comprehensive statistics. +const statistics = cache.stats(); + +// Key metrics: +const hits = statistics.hitCount; // Number of cache hits +const misses = statistics.missCount; // Number of cache misses +const hitRate = statistics.hitRate(); // Cache hit rate (0.0 to 1.0) + +// Many other metrics are available on the `statistics` object, e.g.: +// statistics.missRate(), statistics.loadSuccessCount, +// statistics.averageLoadPenalty(), statistics.requestCount() +``` + +## Pooled Caching + +Client side caching also works with client pools. For pooled clients, the cache is shared across all clients in the pool: + +```javascript +const client = createClientPool({RESP: 3}, { + clientSideCache: { + ttl: 0, + maxEntries: 0, + evictPolicy: "LRU" + }, + minimum: 5 +}); +``` + +For a controllable pooled cache: + +```javascript +import { BasicPooledClientSideCache } from 'redis'; + +const cache = new BasicPooledClientSideCache({ + ttl: 0, + maxEntries: 0, + evictPolicy: "LRU" +}); + +const client = createClientPool({RESP: 3}, { + clientSideCache: cache, + minimum: 5 +}); +``` diff --git a/packages/client/index.ts b/packages/client/index.ts index 1f05bc30341..2deb0c39b47 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -34,3 +34,6 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH'; export { SetOptions } from './lib/commands/SET'; export { REDIS_FLUSH_MODES } from './lib/commands/FLUSHALL'; + +export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache'; + diff --git a/packages/client/lib/RESP/types.ts b/packages/client/lib/RESP/types.ts index 692c433a49d..8749bbdc7b0 100644 --- a/packages/client/lib/RESP/types.ts +++ b/packages/client/lib/RESP/types.ts @@ -314,11 +314,17 @@ export interface CommanderConfig< functions?: F; scripts?: S; /** - * TODO + * Specifies the Redis Serialization Protocol version to use. + * RESP2 is the default (value 2), while RESP3 (value 3) provides + * additional data types and features introduced in Redis 6.0. */ RESP?: RESP; /** - * TODO + * When set to true, enables commands that have unstable RESP3 implementations. + * When using RESP3 protocol, commands marked as having unstable RESP3 support + * will throw an error unless this flag is explicitly set to true. + * This primarily affects modules like Redis Search where response formats + * in RESP3 mode may change in future versions. */ unstableResp3?: boolean; } diff --git a/packages/client/lib/client/cache.spec.ts b/packages/client/lib/client/cache.spec.ts new file mode 100644 index 00000000000..55f2672c26c --- /dev/null +++ b/packages/client/lib/client/cache.spec.ts @@ -0,0 +1,700 @@ +import assert from "assert"; +import testUtils, { GLOBAL } from "../test-utils" +import { BasicClientSideCache, BasicPooledClientSideCache, CacheStats } from "./cache" +import { REDIS_FLUSH_MODES } from "../commands/FLUSHALL"; +import { once } from 'events'; + +describe("Client Side Cache", () => { + describe('Basic Cache', () => { + const csc = new BasicClientSideCache({ maxEntries: 10 }); + + testUtils.testWithClient('Basic Cache Miss', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + + assert.equal(csc.stats().missCount, 1, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.stats().missCount, 1, "Cache Misses"); + assert.equal(csc.stats().hitCount, 1, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Max Cache Entries', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.stats().missCount, 12, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('LRU works correctly', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.stats().missCount, 11, "Cache Misses"); + assert.equal(csc.stats().hitCount, 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.clear(); + await client.get("x"); + + assert.equal(csc.stats().missCount, 1, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Null Invalidate acts as clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.invalidate(null); + await client.get("x"); + + assert.equal(2, csc.stats().missCount, "Cache Misses"); + assert.equal(0, csc.stats().hitCount, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('flushdb causes an invalidate null', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.flushDb(REDIS_FLUSH_MODES.SYNC); + assert.equal(await client.get("x"), null); + + assert.equal(csc.stats().missCount, 2, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1', 'first get'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2', 'second get'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3', 'third get'); + + assert.equal(csc.stats().missCount, 3, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient("Cached Replies Don't Mutate", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + ret1[0] = '4'; + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + ret2[0] = '8'; + const ret3 = await client.mGet(['x', 'y']); + assert.deepEqual(ret3, ['1', '2'], 'third mGet'); + + assert.equal(csc.stats().missCount, 1, "Cache Misses"); + assert.equal(csc.stats().hitCount, 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient("Cached cleared on disconnect", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + + assert.equal(csc.stats().missCount, 1, "first Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "first Cache Hits"); + + await client.close(); + + await client.connect(); + + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + + assert.equal(csc.stats().missCount, 1, "second Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "second Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + }); + + describe("Pooled Cache", () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithClient('Virtual Pool Disconnect', async client1 => { + const client2 = client1.duplicate(); + await client2.connect() + + assert.equal(await client2.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(1, csc.stats().missCount, "Cache Misses"); + assert.equal(1, csc.stats().hitCount, "Cache Hits"); + + await client2.close(); + + assert.equal(await client1.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(2, csc.stats().missCount, "Cache Misses"); + assert.equal(2, csc.stats().hitCount, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClientPool('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + + assert.equal(1, csc.stats().missCount, "Cache Misses"); + assert.equal(0, csc.stats().hitCount, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + testUtils.testWithClientPool('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.stats().missCount, 1, "Cache Misses"); + assert.equal(csc.stats().hitCount, 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + testUtils.testWithClientPool('Basic Cache Manually Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + + assert.equal(await client.get("x"), '1', 'first get'); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 2); + let [i] = await p; + + assert.equal(await client.get("x"), '2', 'second get'); + + p = once(csc, 'invalidate'); + await client.set("x", 3); + [i] = await p; + + assert.equal(await client.get("x"), '3'); + + assert.equal(csc.stats().missCount, 3, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + testUtils.testWithClientPool('Basic Cache Invalidate via message', async client => { + csc.clear(); + + await client.set('x', 1); + await client.set('y', 2); + + assert.deepEqual(await client.mGet(['x', 'y']), ['1', '2'], 'first mGet'); + + assert.equal(csc.stats().missCount, 1, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 3); + let [i] = await p; + + assert.equal(i, 'x'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '2'], 'second mGet'); + + assert.equal(csc.stats().missCount, 2, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + + p = once(csc, 'invalidate'); + await client.set("y", 4); + [i] = await p; + + assert.equal(i, 'y'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '4'], 'second mGet'); + + assert.equal(csc.stats().missCount, 3, "Cache Misses"); + assert.equal(csc.stats().hitCount, 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + }); + + describe('Cluster Caching', () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithCluster('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + await client.set("y", 1); + await client.get("y"); + + assert.equal(2, csc.stats().missCount, "Cache Misses"); + assert.equal(0, csc.stats().hitCount, "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + + assert.equal(2, csc.stats().missCount, "Cache Misses"); + assert.equal(4, csc.stats().hitCount, "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3'); + + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + await client.set("y", 2); + assert.equal(await client.get("y"), '2'); + await client.set("y", 3); + assert.equal(await client.get("y"), '3'); + + assert.equal(6, csc.stats().missCount, "Cache Misses"); + assert.equal(0, csc.stats().hitCount, "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + }); + describe("CacheStats", () => { + describe("CacheStats.of()", () => { + it("should correctly initialize stats and calculate derived values", () => { + const stats = CacheStats.of(10, 5, 8, 2, 100, 3); + assert.strictEqual(stats.hitCount, 10, "hitCount should be 10"); + assert.strictEqual(stats.missCount, 5, "missCount should be 5"); + assert.strictEqual(stats.loadSuccessCount, 8, "loadSuccessCount should be 8"); + assert.strictEqual(stats.loadFailureCount, 2, "loadFailureCount should be 2"); + assert.strictEqual(stats.totalLoadTime, 100, "totalLoadTime should be 100"); + assert.strictEqual(stats.evictionCount, 3, "evictionCount should be 3"); + + assert.strictEqual(stats.requestCount(), 15, "requestCount should be 15 (10 hits + 5 misses)"); + assert.strictEqual(stats.hitRate(), 10 / 15, "hitRate should be 10/15"); + assert.strictEqual(stats.missRate(), 5 / 15, "missRate should be 5/15"); + assert.strictEqual(stats.loadCount(), 10, "loadCount should be 10 (8 success + 2 failure)"); + assert.strictEqual(stats.loadFailureRate(), 2 / 10, "loadFailureRate should be 2/10"); + assert.strictEqual(stats.averageLoadPenalty(), 100 / 10, "averageLoadPenalty should be 10 (100 time / 10 loads)"); + }); + + it("should handle zero values and division by zero for derived values", () => { + const stats = CacheStats.of(0, 0, 0, 0, 0, 0); + assert.strictEqual(stats.hitCount, 0, "hitCount"); + assert.strictEqual(stats.missCount, 0, "missCount"); + assert.strictEqual(stats.loadSuccessCount, 0, "loadSuccessCount"); + assert.strictEqual(stats.loadFailureCount, 0, "loadFailureCount"); + assert.strictEqual(stats.totalLoadTime, 0, "totalLoadTime"); + assert.strictEqual(stats.evictionCount, 0, "evictionCount"); + + assert.strictEqual(stats.requestCount(), 0, "requestCount should be 0"); + assert.strictEqual(stats.hitRate(), 1, "hitRate should be 1 for 0 requests"); + assert.strictEqual(stats.missRate(), 0, "missRate should be 0 for 0 requests"); + assert.strictEqual(stats.loadCount(), 0, "loadCount should be 0"); + assert.strictEqual(stats.loadFailureRate(), 0, "loadFailureRate should be 0 for 0 loads"); + assert.strictEqual(stats.averageLoadPenalty(), 0, "averageLoadPenalty should be 0 for 0 loads"); + }); + }); + + describe("CacheStats.empty()", () => { + it("should return stats with all zero counts and 0 for rates/penalties", () => { + const stats = CacheStats.empty(); + assert.strictEqual(stats.hitCount, 0, "empty.hitCount"); + assert.strictEqual(stats.missCount, 0, "empty.missCount"); + assert.strictEqual(stats.loadSuccessCount, 0, "empty.loadSuccessCount"); + assert.strictEqual(stats.loadFailureCount, 0, "empty.loadFailureCount"); + assert.strictEqual(stats.totalLoadTime, 0, "empty.totalLoadTime"); + assert.strictEqual(stats.evictionCount, 0, "empty.evictionCount"); + + assert.strictEqual(stats.requestCount(), 0, "empty.requestCount"); + assert.strictEqual(stats.hitRate(), 1, "empty.hitRate should be 1"); + assert.strictEqual(stats.missRate(), 0, "empty.missRate should be 0"); + assert.strictEqual(stats.loadCount(), 0, "empty.loadCount"); + assert.strictEqual(stats.loadFailureRate(), 0, "empty.loadFailureRate should be 0"); + assert.strictEqual(stats.averageLoadPenalty(), 0, "empty.averageLoadPenalty should be 0"); + }); + }); + + describe("instance methods", () => { + const stats1 = CacheStats.of(10, 5, 8, 2, 100, 3); + const stats2 = CacheStats.of(20, 10, 12, 3, 200, 5); + + describe("plus()", () => { + it("should correctly add two CacheStats instances", () => { + const sum = stats1.plus(stats2); + assert.strictEqual(sum.hitCount, 30); + assert.strictEqual(sum.missCount, 15); + assert.strictEqual(sum.loadSuccessCount, 20); + assert.strictEqual(sum.loadFailureCount, 5); + assert.strictEqual(sum.totalLoadTime, 300); + assert.strictEqual(sum.evictionCount, 8); + }); + + it("should correctly sum large numbers", () => { + const statsC = CacheStats.of(Number.MAX_VALUE, 1, 1, 1, 1, 1); + const statsD = CacheStats.of(Number.MAX_VALUE, 1, 1, 1, 1, 1); + const sum = statsC.plus(statsD); + assert.strictEqual(sum.hitCount, Infinity, "Summing MAX_VALUE should result in Infinity"); + }); + }); + + describe("minus()", () => { + it("should correctly subtract one CacheStats instance from another, flooring at 0", () => { + const diff = stats2.minus(stats1); + assert.strictEqual(diff.hitCount, 10); + assert.strictEqual(diff.missCount, 5); + assert.strictEqual(diff.loadSuccessCount, 4); + assert.strictEqual(diff.loadFailureCount, 1); + assert.strictEqual(diff.totalLoadTime, 100); + assert.strictEqual(diff.evictionCount, 2); + }); + + it("should floor results at 0 if minuend is smaller than subtrahend", () => { + const sSmall = CacheStats.of(5, 2, 1, 0, 10, 1); + const sLarge = CacheStats.of(10, 5, 2, 1, 20, 2); + const diff = sSmall.minus(sLarge); + assert.strictEqual(diff.hitCount, 0, "hitCount should be floored at 0 (5 - 10)"); + assert.strictEqual(diff.missCount, 0, "missCount should be floored at 0 (2 - 5)"); + assert.strictEqual(diff.loadSuccessCount, 0, "loadSuccessCount should be floored at 0 (1 - 2)"); + assert.strictEqual(diff.loadFailureCount, 0, "loadFailureCount should be floored at 0 (0 - 1)"); + assert.strictEqual(diff.totalLoadTime, 0, "totalLoadTime should be floored at 0 (10 - 20)"); + assert.strictEqual(diff.evictionCount, 0, "evictionCount should be floored at 0 (1 - 2)"); + }); + }); + + describe("hitRate()", () => { + it("should return 0 if requestCount is 0", () => { + const stats = CacheStats.of(0, 0, 0, 0, 0, 0); + assert.strictEqual(stats.hitRate(), 1); + }); + it("should return 0 if hitCount is 0 but missCount > 0", () => { + const stats = CacheStats.of(0, 1, 0, 0, 0, 0); + assert.strictEqual(stats.hitRate(), 0); + }); + it("should return 1 if missCount is 0 but hitCount > 0", () => { + const stats = CacheStats.of(1, 0, 0, 0, 0, 0); + assert.strictEqual(stats.hitRate(), 1); + }); + }); + + describe("missRate()", () => { + it("should return 0 if requestCount is 0", () => { + const stats = CacheStats.of(0, 0, 0, 0, 0, 0); + assert.strictEqual(stats.missRate(), 0); + }); + it("should return 1 if hitCount is 0 but missCount > 0", () => { + const stats = CacheStats.of(0, 1, 0, 0, 0, 0); + assert.strictEqual(stats.missRate(), 1); + }); + it("should return 0 if missCount is 0 but hitCount > 0", () => { + const stats = CacheStats.of(1, 0, 0, 0, 0, 0); + assert.strictEqual(stats.missRate(), 0); + }); + }); + + describe("loadFailureRate()", () => { + it("should return 0 if loadCount is 0", () => { + const stats = CacheStats.of(0, 0, 0, 0, 0, 0); + assert.strictEqual(stats.loadFailureRate(), 0); + }); + it("should return 0 if loadFailureCount is 0 but loadSuccessCount > 0", () => { + const stats = CacheStats.of(0, 0, 1, 0, 10, 0); + assert.strictEqual(stats.loadFailureRate(), 0); + }); + it("should return 1 if loadSuccessCount is 0 but loadFailureCount > 0", () => { + const stats = CacheStats.of(0, 0, 0, 1, 10, 0); + assert.strictEqual(stats.loadFailureRate(), 1); + }); + }); + + describe("averageLoadPenalty()", () => { + it("should return 0 if loadCount is 0, even if totalLoadTime > 0", () => { + const stats = CacheStats.of(0, 0, 0, 0, 100, 0); + assert.strictEqual(stats.averageLoadPenalty(), 0); + }); + it("should return 0 if totalLoadTime is 0 and loadCount > 0", () => { + const stats = CacheStats.of(0, 0, 1, 1, 0, 0); + assert.strictEqual(stats.averageLoadPenalty(), 0); + }); + }); + }); + }); + it('should reflect comprehensive cache operations in stats via BasicClientSideCache', async function () { + + const csc = new BasicClientSideCache({ + maxEntries: 2, // Small size to easily trigger evictions + }); + + testUtils.testWithClient('comprehensive_stats_run', async client => { + + // --- Phase 1: Initial misses and loads --- + await client.set('keyA', 'valueA_1'); + assert.strictEqual(await client.get('keyA'), 'valueA_1', "Get keyA first time"); + assert.strictEqual(csc.stats().missCount, 1); + assert.strictEqual(csc.stats().loadSuccessCount, 1); + + await client.set('keyB', 'valueB_1'); + assert.strictEqual(await client.get('keyB'), 'valueB_1', "Get keyB first time"); + assert.strictEqual(csc.stats().missCount, 2); + assert.strictEqual(csc.stats().loadSuccessCount, 2); + + // --- Phase 2: Cache hits --- + assert.strictEqual(await client.get('keyA'), 'valueA_1', "Get keyA second time (hit)"); + assert.strictEqual(csc.stats().hitCount, 1); + + assert.strictEqual(await client.get('keyB'), 'valueB_1', "Get keyB second time (hit)"); + assert.strictEqual(csc.stats().hitCount, 2); + + + // --- Phase 3: Trigger evictions and more misses/loads --- + await client.set('keyC', 'valueC_1'); + assert.strictEqual(await client.get('keyC'), 'valueC_1', "Get keyC first time (evicts keyA)"); + assert.strictEqual(csc.stats().missCount, 3); + assert.strictEqual(csc.stats().loadSuccessCount, 3); + assert.strictEqual(csc.stats().evictionCount, 1); + + + assert.strictEqual(await client.get('keyA'), 'valueA_1', "Get keyA again (miss after eviction)"); + assert.strictEqual(csc.stats().missCount, 4); + assert.strictEqual(csc.stats().loadSuccessCount, 4); + assert.strictEqual(csc.stats().evictionCount, 2); + + + // --- Phase 4: More hits --- + assert.strictEqual(await client.get('keyC'), 'valueC_1', "Get keyC again (hit)"); + assert.strictEqual(csc.stats().hitCount, 3); + + // --- Phase 5: Update a key (results in invalidation, then miss/load on next GET) --- + // Note: A SET operation on an existing cached key should invalidate it. + // The invalidation itself isn't directly a "hit" or "miss" for stats, + // but the *next* GET will be a miss. + await client.set('keyA', 'valueA_2'); + assert.strictEqual(await client.get('keyA'), 'valueA_2', "Get keyA after SET (miss due to invalidation)"); + + assert.strictEqual(csc.stats().hitCount, 3); + assert.strictEqual(csc.stats().loadSuccessCount, 5); + + + + const stats = csc.stats() + + assert.strictEqual(stats.hitCount, 3, "Final hitCount"); + assert.strictEqual(stats.missCount, 5, "Final missCount"); + assert.strictEqual(stats.loadSuccessCount, 5, "Final loadSuccessCount"); + assert.strictEqual(stats.loadFailureCount, 0, "Final loadFailureCount (expected 0 for this test)"); + assert.strictEqual(stats.evictionCount, 2, "Final evictionCount"); + assert.ok(stats.totalLoadTime >= 0, "Final totalLoadTime should be non-negative"); + + assert.strictEqual(stats.requestCount(), 8, "Final requestCount (5 misses + 3 hits)"); + assert.strictEqual(stats.hitRate(), 3 / 8, "Final hitRate"); + assert.strictEqual(stats.missRate(), 5 / 8, "Final missRate"); + + assert.strictEqual(stats.loadCount(), 5, "Final loadCount (5 success + 0 failure)"); + assert.strictEqual(stats.loadFailureRate(), 0, "Final loadFailureRate (0 failures / 5 loads)"); + + if (stats.loadCount() > 0) { + assert.ok(stats.averageLoadPenalty() >= 0, "Final averageLoadPenalty should be non-negative"); + assert.strictEqual(stats.averageLoadPenalty(), stats.totalLoadTime / stats.loadCount(), "Average load penalty calculation"); + } else { + assert.strictEqual(stats.averageLoadPenalty(), 0, "Final averageLoadPenalty should be 0 if no loads"); + } + + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + }); +}); diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts new file mode 100644 index 00000000000..7254352ee8f --- /dev/null +++ b/packages/client/lib/client/cache.ts @@ -0,0 +1,870 @@ +import { EventEmitter } from 'stream'; +import RedisClient from '.'; +import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types'; +import { BasicCommandParser } from './parser'; + +/** + * A snapshot of cache statistics. + * + * This class provides an immutable view of the cache's operational statistics at a particular + * point in time. It is heavily inspired by the statistics reporting capabilities found in + * Ben Manes's Caffeine cache (https://github.com/ben-manes/caffeine). + * + * Instances of `CacheStats` are typically obtained from a {@link StatsCounter} and can be used + * for performance monitoring, debugging, or logging. It includes metrics such as hit rate, + * miss rate, load success/failure rates, average load penalty, and eviction counts. + * + * All statistics are non-negative. Rates and averages are typically in the range `[0.0, 1.0]`, + * or `0` if the an operation has not occurred (e.g. hit rate is 0 if there are no requests). + * + * Cache statistics are incremented according to specific rules: + * - When a cache lookup encounters an existing entry, hitCount is incremented. + * - When a cache lookup encounters a missing entry, missCount is incremented. + * - When a new entry is successfully loaded, loadSuccessCount is incremented and the + * loading time is added to totalLoadTime. + * - When an entry fails to load, loadFailureCount is incremented and the + * loading time is added to totalLoadTime. + * - When an entry is evicted due to size constraints or expiration, + * evictionCount is incremented. + */ +export class CacheStats { + /** + * Creates a new CacheStats instance with the specified statistics. + */ + private constructor( + public readonly hitCount: number, + public readonly missCount: number, + public readonly loadSuccessCount: number, + public readonly loadFailureCount: number, + public readonly totalLoadTime: number, + public readonly evictionCount: number + ) { + if ( + hitCount < 0 || + missCount < 0 || + loadSuccessCount < 0 || + loadFailureCount < 0 || + totalLoadTime < 0 || + evictionCount < 0 + ) { + throw new Error('All statistics values must be non-negative'); + } + } + + /** + * Creates a new CacheStats instance with the specified statistics. + * + * @param hitCount - Number of cache hits + * @param missCount - Number of cache misses + * @param loadSuccessCount - Number of successful cache loads + * @param loadFailureCount - Number of failed cache loads + * @param totalLoadTime - Total load time in milliseconds + * @param evictionCount - Number of cache evictions + */ + static of( + hitCount = 0, + missCount = 0, + loadSuccessCount = 0, + loadFailureCount = 0, + totalLoadTime = 0, + evictionCount = 0 + ): CacheStats { + return new CacheStats( + hitCount, + missCount, + loadSuccessCount, + loadFailureCount, + totalLoadTime, + evictionCount + ); + } + + /** + * Returns a statistics instance where no cache events have been recorded. + * + * @returns An empty statistics instance + */ + static empty(): CacheStats { + return CacheStats.EMPTY_STATS; + } + + /** + * An empty stats instance with all counters set to zero. + */ + private static readonly EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0); + + /** + * Returns the total number of times cache lookup methods have returned + * either a cached or uncached value. + * + * @returns Total number of requests (hits + misses) + */ + requestCount(): number { + return this.hitCount + this.missCount; + } + + /** + * Returns the hit rate of the cache. + * This is defined as hitCount / requestCount, or 1.0 when requestCount is 0. + * + * @returns The ratio of cache requests that were hits (between 0.0 and 1.0) + */ + hitRate(): number { + const requestCount = this.requestCount(); + return requestCount === 0 ? 1.0 : this.hitCount / requestCount; + } + + /** + * Returns the miss rate of the cache. + * This is defined as missCount / requestCount, or 0.0 when requestCount is 0. + * + * @returns The ratio of cache requests that were misses (between 0.0 and 1.0) + */ + missRate(): number { + const requestCount = this.requestCount(); + return requestCount === 0 ? 0.0 : this.missCount / requestCount; + } + + /** + * Returns the total number of load operations (successful + failed). + * + * @returns Total number of load operations + */ + loadCount(): number { + return this.loadSuccessCount + this.loadFailureCount; + } + + /** + * Returns the ratio of cache loading attempts that failed. + * This is defined as loadFailureCount / loadCount, or 0.0 when loadCount is 0. + * + * @returns Ratio of load operations that failed (between 0.0 and 1.0) + */ + loadFailureRate(): number { + const loadCount = this.loadCount(); + return loadCount === 0 ? 0.0 : this.loadFailureCount / loadCount; + } + + /** + * Returns the average time spent loading new values, in milliseconds. + * This is defined as totalLoadTime / loadCount, or 0.0 when loadCount is 0. + * + * @returns Average load time in milliseconds + */ + averageLoadPenalty(): number { + const loadCount = this.loadCount(); + return loadCount === 0 ? 0.0 : this.totalLoadTime / loadCount; + } + + /** + * Returns a new CacheStats representing the difference between this CacheStats + * and another. Negative values are rounded up to zero. + * + * @param other - The statistics to subtract from this instance + * @returns The difference between this instance and other + */ + minus(other: CacheStats): CacheStats { + return CacheStats.of( + Math.max(0, this.hitCount - other.hitCount), + Math.max(0, this.missCount - other.missCount), + Math.max(0, this.loadSuccessCount - other.loadSuccessCount), + Math.max(0, this.loadFailureCount - other.loadFailureCount), + Math.max(0, this.totalLoadTime - other.totalLoadTime), + Math.max(0, this.evictionCount - other.evictionCount) + ); + } + + /** + * Returns a new CacheStats representing the sum of this CacheStats and another. + * + * @param other - The statistics to add to this instance + * @returns The sum of this instance and other + */ + plus(other: CacheStats): CacheStats { + return CacheStats.of( + this.hitCount + other.hitCount, + this.missCount + other.missCount, + this.loadSuccessCount + other.loadSuccessCount, + this.loadFailureCount + other.loadFailureCount, + this.totalLoadTime + other.totalLoadTime, + this.evictionCount + other.evictionCount + ); + } +} + +/** + * An accumulator for cache statistics. + * + * This interface defines the contract for objects that record cache-related events + * such as hits, misses, loads (successes and failures), and evictions. The design + * is inspired by the statistics collection mechanisms in Ben Manes's Caffeine cache + * (https://github.com/ben-manes/caffeine). + * + * Implementations of this interface are responsible for aggregating these events. + * A snapshot of the current statistics can be obtained by calling the `snapshot()` + * method, which returns an immutable {@link CacheStats} object. + * + * Common implementations include `DefaultStatsCounter` for active statistics collection + * and `DisabledStatsCounter` for a no-op version when stats are not needed. + */ +export interface StatsCounter { + /** + * Records cache hits. This should be called when a cache request returns a cached value. + * + * @param count - The number of hits to record + */ + recordHits(count: number): void; + + /** + * Records cache misses. This should be called when a cache request returns a value that was not + * found in the cache. + * + * @param count - The number of misses to record + */ + recordMisses(count: number): void; + + /** + * Records the successful load of a new entry. This method should be called when a cache request + * causes an entry to be loaded and the loading completes successfully. + * + * @param loadTime - The number of milliseconds the cache spent computing or retrieving the new value + */ + recordLoadSuccess(loadTime: number): void; + + /** + * Records the failed load of a new entry. This method should be called when a cache request + * causes an entry to be loaded, but an exception is thrown while loading the entry. + * + * @param loadTime - The number of milliseconds the cache spent computing or retrieving the new value + * prior to the failure + */ + recordLoadFailure(loadTime: number): void; + + /** + * Records the eviction of an entry from the cache. This should only be called when an entry is + * evicted due to the cache's eviction strategy, and not as a result of manual invalidations. + * + * @param count - The number of evictions to record + */ + recordEvictions(count: number): void; + + /** + * Returns a snapshot of this counter's values. Note that this may be an inconsistent view, as it + * may be interleaved with update operations. + * + * @return A snapshot of this counter's values + */ + snapshot(): CacheStats; +} + +/** + * A StatsCounter implementation that does nothing and always returns empty stats. + */ +class DisabledStatsCounter implements StatsCounter { + static readonly INSTANCE = new DisabledStatsCounter(); + + private constructor() { } + + recordHits(count: number): void { } + recordMisses(count: number): void { } + recordLoadSuccess(loadTime: number): void { } + recordLoadFailure(loadTime: number): void { } + recordEvictions(count: number): void { } + snapshot(): CacheStats { return CacheStats.empty(); } +} + +/** + * Returns a StatsCounter that does not record any cache events. + * + * @return A StatsCounter that does not record metrics + */ +function disabledStatsCounter(): StatsCounter { + return DisabledStatsCounter.INSTANCE; +} + +/** + * A StatsCounter implementation that maintains cache statistics. + */ +class DefaultStatsCounter implements StatsCounter { + #hitCount = 0; + #missCount = 0; + #loadSuccessCount = 0; + #loadFailureCount = 0; + #totalLoadTime = 0; + #evictionCount = 0; + + /** + * Records cache hits. + * + * @param count - The number of hits to record + */ + recordHits(count: number): void { + this.#hitCount += count; + } + + /** + * Records cache misses. + * + * @param count - The number of misses to record + */ + recordMisses(count: number): void { + this.#missCount += count; + } + + /** + * Records the successful load of a new entry. + * + * @param loadTime - The number of milliseconds spent loading the entry + */ + recordLoadSuccess(loadTime: number): void { + this.#loadSuccessCount++; + this.#totalLoadTime += loadTime; + } + + /** + * Records the failed load of a new entry. + * + * @param loadTime - The number of milliseconds spent attempting to load the entry + */ + recordLoadFailure(loadTime: number): void { + this.#loadFailureCount++; + this.#totalLoadTime += loadTime; + } + + /** + * Records cache evictions. + * + * @param count - The number of evictions to record + */ + recordEvictions(count: number): void { + this.#evictionCount += count; + } + + /** + * Returns a snapshot of the current statistics. + * + * @returns A snapshot of the current statistics + */ + snapshot(): CacheStats { + return CacheStats.of( + this.#hitCount, + this.#missCount, + this.#loadSuccessCount, + this.#loadFailureCount, + this.#totalLoadTime, + this.#evictionCount + ); + } + + /** + * Creates a new DefaultStatsCounter. + * + * @returns A new DefaultStatsCounter instance + */ + static create(): DefaultStatsCounter { + return new DefaultStatsCounter(); + } +} + +type CachingClient = RedisClient; +type CmdFunc = () => Promise; + +type EvictionPolicy = "LRU" | "FIFO" + +/** + * Configuration options for Client Side Cache + */ +export interface ClientSideCacheConfig { + /** + * Time-to-live in milliseconds for cached entries. + * Use 0 for no expiration. + * @default 0 + */ + ttl?: number; + + /** + * Maximum number of entries to store in the cache. + * Use 0 for unlimited entries. + * @default 0 + */ + maxEntries?: number; + + /** + * Eviction policy to use when the cache reaches its capacity. + * - "LRU" (Least Recently Used): Evicts least recently accessed entries first + * - "FIFO" (First In First Out): Evicts oldest entries first + * @default "LRU" + */ + evictPolicy?: EvictionPolicy; + + /** + * Whether to collect statistics about cache operations. + * @default true + */ + recordStats?: boolean; +} + +interface CacheCreator { + epoch: number; + client: CachingClient; +} + +interface ClientSideCacheEntry { + invalidate(): void; + validate(): boolean; +} + +/** + * Generates a unique cache key from Redis command arguments + * + * @param redisArgs - Array of Redis command arguments + * @returns A unique string key for caching + */ +function generateCacheKey(redisArgs: ReadonlyArray): string { + const tmp = new Array(redisArgs.length * 2); + + for (let i = 0; i < redisArgs.length; i++) { + tmp[i] = redisArgs[i].length; + tmp[i + redisArgs.length] = redisArgs[i]; + } + + return tmp.join('_'); +} + +abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { + #invalidated = false; + readonly #expireTime: number; + + constructor(ttl: number) { + if (ttl == 0) { + this.#expireTime = 0; + } else { + this.#expireTime = Date.now() + ttl; + } + } + + invalidate(): void { + this.#invalidated = true; + } + + validate(): boolean { + return !this.#invalidated && (this.#expireTime == 0 || (Date.now() < this.#expireTime)) + } +} + +class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { + readonly #value: any; + + get value() { + return this.#value; + } + + constructor(ttl: number, value: any) { + super(ttl); + this.#value = value; + } +} + +class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { + readonly #sendCommandPromise: Promise; + + get promise() { + return this.#sendCommandPromise; + } + + constructor(ttl: number, sendCommandPromise: Promise) { + super(ttl); + this.#sendCommandPromise = sendCommandPromise; + } +} + +export abstract class ClientSideCacheProvider extends EventEmitter { + abstract handleCache(client: CachingClient, parser: BasicCommandParser, fn: CmdFunc, transformReply: TransformReply | undefined, typeMapping: TypeMapping | undefined): Promise; + abstract trackingOn(): Array; + abstract invalidate(key: RedisArgument | null): void; + abstract clear(): void; + abstract stats(): CacheStats; + abstract onError(): void; + abstract onClose(): void; +} + +export class BasicClientSideCache extends ClientSideCacheProvider { + #cacheKeyToEntryMap: Map; + #keyToCacheKeySetMap: Map>; + readonly ttl: number; + readonly maxEntries: number; + readonly lru: boolean; + #statsCounter: StatsCounter; + + + recordEvictions(count: number): void { + this.#statsCounter.recordEvictions(count); + } + + recordHits(count: number): void { + this.#statsCounter.recordHits(count); + } + + recordMisses(count: number): void { + this.#statsCounter.recordMisses(count); + } + + constructor(config?: ClientSideCacheConfig) { + super(); + + this.#cacheKeyToEntryMap = new Map(); + this.#keyToCacheKeySetMap = new Map>(); + this.ttl = config?.ttl ?? 0; + this.maxEntries = config?.maxEntries ?? 0; + this.lru = config?.evictPolicy !== "FIFO"; + + const recordStats = config?.recordStats !== false; + this.#statsCounter = recordStats ? DefaultStatsCounter.create() : disabledStatsCounter(); + } + + /* logic of how caching works: + + 1. commands use a CommandParser + it enables us to define/retrieve + cacheKey - a unique key that corresponds to this command and its arguments + redisKeys - an array of redis keys as strings that if the key is modified, will cause redis to invalidate this result when cached + 2. check if cacheKey is in our cache + 2b1. if its a value cacheEntry - return it + 2b2. if it's a promise cache entry - wait on promise and then go to 3c. + 3. if cacheEntry is not in cache + 3a. send the command save the promise into a a cacheEntry and then wait on result + 3b. transform reply (if required) based on transformReply + 3b. check the cacheEntry is still valid - in cache and hasn't been deleted) + 3c. if valid - overwrite with value entry + 4. return previously non cached result + */ + override async handleCache( + client: CachingClient, + parser: BasicCommandParser, + fn: CmdFunc, + transformReply?: TransformReply, + typeMapping?: TypeMapping + ) { + let reply: ReplyUnion; + + const cacheKey = generateCacheKey(parser.redisArgs); + + // "2" + let cacheEntry = this.get(cacheKey); + if (cacheEntry) { + // If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters. + if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1" + this.#statsCounter.recordHits(1); + + return structuredClone(cacheEntry.value); + } else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2 + // This counts as a miss since the value hasn't been fully loaded yet. + this.#statsCounter.recordMisses(1); + reply = await cacheEntry.promise; + } else { + throw new Error("unknown cache entry type"); + } + } else { // 3/3a + this.#statsCounter.recordMisses(1); + + const startTime = performance.now(); + const promise = fn(); + + cacheEntry = this.createPromiseEntry(client, promise); + this.set(cacheKey, cacheEntry, parser.keys); + + try { + reply = await promise; + const loadTime = performance.now() - startTime; + this.#statsCounter.recordLoadSuccess(loadTime); + } catch (err) { + const loadTime = performance.now() - startTime; + this.#statsCounter.recordLoadFailure(loadTime); + + if (cacheEntry.validate()) { + this.delete(cacheKey!); + } + + throw err; + } + } + + // 3b + let val; + if (transformReply) { + val = transformReply(reply, parser.preserve, typeMapping); + } else { + val = reply; + } + + // 3c + if (cacheEntry.validate()) { // revalidating promise entry (dont save value, if promise entry has been invalidated) + // 3d + cacheEntry = this.createValueEntry(client, val); + this.set(cacheKey, cacheEntry, parser.keys); + this.emit("cached-key", cacheKey); + } else { + // cache entry for key got invalidated between execution and saving, so not saving + } + + return structuredClone(val); + } + + override trackingOn() { + return ['CLIENT', 'TRACKING', 'ON']; + } + + override invalidate(key: RedisArgument | null) { + if (key === null) { + this.clear(false); + this.emit("invalidate", key); + + return; + } + + const keySet = this.#keyToCacheKeySetMap.get(key.toString()); + if (keySet) { + for (const cacheKey of keySet) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + } + this.#cacheKeyToEntryMap.delete(cacheKey); + } + this.#keyToCacheKeySetMap.delete(key.toString()); + } + + this.emit('invalidate', key); + } + + override clear(resetStats = true) { + const oldSize = this.#cacheKeyToEntryMap.size; + this.#cacheKeyToEntryMap.clear(); + this.#keyToCacheKeySetMap.clear(); + + if (resetStats) { + if (!(this.#statsCounter instanceof DisabledStatsCounter)) { + this.#statsCounter = DefaultStatsCounter.create(); + } + } else { + // If old entries were evicted due to clear, record them as evictions + if (oldSize > 0) { + this.#statsCounter.recordEvictions(oldSize); + } + } + } + + get(cacheKey: string) { + const val = this.#cacheKeyToEntryMap.get(cacheKey); + + if (val && !val.validate()) { + this.delete(cacheKey); + this.#statsCounter.recordEvictions(1); + this.emit("cache-evict", cacheKey); + + return undefined; + } + + if (val !== undefined && this.lru) { + this.#cacheKeyToEntryMap.delete(cacheKey); + this.#cacheKeyToEntryMap.set(cacheKey, val); + } + + return val; + } + + delete(cacheKey: string) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + this.#cacheKeyToEntryMap.delete(cacheKey); + } + } + + has(cacheKey: string) { + return this.#cacheKeyToEntryMap.has(cacheKey); + } + + set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array) { + let count = this.#cacheKeyToEntryMap.size; + const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey); + + if (oldEntry) { + count--; // overwriting, so not incrementig + oldEntry.invalidate(); + } + + if (this.maxEntries > 0 && count >= this.maxEntries) { + this.deleteOldest(); + this.#statsCounter.recordEvictions(1); + } + + this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry); + + for (const key of keys) { + if (!this.#keyToCacheKeySetMap.has(key.toString())) { + this.#keyToCacheKeySetMap.set(key.toString(), new Set()); + } + + const cacheKeySet = this.#keyToCacheKeySetMap.get(key.toString()); + cacheKeySet!.add(cacheKey); + } + } + + size() { + return this.#cacheKeyToEntryMap.size; + } + + createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + return new ClientSideCacheEntryValue(this.ttl, value); + } + + createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + return new ClientSideCacheEntryPromise(this.ttl, sendCommandPromise); + } + + override stats(): CacheStats { + return this.#statsCounter.snapshot(); + } + + override onError(): void { + this.clear(); + } + + override onClose() { + this.clear(); + } + + /** + * @internal + */ + deleteOldest() { + const it = this.#cacheKeyToEntryMap[Symbol.iterator](); + const n = it.next(); + if (!n.done) { + const key = n.value[0]; + const entry = this.#cacheKeyToEntryMap.get(key); + if (entry) { + entry.invalidate(); + } + this.#cacheKeyToEntryMap.delete(key); + } + } + + /** + * Get cache entries for debugging + * @internal + */ + entryEntries(): IterableIterator<[string, ClientSideCacheEntry]> { + return this.#cacheKeyToEntryMap.entries(); + } + + /** + * Get key set entries for debugging + * @internal + */ + keySetEntries(): IterableIterator<[string, Set]> { + return this.#keyToCacheKeySetMap.entries(); + } +} + +export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { + #disabled = false; + + disable(): void { + this.#disabled = true; + } + + enable(): void { + this.#disabled = false; + } + + override get(cacheKey: string): ClientSideCacheEntry | undefined { + if (this.#disabled) { + return undefined; + } + + return super.get(cacheKey); + } + + override has(cacheKey: string): boolean { + if (this.#disabled) { + return false; + } + + return super.has(cacheKey); + } + + onPoolClose(): void { + this.clear(); + } +} + +export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { + override onError() { + this.clear(false); + } + + override onClose() { + this.clear(false); + } +} + +class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, value: any) { + super(ttl, value); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, sendCommandPromise: Promise) { + super(ttl, sendCommandPromise); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + + return ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } +} + +export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache { + override createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryValue(this.ttl, creator, value); + } + + override createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise); + } + + override onError() { } + + override onClose() { } +} diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 15e8a747b98..78c0a01b203 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -56,6 +56,8 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } + #invalidateCallback?: (key: RedisArgument | null) => unknown; + constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -107,15 +109,34 @@ export default class RedisCommandsQueue { return new Decoder({ onReply: reply => this.#onReply(reply), onErrorReply: err => this.#onErrorReply(err), + //TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used onPush: push => { if (!this.#onPush(push)) { - + // currently only supporting "invalidate" over RESP3 push messages + switch (push[0].toString()) { + case "invalidate": { + if (this.#invalidateCallback) { + if (push[1] !== null) { + for (const key of push[1]) { + this.#invalidateCallback(key); + } + } else { + this.#invalidateCallback(null); + } + } + break; + } + } } }, getTypeMapping: () => this.#getTypeMapping() }); } + setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { + this.#invalidateCallback = callback; + } + addCommand( args: ReadonlyArray, options?: CommandOptions diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index c71cf1a1fad..cc052dd5b51 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -24,10 +24,44 @@ export const SQUARE_SCRIPT = defineScript({ }); describe('Client', () => { + describe('initialization', () => { + describe('clientSideCache validation', () => { + const clientSideCacheConfig = { ttl: 0, maxEntries: 0 }; + + it('should throw error when clientSideCache is enabled with RESP 2', () => { + assert.throws( + () => new RedisClient({ + clientSideCache: clientSideCacheConfig, + RESP: 2, + }), + new Error('Client Side Caching is only supported with RESP3') + ); + }); + + it('should throw error when clientSideCache is enabled with RESP undefined', () => { + assert.throws( + () => new RedisClient({ + clientSideCache: clientSideCacheConfig, + }), + new Error('Client Side Caching is only supported with RESP3') + ); + }); + + it('should not throw when clientSideCache is enabled with RESP 3', () => { + assert.doesNotThrow(() => + new RedisClient({ + clientSideCache: clientSideCacheConfig, + RESP: 3, + }) + ); + }); + }); + }); + describe('parseURL', () => { it('redis://user:secret@localhost:6379/0', async () => { const result = RedisClient.parseURL('redis://user:secret@localhost:6379/0'); - const expected : RedisClientOptions = { + const expected: RedisClientOptions = { socket: { host: 'localhost', port: 6379 @@ -51,8 +85,8 @@ describe('Client', () => { // Compare non-function properties assert.deepEqual(resultRest, expectedRest); - if(result.credentialsProvider.type === 'async-credentials-provider' - && expected.credentialsProvider.type === 'async-credentials-provider') { + if (result?.credentialsProvider?.type === 'async-credentials-provider' + && expected?.credentialsProvider?.type === 'async-credentials-provider') { // Compare the actual output of the credentials functions const resultCreds = await result.credentialsProvider.credentials(); @@ -91,10 +125,10 @@ describe('Client', () => { // Compare non-function properties assert.deepEqual(resultRest, expectedRest); - assert.equal(resultCredProvider.type, expectedCredProvider.type); + assert.equal(resultCredProvider?.type, expectedCredProvider?.type); - if (result.credentialsProvider.type === 'async-credentials-provider' && - expected.credentialsProvider.type === 'async-credentials-provider') { + if (result?.credentialsProvider?.type === 'async-credentials-provider' && + expected?.credentialsProvider?.type === 'async-credentials-provider') { // Compare the actual output of the credentials functions const resultCreds = await result.credentialsProvider.credentials(); @@ -150,11 +184,11 @@ describe('Client', () => { testUtils.testWithClient('Client can authenticate using the streaming credentials provider for initial token acquisition', async client => { - assert.equal( - await client.ping(), - 'PONG' - ); - }, GLOBAL.SERVERS.STREAMING_AUTH); + assert.equal( + await client.ping(), + 'PONG' + ); + }, GLOBAL.SERVERS.STREAMING_AUTH); testUtils.testWithClient('should execute AUTH before SELECT', async client => { assert.equal( @@ -408,7 +442,7 @@ describe('Client', () => { }); testUtils.testWithClient('functions', async client => { - const [,, reply] = await Promise.all([ + const [, , reply] = await Promise.all([ loadMathFunction(client), client.set('key', '2'), client.math.square('key') @@ -522,8 +556,8 @@ describe('Client', () => { const hash: Record = {}; const expectedFields: Array = []; for (let i = 0; i < 100; i++) { - hash[i.toString()] = i.toString(); - expectedFields.push(i.toString()); + hash[i.toString()] = i.toString(); + expectedFields.push(i.toString()); } await client.hSet('key', hash); @@ -842,7 +876,7 @@ describe('Client', () => { testUtils.testWithClient('should be able to go back to "normal mode"', async client => { await Promise.all([ - client.monitor(() => {}), + client.monitor(() => { }), client.reset() ]); await assert.doesNotReject(client.ping()); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index f3e72a3a172..c7f94fe680a 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisPoolOptions, RedisClientPool } from './pool'; import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers'; +import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache'; import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; @@ -78,45 +79,98 @@ export interface RedisClientOptions< */ pingInterval?: number; /** - * TODO + * Default command options to be applied to all commands executed through this client. + * + * These options can be overridden on a per-command basis when calling specific commands. + * + * @property {symbol} [chainId] - Identifier for chaining commands together + * @property {boolean} [asap] - When true, the command is executed as soon as possible + * @property {AbortSignal} [abortSignal] - AbortSignal to cancel the command + * @property {TypeMapping} [typeMapping] - Custom type mappings between RESP and JavaScript types + * + * @example Setting default command options + * ``` + * const client = createClient({ + * commandOptions: { + * asap: true, + * typeMapping: { + * // Custom type mapping configuration + * } + * } + * }); + * ``` */ commandOptions?: CommandOptions; + /** + * Client Side Caching configuration. + * + * Enables Redis Servers and Clients to work together to cache results from commands + * sent to a server. The server will notify the client when cached results are no longer valid. + * + * Note: Client Side Caching is only supported with RESP3. + * + * @example Anonymous cache configuration + * ``` + * const client = createClient({ + * RESP: 3, + * clientSideCache: { + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * } + * }); + * ``` + * + * @example Using a controllable cache + * ``` + * const cache = new BasicClientSideCache({ + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }); + * const client = createClient({ + * RESP: 3, + * clientSideCache: cache + * }); + * ``` + */ + clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; } type WithCommands< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>; -}; + [P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>; + }; type WithModules< M extends RedisModules, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof M]: { - [C in keyof M[P]]: CommandSignature; + [P in keyof M]: { + [C in keyof M[P]]: CommandSignature; + }; }; -}; type WithFunctions< F extends RedisFunctions, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [L in keyof F]: { - [C in keyof F[L]]: CommandSignature; + [L in keyof F]: { + [C in keyof F[L]]: CommandSignature; + }; }; -}; type WithScripts< S extends RedisScripts, RESP extends RespVersions, TYPE_MAPPING extends TypeMapping > = { - [P in keyof S]: CommandSignature; -}; + [P in keyof S]: CommandSignature; + }; export type RedisClientExtensions< M extends RedisModules = {}, @@ -125,11 +179,11 @@ export type RedisClientExtensions< RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} > = ( - WithCommands & - WithModules & - WithFunctions & - WithScripts -); + WithCommands & + WithModules & + WithFunctions & + WithScripts + ); export type RedisClientType< M extends RedisModules = {}, @@ -138,9 +192,9 @@ export type RedisClientType< RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} > = ( - RedisClient & - RedisClientExtensions -); + RedisClient & + RedisClientExtensions + ); type ProxyClient = RedisClient; @@ -309,14 +363,17 @@ export default class RedisClient< #monitorCallback?: MonitorCallback; private _self = this; private _commandOptions?: CommandOptions; - // flag used to annotate that the client - // was in a watch transaction when + // flag used to annotate that the client + // was in a watch transaction when // a topology change occured #dirtyWatch?: string; - #epoch: number; - #watchEpoch?: number; - + #watchEpoch?: number; + #clientSideCache?: ClientSideCacheProvider; #credentialsSubscription: Disposable | null = null; + get clientSideCache() { + return this._self.#clientSideCache; + } + get options(): RedisClientOptions | undefined { return this._self.#options; @@ -334,6 +391,10 @@ export default class RedisClient< return this._self.#queue.isPubSubActive; } + get socketEpoch() { + return this._self.#socket.socketEpoch; + } + get isWatching() { return this._self.#watchEpoch !== undefined; } @@ -358,12 +419,28 @@ export default class RedisClient< constructor(options?: RedisClientOptions) { super(); + this.#validateOptions(options) this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); - this.#epoch = 0; + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof ClientSideCacheProvider) { + this.#clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = new BasicClientSideCache(cscConfig); + } + this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache)); + } } + #validateOptions(options?: RedisClientOptions) { + if (options?.clientSideCache && options?.RESP !== 3) { + throw new Error('Client Side Caching is only supported with RESP3'); + } + + } #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { // Convert username/password to credentialsProvider if no credentialsProvider is already in place @@ -421,7 +498,7 @@ export default class RedisClient< } } - #subscribeForStreamingCredentials(cp: StreamingCredentialsProvider): Promise<[BasicAuth, Disposable]> { + #subscribeForStreamingCredentials(cp: StreamingCredentialsProvider): Promise<[BasicAuth, Disposable]> { return cp.subscribe({ onNext: credentials => { this.reAuthenticate(credentials).catch(error => { @@ -456,7 +533,7 @@ export default class RedisClient< if (cp && cp.type === 'streaming-credentials-provider') { - const [credentials, disposable] = await this.#subscribeForStreamingCredentials(cp) + const [credentials, disposable] = await this.#subscribeForStreamingCredentials(cp) this.#credentialsSubscription = disposable; if (credentials.password) { @@ -492,7 +569,7 @@ export default class RedisClient< if (cp && cp.type === 'streaming-credentials-provider') { - const [credentials, disposable] = await this.#subscribeForStreamingCredentials(cp) + const [credentials, disposable] = await this.#subscribeForStreamingCredentials(cp) this.#credentialsSubscription = disposable; if (credentials.username || credentials.password) { @@ -522,6 +599,10 @@ export default class RedisClient< ); } + if (this.#clientSideCache) { + commands.push(this.#clientSideCache.trackingOn()); + } + return commands; } @@ -575,6 +656,7 @@ export default class RedisClient< }) .on('error', err => { this.emit('error', err); + this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { @@ -583,7 +665,6 @@ export default class RedisClient< }) .on('connect', () => this.emit('connect')) .on('ready', () => { - this.#epoch++; this.emit('ready'); this.#setPingTimer(); this.#maybeScheduleWrite(); @@ -711,14 +792,21 @@ export default class RedisClient< commandOptions: CommandOptions | undefined, transformReply: TransformReply | undefined, ) { - const reply = await this.sendCommand(parser.redisArgs, commandOptions); + const csc = this._self.#clientSideCache; + const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; - if (transformReply) { - const res = transformReply(reply, parser.preserve, commandOptions?.typeMapping); - return res - } + const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) }; + + if (csc && command.CACHEABLE && defaultTypeMapping) { + return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping); + } else { + const reply = await fn(); - return reply; + if (transformReply) { + return transformReply(reply, parser.preserve, commandOptions?.typeMapping); + } + return reply; + } } /** @@ -883,7 +971,7 @@ export default class RedisClient< const reply = await this._self.sendCommand( pushVariadicArguments(['WATCH'], key) ); - this._self.#watchEpoch ??= this._self.#epoch; + this._self.#watchEpoch ??= this._self.socketEpoch; return reply as unknown as ReplyWithTypeMapping, TYPE_MAPPING>; } @@ -942,7 +1030,7 @@ export default class RedisClient< * @internal */ async _executePipeline( - commands: Array, + commands: Array, selectedDB?: number ) { if (!this._self.#socket.isOpen) { @@ -986,15 +1074,15 @@ export default class RedisClient< throw new WatchError(dirtyWatch); } - if (watchEpoch && watchEpoch !== this._self.#epoch) { + if (watchEpoch && watchEpoch !== this._self.socketEpoch) { throw new WatchError('Client reconnected after WATCH'); } const typeMapping = this._commandOptions?.typeMapping; const chainId = Symbol('MULTI Chain'); const promises = [ - this._self.#queue.addCommand(['MULTI'], { chainId }), - ]; + this._self.#queue.addCommand(['MULTI'], { chainId }), + ]; for (const { args } of commands) { promises.push( @@ -1210,6 +1298,7 @@ export default class RedisClient< return new Promise(resolve => { clearTimeout(this._self.#pingTimer); this._self.#socket.close(); + this._self.#clientSideCache?.onClose(); if (this._self.#queue.isEmpty()) { this._self.#socket.destroySocket(); @@ -1236,6 +1325,7 @@ export default class RedisClient< clearTimeout(this._self.#pingTimer); this._self.#queue.flushAll(new DisconnectsClientError()); this._self.#socket.destroy(); + this._self.#clientSideCache?.onClose(); this._self.#credentialsSubscription?.dispose(); this._self.#credentialsSubscription = null; } diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index ac1d021be91..29678f027b5 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -114,6 +114,7 @@ export class DoublyLinkedList { export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; + removed: boolean; } export class SinglyLinkedList { @@ -140,7 +141,8 @@ export class SinglyLinkedList { const node = { value, - next: undefined + next: undefined, + removed: false }; if (this.#head === undefined) { @@ -151,6 +153,9 @@ export class SinglyLinkedList { } remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + if (node.removed) { + throw new Error("node already removed"); + } --this.#length; if (this.#head === node) { @@ -165,6 +170,8 @@ export class SinglyLinkedList { } else { parent!.next = node.next; } + + node.removed = true; } shift() { @@ -177,6 +184,7 @@ export class SinglyLinkedList { this.#head = node.next; } + node.removed = true; return node.value; } diff --git a/packages/client/lib/client/parser.ts b/packages/client/lib/client/parser.ts index 12eec457739..3e820230429 100644 --- a/packages/client/lib/client/parser.ts +++ b/packages/client/lib/client/parser.ts @@ -33,6 +33,17 @@ export class BasicCommandParser implements CommandParser { return this.#keys[0]; } + get cacheKey() { + const tmp = new Array(this.#redisArgs.length*2); + + for (let i = 0; i < this.#redisArgs.length; i++) { + tmp[i] = this.#redisArgs[i].length; + tmp[i+this.#redisArgs.length] = this.#redisArgs[i]; + } + + return tmp.join('_'); + } + push(...arg: Array) { this.#redisArgs.push(...arg); }; diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index ec89f0c39e3..6f633c9caa7 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,6 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache'; import { BasicCommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; @@ -24,11 +25,55 @@ export interface RedisPoolOptions { */ acquireTimeout: number; /** - * TODO + * The delay in milliseconds before a cleanup operation is performed on idle clients. + * + * After this delay, the pool will check if there are too many idle clients and destroy + * excess ones to maintain optimal pool size. */ cleanupDelay: number; /** - * TODO + * Client Side Caching configuration for the pool. + * + * Enables Redis Servers and Clients to work together to cache results from commands + * sent to a server. The server will notify the client when cached results are no longer valid. + * In pooled mode, the cache is shared across all clients in the pool. + * + * Note: Client Side Caching is only supported with RESP3. + * + * @example Anonymous cache configuration + * ``` + * const client = createClientPool({RESP: 3}, { + * clientSideCache: { + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }, + * minimum: 5 + * }); + * ``` + * + * @example Using a controllable cache + * ``` + * const cache = new BasicPooledClientSideCache({ + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }); + * const client = createClientPool({RESP: 3}, { + * clientSideCache: cache, + * minimum: 5 + * }); + * ``` + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; + /** + * Enable experimental support for RESP3 module commands. + * + * When enabled, allows the use of module commands that have been adapted + * for the RESP3 protocol. This is an unstable feature and may change in + * future versions. + * + * @default false */ unstableResp3Modules?: boolean; } @@ -120,7 +165,7 @@ export class RedisClientPool< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping = {} >( - clientOptions?: RedisClientOptions, + clientOptions?: Omit, "clientSideCache">, options?: Partial ) { @@ -142,7 +187,7 @@ export class RedisClientPool< // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create( new Pool( - RedisClient.factory(clientOptions).bind(undefined, clientOptions), + clientOptions, options ) ) as RedisClientPoolType; @@ -216,22 +261,41 @@ export class RedisClientPool< return this._self.#isClosing; } + #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this._self.#clientSideCache; + } + /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... */ constructor( - clientFactory: () => RedisClientType, + clientOptions?: RedisClientOptions, options?: Partial ) { super(); - this.#clientFactory = clientFactory; this.#options = { ...RedisClientPool.#DEFAULTS, ...options }; + if (options?.clientSideCache) { + if (clientOptions === undefined) { + clientOptions = {}; + } + + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = clientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); +// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + } + } + + this.#clientFactory = RedisClient.factory(clientOptions).bind(undefined, clientOptions) as () => RedisClientType; } private _self = this; @@ -295,7 +359,6 @@ export class RedisClientPool< async connect() { if (this._self.#isOpen) return; // TODO: throw error? - this._self.#isOpen = true; const promises = []; @@ -305,11 +368,12 @@ export class RedisClientPool< try { await Promise.all(promises); - return this as unknown as RedisClientPoolType; } catch (err) { this.destroy(); throw err; } + + return this as unknown as RedisClientPoolType; } async #create() { @@ -319,7 +383,8 @@ export class RedisClientPool< ); try { - await node.value.connect(); + const client = node.value; + await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); throw err; @@ -408,7 +473,8 @@ export class RedisClientPool< const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop - this.#idleClients.shift()!.destroy(); + const client = this.#idleClients.shift()! + client.destroy(); } } @@ -446,8 +512,10 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { promises.push(client.close()); } - + await Promise.all(promises); + + this.#clientSideCache?.onPoolClose(); this._self.#idleClients.reset(); this._self.#clientsInUse.reset(); @@ -467,6 +535,9 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { client.destroy(); } + + this._self.#clientSideCache?.onPoolClose(); + this._self.#clientsInUse.reset(); this._self.#isOpen = false; diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 36afa36c04a..603416cf9ed 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -72,6 +72,12 @@ export default class RedisSocket extends EventEmitter { #isSocketUnrefed = false; + #socketEpoch = 0; + + get socketEpoch() { + return this.#socketEpoch; + } + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -212,6 +218,7 @@ export default class RedisSocket extends EventEmitter { throw err; } this.#isReady = true; + this.#socketEpoch++; this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); diff --git a/packages/client/lib/cluster/cluster-slots.spec.ts b/packages/client/lib/cluster/cluster-slots.spec.ts new file mode 100644 index 00000000000..bea1453037a --- /dev/null +++ b/packages/client/lib/cluster/cluster-slots.spec.ts @@ -0,0 +1,48 @@ +import { strict as assert } from 'node:assert'; +import { EventEmitter } from 'node:events'; +import { RedisClusterOptions, RedisClusterClientOptions } from './index'; +import RedisClusterSlots from './cluster-slots'; + +describe('RedisClusterSlots', () => { + describe('initialization', () => { + + describe('clientSideCache validation', () => { + const mockEmit = ((_event: string | symbol, ..._args: any[]): boolean => true) as EventEmitter['emit']; + const clientSideCacheConfig = { ttl: 0, maxEntries: 0 }; + const rootNodes: Array = [ + { socket: { host: 'localhost', port: 30001 } } + ]; + + it('should throw error when clientSideCache is enabled with RESP 2', () => { + assert.throws( + () => new RedisClusterSlots({ + rootNodes, + clientSideCache: clientSideCacheConfig, + RESP: 2 as const, + }, mockEmit), + new Error('Client Side Caching is only supported with RESP3') + ); + }); + + it('should throw error when clientSideCache is enabled with RESP undefined', () => { + assert.throws( + () => new RedisClusterSlots({ + rootNodes, + clientSideCache: clientSideCacheConfig, + }, mockEmit), + new Error('Client Side Caching is only supported with RESP3') + ); + }); + + it('should not throw when clientSideCache is enabled with RESP 3', () => { + assert.doesNotThrow(() => + new RedisClusterSlots({ + rootNodes, + clientSideCache: clientSideCacheConfig, + RESP: 3 as const, + }, mockEmit) + ); + }); + }); + }); +}); diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 0679b200349..84486112320 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -6,6 +6,7 @@ import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pu import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; import calculateSlot from 'cluster-key-slot'; import { RedisSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface NodeAddress { host: string; @@ -111,6 +112,7 @@ export default class RedisClusterSlots< replicas = new Array>(); readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; + clientSideCache?: PooledClientSideCacheProvider; #isOpen = false; @@ -118,12 +120,28 @@ export default class RedisClusterSlots< return this.#isOpen; } + #validateOptions(options?: RedisClusterOptions) { + if (options?.clientSideCache && options?.RESP !== 3) { + throw new Error('Client Side Caching is only supported with RESP3'); + } + } + constructor( options: RedisClusterOptions, emit: EventEmitter['emit'] ) { + this.#validateOptions(options); this.#options = options; - this.#clientFactory = RedisClient.factory(options); + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.clientSideCache = options.clientSideCache; + } else { + this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache) + } + } + + this.#clientFactory = RedisClient.factory(this.#options); this.#emit = emit; } @@ -164,6 +182,8 @@ export default class RedisClusterSlots< } async #discover(rootNode: RedisClusterClientOptions) { + this.clientSideCache?.clear(); + this.clientSideCache?.disable(); try { const addressesInUse = new Set(), promises: Array> = [], @@ -219,6 +239,7 @@ export default class RedisClusterSlots< } await Promise.all(promises); + this.clientSideCache?.enable(); return true; } catch (err) { @@ -314,6 +335,8 @@ export default class RedisClusterSlots< #createClient(node: ShardNode, readonly = node.readonly) { return this.#clientFactory( this.#clientOptionsDefaults({ + clientSideCache: this.clientSideCache, + RESP: this.#options.RESP, socket: this.#getNodeAddress(node.address) ?? { host: node.host, port: node.port diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 8b37f9c1aa7..f7385629262 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -9,11 +9,10 @@ import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi- import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; import { RedisTcpSocketOptions } from '../client/socket'; -import ASKING from '../commands/ASKING'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; import { BasicCommandParser } from '../client/parser'; -import { parseArgs } from '../commands/generic-transformers'; -import SingleEntryCache from '../single-entry-cache'; - +import { ASKING_CMD } from '../commands/ASKING'; +import SingleEntryCache from '../single-entry-cache' interface ClusterCommander< M extends RedisModules, F extends RedisFunctions, @@ -67,6 +66,41 @@ export interface RedisClusterOptions< * Useful when the cluster is running on another network */ nodeAddressMap?: NodeAddressMap; + /** + * Client Side Caching configuration for the pool. + * + * Enables Redis Servers and Clients to work together to cache results from commands + * sent to a server. The server will notify the client when cached results are no longer valid. + * In pooled mode, the cache is shared across all clients in the pool. + * + * Note: Client Side Caching is only supported with RESP3. + * + * @example Anonymous cache configuration + * ``` + * const client = createCluster({ + * clientSideCache: { + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }, + * minimum: 5 + * }); + * ``` + * + * @example Using a controllable cache + * ``` + * const cache = new BasicPooledClientSideCache({ + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }); + * const client = createCluster({ + * clientSideCache: cache, + * minimum: 5 + * }); + * ``` + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } // remove once request & response policies are ready @@ -149,6 +183,7 @@ export default class RedisCluster< > extends EventEmitter { static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); + return async function (this: ProxyCluster, ...args: Array) { const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); @@ -273,6 +308,10 @@ export default class RedisCluster< return this._self.#slots.slots; } + get clientSideCache() { + return this._self.#slots.clientSideCache; + } + /** * An array of the cluster masters. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. @@ -390,6 +429,27 @@ export default class RedisCluster< // return this._commandOptionsProxy('policies', policies); // } + #handleAsk( + fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise + ) { + return async (client: RedisClientType, options?: ClusterCommandOptions) => { + const chainId = Symbol("asking chain"); + const opts = options ? {...options} : {}; + opts.chainId = chainId; + + + + const ret = await Promise.all( + [ + client.sendCommand([ASKING_CMD], {chainId: chainId}), + fn(client, opts) + ] + ); + + return ret[1]; + }; + } + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, @@ -399,14 +459,15 @@ export default class RedisCluster< const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; let client = await this.#slots.getClient(firstKey, isReadonly); let i = 0; - let myOpts = options; + + let myFn = fn; while (true) { try { - return await fn(client, myOpts); + return await myFn(client, options); } catch (err) { - // reset to passed in options, if changed by an ask request - myOpts = options; + myFn = fn; + // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; @@ -425,13 +486,7 @@ export default class RedisCluster< } client = redirectTo; - - const chainId = Symbol('Asking Chain'); - myOpts = options ? {...options} : {}; - myOpts.chainId = chainId; - - client.sendCommand(parseArgs(ASKING), {chainId: chainId}).catch(err => { console.log(`Asking Failed: ${err}`) } ); - + myFn = this.#handleAsk(fn); continue; } @@ -582,10 +637,12 @@ export default class RedisCluster< } close() { + this._self.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.close(); } destroy() { + this._self.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.destroy(); } diff --git a/packages/client/lib/commands/GEOSEARCH.ts b/packages/client/lib/commands/GEOSEARCH.ts index 8c77fd89239..869dc60bec9 100644 --- a/packages/client/lib/commands/GEOSEARCH.ts +++ b/packages/client/lib/commands/GEOSEARCH.ts @@ -29,12 +29,7 @@ export function parseGeoSearchArguments( from: GeoSearchFrom, by: GeoSearchBy, options?: GeoSearchOptions, - store?: RedisArgument ) { - if (store !== undefined) { - parser.pushKey(store); - } - parser.pushKey(key); if (typeof from === 'string' || from instanceof Buffer) { diff --git a/packages/client/lib/commands/GEOSEARCHSTORE.ts b/packages/client/lib/commands/GEOSEARCHSTORE.ts index eb8e12abe6d..34c6e0988e2 100644 --- a/packages/client/lib/commands/GEOSEARCHSTORE.ts +++ b/packages/client/lib/commands/GEOSEARCHSTORE.ts @@ -17,7 +17,12 @@ export default { options?: GeoSearchStoreOptions ) { parser.push('GEOSEARCHSTORE'); - parseGeoSearchArguments(parser, source, from, by, options, destination); + + if (destination !== undefined) { + parser.pushKey(destination); + } + + parseGeoSearchArguments(parser, source, from, by, options); if (options?.STOREDIST) { parser.push('STOREDIST'); diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index cf9228c261f..035cf09020d 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -5,11 +5,59 @@ import { RESP_TYPES } from '../RESP/decoder'; import { WatchError } from "../errors"; import { RedisSentinelConfig, SentinelFramework } from "./test-util"; import { RedisSentinelEvent, RedisSentinelType, RedisSentinelClientType, RedisNode } from "./types"; +import RedisSentinel from "./index"; import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types'; import { promisify } from 'node:util'; import { exec } from 'node:child_process'; +import { BasicPooledClientSideCache } from '../client/cache' +import { once } from 'node:events' const execAsync = promisify(exec); +describe('RedisSentinel', () => { + describe('initialization', () => { + describe('clientSideCache validation', () => { + const clientSideCacheConfig = { ttl: 0, maxEntries: 0 }; + const options = { + name: 'mymaster', + sentinelRootNodes: [ + { host: 'localhost', port: 26379 } + ] + }; + + it('should throw error when clientSideCache is enabled with RESP 2', () => { + assert.throws( + () => RedisSentinel.create({ + ...options, + clientSideCache: clientSideCacheConfig, + RESP: 2 as const, + }), + new Error('Client Side Caching is only supported with RESP3') + ); + }); + + it('should throw error when clientSideCache is enabled with RESP undefined', () => { + assert.throws( + () => RedisSentinel.create({ + ...options, + clientSideCache: clientSideCacheConfig, + }), + new Error('Client Side Caching is only supported with RESP3') + ); + }); + + it('should not throw when clientSideCache is enabled with RESP 3', () => { + assert.doesNotThrow(() => + RedisSentinel.create({ + ...options, + clientSideCache: clientSideCacheConfig, + RESP: 3 as const, + }) + ); + }); + }); + }); +}); + [GLOBAL.SENTINEL.OPEN, GLOBAL.SENTINEL.PASSWORD].forEach(testOptions => { const passIndex = testOptions.serverArguments.indexOf('--requirepass')+1; let password: string | undefined = undefined; @@ -967,6 +1015,34 @@ describe.skip('legacy tests', () => { tracer.push("added node and waiting on added promise"); await nodeAddedPromise; }) + + it('with client side caching', async function() { + this.timeout(30000); + const csc = new BasicPooledClientSideCache(); + + sentinel = frame.getSentinelClient({nodeClientOptions: {RESP: 3}, clientSideCache: csc, masterPoolSize: 5}); + await sentinel.connect(); + + await sentinel.set('x', 1); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(1, csc.cacheMisses()); + assert.equal(3, csc.cacheHits()); + + const invalidatePromise = once(csc, 'invalidate'); + await sentinel.set('x', 2); + await invalidatePromise; + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(csc.cacheMisses(), 2); + assert.equal(csc.cacheHits(), 6); + }) }); }); diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 3bf94abd819..ec570e64bf2 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -16,6 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers'; import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface ClientInfo { id: number; @@ -301,6 +302,10 @@ export default class RedisSentinel< #masterClientCount = 0; #masterClientInfo?: ClientInfo; + get clientSideCache() { + return this._self.#internal.clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -617,7 +622,7 @@ class RedisSentinelInternal< readonly #name: string; readonly #nodeClientOptions: RedisClientOptions; - readonly #sentinelClientOptions: RedisClientOptions; + readonly #sentinelClientOptions: RedisClientOptions; readonly #scanInterval: number; readonly #passthroughClientErrorEvents: boolean; @@ -650,9 +655,22 @@ class RedisSentinelInternal< #trace: (msg: string) => unknown = () => { }; + #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this.#clientSideCache; + } + + #validateOptions(options?: RedisSentinelOptions) { + if (options?.clientSideCache && options?.RESP !== 3) { + throw new Error('Client Side Caching is only supported with RESP3'); + } + } + constructor(options: RedisSentinelOptions) { super(); + this.#validateOptions(options); + this.#name = options.name; this.#sentinelRootNodes = Array.from(options.sentinelRootNodes); @@ -662,11 +680,21 @@ class RedisSentinelInternal< this.#scanInterval = options.scanInterval ?? 0; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; - this.#nodeClientOptions = options.nodeClientOptions ? Object.assign({} as RedisClientOptions, options.nodeClientOptions) : {}; + this.#nodeClientOptions = (options.nodeClientOptions ? {...options.nodeClientOptions} : {}) as RedisClientOptions; if (this.#nodeClientOptions.url !== undefined) { throw new Error("invalid nodeClientOptions for Sentinel"); } + if (options.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); +// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + } + } + this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions, options.sentinelClientOptions) : {}; this.#sentinelClientOptions.modules = RedisSentinelModule; @@ -904,6 +932,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; @@ -952,6 +982,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 60696bc3437..86bc5b31786 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -188,18 +188,22 @@ export class SentinelFramework extends DockerBase { } const options: RedisSentinelOptions = { + ...opts, name: this.config.sentinelName, sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }), passthroughClientErrorEvents: errors } if (this.config.password !== undefined) { - options.nodeClientOptions = {password: this.config.password}; - options.sentinelClientOptions = {password: this.config.password}; - } + if (!options.nodeClientOptions) { + options.nodeClientOptions = {}; + } + options.nodeClientOptions.password = this.config.password; - if (opts) { - Object.assign(options, opts); + if (!options.sentinelClientOptions) { + options.sentinelClientOptions = {}; + } + options.sentinelClientOptions = {password: this.config.password}; } return RedisSentinel.create(options); diff --git a/packages/client/lib/sentinel/types.ts b/packages/client/lib/sentinel/types.ts index 28a5a91ddd3..e72f2eec2a0 100644 --- a/packages/client/lib/sentinel/types.ts +++ b/packages/client/lib/sentinel/types.ts @@ -4,6 +4,7 @@ import { CommandSignature, CommanderConfig, RedisFunctions, RedisModules, RedisS import COMMANDS from '../commands'; import RedisSentinel, { RedisSentinelClient } from '.'; import { RedisTcpSocketOptions } from '../client/socket'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; export interface RedisNode { host: string; @@ -67,6 +68,41 @@ export interface RedisSentinelOptions< * When `false`, the sentinel object will wait for the first available client from the pool. */ reserveClient?: boolean; + /** + * Client Side Caching configuration for the pool. + * + * Enables Redis Servers and Clients to work together to cache results from commands + * sent to a server. The server will notify the client when cached results are no longer valid. + * In pooled mode, the cache is shared across all clients in the pool. + * + * Note: Client Side Caching is only supported with RESP3. + * + * @example Anonymous cache configuration + * ``` + * const client = createSentinel({ + * clientSideCache: { + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }, + * minimum: 5 + * }); + * ``` + * + * @example Using a controllable cache + * ``` + * const cache = new BasicPooledClientSideCache({ + * ttl: 0, + * maxEntries: 0, + * evictPolicy: "LRU" + * }); + * const client = createSentinel({ + * clientSideCache: cache, + * minimum: 5 + * }); + * ``` + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } export interface SentinelCommander< diff --git a/packages/client/lib/sentinel/utils.ts b/packages/client/lib/sentinel/utils.ts index 90b789ddca9..7e2404c2f7a 100644 --- a/packages/client/lib/sentinel/utils.ts +++ b/packages/client/lib/sentinel/utils.ts @@ -1,5 +1,5 @@ -import { BasicCommandParser } from '../client/parser'; import { ArrayReply, Command, RedisFunction, RedisScript, RespVersions, UnwrapReply } from '../RESP/types'; +import { BasicCommandParser } from '../client/parser'; import { RedisSocketOptions, RedisTcpSocketOptions } from '../client/socket'; import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode } from './types'; diff --git a/packages/redis/README.md b/packages/redis/README.md index f0b2a34905d..d04a19b0d71 100644 --- a/packages/redis/README.md +++ b/packages/redis/README.md @@ -234,6 +234,23 @@ of sending a `QUIT` command to the server, the client can simply close the netwo ```typescript client.destroy(); ``` +### Client Side Caching + +Node Redis v5 adds support for [Client Side Caching](https://redis.io/docs/manual/client-side-caching/), which enables clients to cache query results locally. The Redis server will notify the client when cached results are no longer valid. + +```typescript +// Enable client side caching with RESP3 +const client = createClient({ + RESP: 3, + clientSideCache: { + ttl: 0, // Time-to-live (0 = no expiration) + maxEntries: 0, // Maximum entries (0 = unlimited) + evictPolicy: "LRU" // Eviction policy: "LRU" or "FIFO" + } +}); +``` + +See the [V5 documentation](../../docs/v5.md#client-side-caching) for more details and advanced usage. ### Auto-Pipelining diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 8ed85bf6e3e..d92c5c9e3d8 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -450,7 +450,7 @@ export default class TestUtils { await fn(pool); } finally { await pool.flushAll(); - pool.destroy(); + pool.close(); } }); }