From c9dc7e8eba71b46b207d043c8adab476535aab31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?O=C4=9Fuzhan=20Olguncu?= <21091016+ogzhanolguncu@users.noreply.github.com> Date: Fri, 10 May 2024 16:43:28 +0300 Subject: [PATCH] DX 593 - Auto executed pipeline (#1039) * feat: add auto executed pipelines * test: improve test times by shrinking test subjects * feat: add proxy over autopipeline function * add enableAutoPipelining parameter to redis * initalize auto pipeline with static method * add docstrings for autoPipeline methods * add pipelineCounter field to autoPipeline proxy * add test for consecutive awaits with auto pipeline * simplfy auto pipeline tests * fix test descriptions * rm pipelineCounter field from auto pipeline proxy --------- Co-authored-by: CahidArda --- pkg/auto-pipeline.test.ts | 228 +++++++++++++++++++++++++++++++++++++ pkg/auto-pipeline.ts | 83 ++++++++++++++ pkg/commands/xtrim.test.ts | 20 ++-- pkg/pipeline.test.ts | 3 +- pkg/pipeline.ts | 21 ++++ pkg/redis.ts | 5 + platforms/cloudflare.ts | 21 ++++ platforms/fastly.ts | 14 +++ platforms/nodejs.ts | 21 ++++ 9 files changed, 404 insertions(+), 12 deletions(-) create mode 100644 pkg/auto-pipeline.test.ts create mode 100644 pkg/auto-pipeline.ts diff --git a/pkg/auto-pipeline.test.ts b/pkg/auto-pipeline.test.ts new file mode 100644 index 00000000..1b51cd3d --- /dev/null +++ b/pkg/auto-pipeline.test.ts @@ -0,0 +1,228 @@ +import { Redis } from "../platforms/nodejs" +import { keygen, newHttpClient } from "./test-utils"; + +import { afterEach, describe, expect, test } from "bun:test"; +import { ScriptLoadCommand } from "./commands/script_load"; + + +const client = newHttpClient(); + +const { newKey, cleanup } = keygen(); +afterEach(cleanup); + +describe("Auto pipeline", () => { + test("should execute all commands inside a Promise.all in a single pipeline", async () => { + const persistentKey = newKey(); + const persistentKey2 = newKey(); + const scriptHash = await new ScriptLoadCommand(["return 1"]).exec(client); + + const redis = Redis.autoPipeline({ + latencyLogging: false + }) + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(0) + + // all the following commands are in a single pipeline call + const result = await Promise.all([ + redis.append(newKey(), "hello"), + redis.bitcount(newKey(), 0, 1), + redis.bitop("and", newKey(), newKey()), + redis.bitpos(newKey(), 1, 0), + redis.dbsize(), + redis.decr(newKey()), + redis.decrby(newKey(), 1), + redis.del(newKey()), + redis.echo("hello"), + redis.eval("return ARGV[1]", [], ["Hello"]), + redis.evalsha(scriptHash, [], ["Hello"]), + redis.exists(newKey()), + redis.expire(newKey(), 5), + redis.expireat(newKey(), Math.floor(new Date().getTime() / 1000) + 60), + redis.flushall(), + redis.flushdb(), + redis.get(newKey()), + redis.getbit(newKey(), 0), + redis.getdel(newKey()), + redis.getset(newKey(), "hello"), + redis.hdel(newKey(), "field"), + redis.hexists(newKey(), "field"), + redis.hget(newKey(), "field"), + redis.hgetall(newKey()), + redis.hincrby(newKey(), "field", 1), + redis.hincrbyfloat(newKey(), "field", 1.5), + redis.hkeys(newKey()), + redis.hlen(newKey()), + redis.hmget(newKey(), newKey()), + redis.hmset(newKey(), { field: "field", value: "value" }), + redis.hscan(newKey(), 0), + redis.hset(newKey(), { field: "value" }), + redis.hsetnx(newKey(), "field", "value"), + redis.hstrlen(newKey(), "field"), + redis.hvals(newKey()), + redis.incr(newKey()), + redis.incrby(newKey(), 1), + redis.incrbyfloat(newKey(), 1.5), + redis.keys("*"), + redis.lindex(newKey(), 0), + redis.linsert(newKey(), "before", "pivot", "value"), + redis.llen(newKey()), + redis.lmove(newKey(), newKey(), "left", "right"), + redis.lpop(newKey()), + redis.lpos(newKey(), "value"), + redis.lpush(persistentKey, "element"), + redis.lpushx(newKey(), "element1", "element2"), + redis.lrange(newKey(), 0, 1), + redis.lrem(newKey(), 1, "value"), + redis.lset(persistentKey, 0, "value"), + redis.ltrim(newKey(), 0, 1), + redis.hrandfield(newKey()), + redis.hrandfield(newKey(), 2), + redis.hrandfield(newKey(), 3, true), + redis.mget<[string, string]>(newKey(), newKey()), + redis.mset({ key1: "value", key2: "value" }), + redis.msetnx({ key3: "value", key4: "value" }), + redis.persist(newKey()), + redis.pexpire(newKey(), 1000), + redis.pexpireat(newKey(), new Date().getTime() + 1000), + redis.ping(), + redis.psetex(newKey(), 1, "value"), + redis.pttl(newKey()), + redis.publish("test", "hello"), + redis.randomkey(), + redis.rename(persistentKey, persistentKey2), + redis.renamenx(persistentKey2, newKey()), + redis.rpop(newKey()), + redis.rpush(newKey(), "element1", "element2"), + redis.rpushx(newKey(), "element1", "element2"), + redis.sadd(newKey(), "memeber1", "member2"), + redis.scan(0), + redis.scard(newKey()), + redis.sdiff(newKey()), + redis.sdiffstore(newKey(), newKey()), + redis.set(newKey(), "value"), + redis.setbit(newKey(), 1, 1), + redis.setex(newKey(), 1, "value"), + redis.setnx(newKey(), "value"), + redis.setrange(newKey(), 1, "value"), + redis.sinter(newKey(), newKey()), + redis.sinterstore(newKey(), newKey()), + redis.sismember(newKey(), "member"), + redis.smembers(newKey()), + redis.smove(newKey(), newKey(), "member"), + redis.spop(newKey()), + redis.srandmember(newKey()), + redis.srem(newKey(), "member"), + redis.sscan(newKey(), 0), + redis.strlen(newKey()), + redis.sunion(newKey()), + redis.sunionstore(newKey(), newKey()), + redis.time(), + redis.touch(newKey()), + redis.ttl(newKey()), + redis.type(newKey()), + redis.unlink(newKey()), + redis.zadd(newKey(), { score: 0, member: "member" }), + redis.zcard(newKey()), + redis.scriptExists(scriptHash), + redis.scriptFlush({ async: true }), + redis.scriptLoad("return 1"), + redis.zcount(newKey(), 0, 1), + redis.zincrby(newKey(), 1, "member"), + redis.zinterstore(newKey(), 1, [newKey()]), + redis.zlexcount(newKey(), "-", "+"), + redis.zpopmax(newKey()), + redis.zpopmin(newKey()), + redis.zrange(newKey(), 0, 1), + redis.zrank(newKey(), "member"), + redis.zrem(newKey(), "member"), + redis.zremrangebylex(newKey(), "-", "+"), + redis.zremrangebyrank(newKey(), 0, 1), + redis.zremrangebyscore(newKey(), 0, 1), + redis.zrevrank(newKey(), "member"), + redis.zscan(newKey(), 0), + redis.zscore(newKey(), "member"), + redis.zunionstore(newKey(), 1, [newKey()]), + redis.zunion(1, [newKey()]), + redis.json.set(newKey(), "$", { hello: "world" }) + ]) + expect(result).toBeTruthy(); + expect(result.length).toBe(120); // returns + // @ts-expect-error pipelineCounter is not in type but accessible120 results + expect(redis.pipelineCounter).toBe(1); + }); + + test("should group async requests with sync requests", async () => { + + const redis = Redis.autoPipeline({ + latencyLogging: false + }) + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(0); + + // following five commands are added to the pipeline + redis.flushdb(); + redis.incr("baz"); + redis.incr("baz"); + redis.set("foo", "bar"); + redis.incr("baz"); + + // two get calls are added to the pipeline and pipeline + // is executed since we called await + const [fooValue, bazValue] = await Promise.all([ + redis.get("foo"), + redis.get("baz") + ]); + + expect(fooValue).toBe("bar"); + expect(bazValue).toBe(3); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(1); + }) + + test("should execute a pipeline for each consecutive awaited command", async () => { + + const redis = Redis.autoPipeline({ + latencyLogging: false + }); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(0); + + redis.flushdb(); + + const res1 = await redis.incr("baz"); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(1); + + const res2 = await redis.incr("baz"); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(2); + + const res3 = await redis.set("foo", "bar"); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(3); + + expect([res1, res2, res3]).toEqual([1, 2, "OK"]); + + }); + + test("should execute a single pipeline for several commands inside Promise.all", async () => { + + const redis = Redis.autoPipeline({ + latencyLogging: false + }); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(0); + + const resArray = await Promise.all([ + redis.flushdb(), + redis.incr("baz"), + redis.incr("baz"), + redis.set("foo", "bar"), + redis.get("foo") + ]); + // @ts-expect-error pipelineCounter is not in type but accessible + expect(redis.pipelineCounter).toBe(1); + expect(resArray).toEqual(["OK", 1, 2, "OK", "bar"]); + + }) +}); diff --git a/pkg/auto-pipeline.ts b/pkg/auto-pipeline.ts new file mode 100644 index 00000000..3cfdfea6 --- /dev/null +++ b/pkg/auto-pipeline.ts @@ -0,0 +1,83 @@ +import { Command } from "./commands/command"; +import { CommandArgs } from "./types"; +import { Pipeline } from "./pipeline"; +import { Redis } from "./redis"; + +// will omit redis only commands since we call Pipeline in the background in auto pipeline +type redisOnly = Exclude + +export function createAutoPipelineProxy(_redis: Redis) { + + const redis = _redis as Redis & { + autoPipelineExecutor: AutoPipelineExecutor; + } + + if (!redis.autoPipelineExecutor) { + redis.autoPipelineExecutor = new AutoPipelineExecutor(redis); + } + + return new Proxy(redis, { + get: (target, prop: "pipelineCounter" | keyof Pipeline ) => { + + // return pipelineCounter of autoPipelineExecutor + if (prop == "pipelineCounter") { + return target.autoPipelineExecutor.pipelineCounter; + } + + // If the method is a function on the pipeline, wrap it with the executor logic + if (typeof target.autoPipelineExecutor.pipeline[prop] === "function") { + return (...args: CommandArgs) => { + return target.autoPipelineExecutor.withAutoPipeline((pipeline) => { + (pipeline[prop] as Function)(...args); + }); + }; + } + return target.autoPipelineExecutor.pipeline[prop]; + }, + }) as Omit; +} + +export class AutoPipelineExecutor { + private pipelinePromises = new WeakMap>>(); + private activePipeline: Pipeline | null = null; + private indexInCurrentPipeline = 0; + private redis: Redis; + pipeline: Pipeline; // only to make sure that proxy can work + pipelineCounter: number = 0; // to keep track of how many times a pipeline was executed + + constructor(redis: Redis) { + this.redis = redis; + this.pipeline = redis.pipeline(); + } + + async withAutoPipeline(executeWithPipeline: (pipeline: Pipeline) => unknown): Promise { + const pipeline = this.activePipeline || this.redis.pipeline(); + + if (!this.activePipeline) { + this.activePipeline = pipeline; + this.indexInCurrentPipeline = 0; + } + + const index = this.indexInCurrentPipeline++; + executeWithPipeline(pipeline); + + const pipelineDone = this.deferExecution().then(() => { + if (!this.pipelinePromises.has(pipeline)) { + const pipelinePromise = pipeline.exec(); + this.pipelineCounter += 1; + + this.pipelinePromises.set(pipeline, pipelinePromise); + this.activePipeline = null; + } + return this.pipelinePromises.get(pipeline)!; + }); + + const results = await pipelineDone; + return results[index] as T; + } + + private async deferExecution() { + await Promise.resolve(); + return await Promise.resolve(); + } +} diff --git a/pkg/commands/xtrim.test.ts b/pkg/commands/xtrim.test.ts index 0bbfb7a4..22fbd5ea 100644 --- a/pkg/commands/xtrim.test.ts +++ b/pkg/commands/xtrim.test.ts @@ -12,20 +12,20 @@ afterAll(cleanup); describe("XLEN", () => { test( - "should approximately trim stream to 300 items", + "should approximately trim stream to 30 items", async () => { const key = newKey(); const promises = []; - for (let i = 1; i <= 10000; i++) { + for (let i = 1; i <= 1000; i++) { promises.push(new XAddCommand([key, "*", { [randomID()]: randomID() }]).exec(client)); } await Promise.all(promises); - await new XTrimCommand([key, { strategy: "MAXLEN", threshold: 300, exactness: "~" }]).exec( + await new XTrimCommand([key, { strategy: "MAXLEN", threshold: 30, exactness: "~" }]).exec( client, ); const len = await new XLenCommand([key]).exec(client); - expect(len).toBeGreaterThanOrEqual(290); - expect(len).toBeLessThanOrEqual(310); + expect(len).toBeGreaterThanOrEqual(29); + expect(len).toBeLessThanOrEqual(31); }, { timeout: 1000 * 60 }, ); @@ -45,20 +45,20 @@ describe("XLEN", () => { }); test( - "should trim with MINID and a limit and only remove 10 items that satisfies MINID", + "should trim with MINID and a limit and only remove 2 items that satisfies MINID", async () => { const key = newKey(); const baseTimestamp = Date.now(); - for (let i = 0; i < 100; i++) { + for (let i = 0; i < 20; i++) { const id = `${baseTimestamp}-${i}`; await new XAddCommand([key, id, { data: `value${i}` }]).exec(client); } - const midRangeId = `${baseTimestamp}-50`; - await new XTrimCommand([key, { strategy: "MINID", threshold: midRangeId, limit: 10 }]).exec( + const midRangeId = `${baseTimestamp}-10`; + await new XTrimCommand([key, { strategy: "MINID", threshold: midRangeId, limit: 2 }]).exec( client, ); const len = await new XLenCommand([key]).exec(client); - expect(len).toBeLessThanOrEqual(100); + expect(len).toBeLessThanOrEqual(20); }, { timeout: 20000 }, ); diff --git a/pkg/pipeline.test.ts b/pkg/pipeline.test.ts index a8661f3e..2c571a7c 100644 --- a/pkg/pipeline.test.ts +++ b/pkg/pipeline.test.ts @@ -140,7 +140,6 @@ describe("use all the things", () => { .get(newKey()) .getbit(newKey(), 0) .getdel(newKey()) - .getrange(newKey(), 0, 1) .getset(newKey(), "hello") .hdel(newKey(), "field") .hexists(newKey(), "field") @@ -244,6 +243,6 @@ describe("use all the things", () => { .json.set(newKey(), "$", { hello: "world" }); const res = await p.exec(); - expect(res.length).toEqual(121); + expect(res.length).toEqual(120); }); }); diff --git a/pkg/pipeline.ts b/pkg/pipeline.ts index 847e94d0..ae5d198b 100644 --- a/pkg/pipeline.ts +++ b/pkg/pipeline.ts @@ -223,6 +223,7 @@ export class Pipeline[] = []> { private commands: TCommands; private commandOptions?: CommandOptions; private multiExec: boolean; + constructor(opts: { client: Requester; commandOptions?: CommandOptions; @@ -233,6 +234,26 @@ export class Pipeline[] = []> { this.commands = [] as unknown as TCommands; // the TCommands generic in the class definition is only used for carrying through chained command types and should never be explicitly set when instantiating the class this.commandOptions = opts.commandOptions; this.multiExec = opts.multiExec ?? false; + + if (this.commandOptions?.latencyLogging) { + const originalExec = this.exec.bind(this); + this.exec = async < + TCommandResults extends unknown[] = [] extends TCommands + ? unknown[] + : InferResponseData, + >(): Promise => { + const start = performance.now(); + const result = await originalExec(); + const end = performance.now(); + const loggerResult = (end - start).toFixed(2); + console.log( + `Latency for \x1b[38;2;19;185;39m${ + this.multiExec ? ["MULTI-EXEC"] : ["PIPELINE"].toString().toUpperCase() + }\x1b[0m: \x1b[38;2;0;255;255m${loggerResult} ms\x1b[0m`, + ); + return result as TCommandResults; + }; + } } /** diff --git a/pkg/redis.ts b/pkg/redis.ts index 72badb10..251a9c3a 100644 --- a/pkg/redis.ts +++ b/pkg/redis.ts @@ -175,6 +175,7 @@ import { Requester, UpstashRequest, UpstashResponse } from "./http"; import { Pipeline } from "./pipeline"; import { Script } from "./script"; import type { CommandArgs, RedisOptions, Telemetry } from "./types"; +import { AutoPipelineExecutor, createAutoPipelineProxy } from "../pkg/auto-pipeline" // See https://github.com/upstash/upstash-redis/issues/342 // why we need this export @@ -378,6 +379,10 @@ export class Redis { multiExec: false, }); + autoPipeline = () => { + return createAutoPipelineProxy(this) + }; + /** * Create a new transaction to allow executing multiple steps atomically. * diff --git a/platforms/cloudflare.ts b/platforms/cloudflare.ts index c71e1a6e..d1ddd25c 100644 --- a/platforms/cloudflare.ts +++ b/platforms/cloudflare.ts @@ -111,4 +111,25 @@ export class Redis extends core.Redis { } return new Redis({ ...opts, url, token }, env); } + + /** + * Create a Redis client utilizing auto pipeline. + * + * This means that the client will try to pipeline multiple calls + * into a single request to reduce latency and the number of requests + */ + static autoPipeline(config: Partial, env?: Env) { + let redis: Redis; + if (config.url && config.token) { + // casting below since only url and token are the non-optional fields of RedisConfigNodejs + redis = new Redis(config as RedisConfigCloudflare); + } + + // try to initialise Redis from env + // @ts-ignore env variable may not have Upstash env variables but this will be checked in runtime + redis = Redis.fromEnv(env, config); + + // return autoPipeline + return redis.autoPipeline() + } } diff --git a/platforms/fastly.ts b/platforms/fastly.ts index b68722c0..9f91cefe 100644 --- a/platforms/fastly.ts +++ b/platforms/fastly.ts @@ -68,4 +68,18 @@ export class Redis extends core.Redis { platform: "fastly", }); } + + /** + * Create a Redis client utilizing auto pipeline. + * + * This means that the client will try to pipeline multiple calls + * into a single request to reduce latency and the number of requests + */ + + static autoPipeline(config: RedisConfigFastly) { + const redis = new Redis(config); + + // return autoPipeline + return redis.autoPipeline() + } } diff --git a/platforms/nodejs.ts b/platforms/nodejs.ts index 4defb7c0..d39f54f2 100644 --- a/platforms/nodejs.ts +++ b/platforms/nodejs.ts @@ -166,4 +166,25 @@ export class Redis extends core.Redis { } return new Redis({ ...config, url, token }); } + + + /** + * Create a Redis client utilizing auto pipeline. + * + * This means that the client will try to pipeline multiple calls + * into a single request to reduce latency and the number of requests + */ + static autoPipeline(configOrRequester: Partial) { + let redis: Redis; + if (configOrRequester.url && configOrRequester.token) { + // casting below since only url and token are the non-optional fields of RedisConfigNodejs + redis = new Redis(configOrRequester as RedisConfigNodejs); + } + + // try to initialise Redis from env + redis = Redis.fromEnv(configOrRequester); + + // return autoPipeline + return redis.autoPipeline() + } }