From b85322be2a21b9e800409005b2937d4f7c646738 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 12:04:43 +0000 Subject: [PATCH 01/17] feat: use the same transaction id for matching roles and context --- src/index.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/index.ts b/src/index.ts index 6767fd9..fb21eca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -76,6 +76,7 @@ export type CustomAbilities< export type GetContextFn = () => { role: string; + transactionId?: string; context?: { [key in ContextKeys]: string | number | string[]; }; @@ -250,6 +251,16 @@ export const createClient = ( // See https://github.com/prisma/prisma/issues/18276 const queryResults = await prisma.$transaction( async (tx) => { + if (ctx.transactionId) { + // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it + (tx as any)[Symbol.for("prisma.client.transaction.id")] = + ctx.transactionId; + } else { + const txId = hashWithPrefix("yates_tx_", JSON.stringify(ctx)); + // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it + (tx as any)[Symbol.for("prisma.client.transaction.id")] = + txId; + } // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE await tx.$queryRawUnsafe(`SET ROLE ${pgRole}`); // Now set all the context variables using `set_config` so that they can be used in RLS From 4036bbff63de19c296878ba6287c4feff10d297d Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 12:22:12 +0000 Subject: [PATCH 02/17] f --- src/index.ts | 61 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/src/index.ts b/src/index.ts index fb21eca..0db16f8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -188,6 +188,52 @@ export const createClient = ( ) => { // Set default options const { txMaxWait = 30000, txTimeout = 30000 } = options; + + // biome-ignore lint/suspicious/noExplicitAny: TODO fix this + (prisma as any)._transactionWithCallback = async function ({ + callback, + options, + }: { + callback: (client: any) => Promise; + options?: any; + }) { + const headers = { traceparent: this._tracingHelper.getTraceParent() }; + + const optionsWithDefaults: any = { + maxWait: + options?.maxWait ?? this._engineConfig.transactionOptions.maxWait, + timeout: + options?.timeout ?? this._engineConfig.transactionOptions.timeout, + isolationLevel: + options?.isolationLevel ?? + this._engineConfig.transactionOptions.isolationLevel, + new_tx_id: options?.new_tx_id ?? undefined, + }; + const info = await this._engine.transaction( + "start", + headers, + optionsWithDefaults, + ); + + let result: unknown; + try { + // execute user logic with a proxied the client + const transaction = { kind: "itx", ...info } as const; + + result = await callback(this._createItxClient(transaction)); + + // it went well, then we commit the transaction + await this._engine.transaction("commit", headers, info); + } catch (e: any) { + // it went bad, then we rollback the transaction + await this._engine.transaction("rollback", headers, info).catch(() => {}); + + throw e; // silent rollback, throw original error + } + + return result; + }; + const client = prisma.$extends({ name: "Yates client", query: { @@ -245,22 +291,16 @@ export const createClient = ( } try { + const txId = + ctx.transactionId ?? + hashWithPrefix("yates_tx_", JSON.stringify(ctx)); // Because batch transactions inside a prisma client query extension can run out of order if used with async middleware, // we need to run the logic inside an interactive transaction, however this brings a different set of problems in that the // main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request. // See https://github.com/prisma/prisma/issues/18276 + // @ts-ignore const queryResults = await prisma.$transaction( async (tx) => { - if (ctx.transactionId) { - // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it - (tx as any)[Symbol.for("prisma.client.transaction.id")] = - ctx.transactionId; - } else { - const txId = hashWithPrefix("yates_tx_", JSON.stringify(ctx)); - // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it - (tx as any)[Symbol.for("prisma.client.transaction.id")] = - txId; - } // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE await tx.$queryRawUnsafe(`SET ROLE ${pgRole}`); // Now set all the context variables using `set_config` so that they can be used in RLS @@ -296,6 +336,7 @@ export const createClient = ( { maxWait: txMaxWait, timeout: txTimeout, + new_tx_id: txId, }, ); From 4d3f0bed7f85867df73fc3d992f4a4ff9edd7fff Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 12:24:16 +0000 Subject: [PATCH 03/17] f --- src/index.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index 0db16f8..4c3f944 100644 --- a/src/index.ts +++ b/src/index.ts @@ -194,12 +194,14 @@ export const createClient = ( callback, options, }: { + // biome-ignore lint/suspicious/noExplicitAny: This is a private API callback: (client: any) => Promise; + // biome-ignore lint/suspicious/noExplicitAny: This is a private API options?: any; }) { const headers = { traceparent: this._tracingHelper.getTraceParent() }; - const optionsWithDefaults: any = { + const optionsWithDefaults = { maxWait: options?.maxWait ?? this._engineConfig.transactionOptions.maxWait, timeout: @@ -224,7 +226,7 @@ export const createClient = ( // it went well, then we commit the transaction await this._engine.transaction("commit", headers, info); - } catch (e: any) { + } catch (e: unknown) { // it went bad, then we rollback the transaction await this._engine.transaction("rollback", headers, info).catch(() => {}); From 2f0f63a75385c358596386e35a3822c29ac96a47 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 13:14:16 +0000 Subject: [PATCH 04/17] f --- src/index.ts | 70 +++++++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/src/index.ts b/src/index.ts index 4c3f944..81f73ed 100644 --- a/src/index.ts +++ b/src/index.ts @@ -222,6 +222,8 @@ export const createClient = ( // execute user logic with a proxied the client const transaction = { kind: "itx", ...info } as const; + transaction.yates_id = optionsWithDefaults.new_tx_id; + result = await callback(this._createItxClient(transaction)); // it went well, then we commit the transaction @@ -301,39 +303,41 @@ export const createClient = ( // main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request. // See https://github.com/prisma/prisma/issues/18276 // @ts-ignore - const queryResults = await prisma.$transaction( + return prisma.$transaction( async (tx) => { - // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE - await tx.$queryRawUnsafe(`SET ROLE ${pgRole}`); - // Now set all the context variables using `set_config` so that they can be used in RLS - for (const [key, value] of toPairs(context)) { - await tx.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`; - } - - // Inconveniently, the `query` function will not run inside an interactive transaction. - // We need to manually reconstruct the query, and attached the "secret" transaction ID. - // This ensures that the query will run inside the transaction AND that middlewares will not be re-applied - - // https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L1013 - // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it - const txId = (tx as any)[ - Symbol.for("prisma.client.transaction.id") - ]; - - // See https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L860 - // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it - const __internalParams = (params as any).__internalParams; - const result = await prisma._executeRequest({ - ...__internalParams, - transaction: { - kind: "itx", - id: txId, - }, - }); - // Switch role back to admin user - await tx.$queryRawUnsafe("SET ROLE none"); - - return result; + return Promise.all([ + // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE + tx.$queryRawUnsafe(`SET ROLE ${pgRole}`), + // Now set all the context variables using `set_config` so that they can be used in RLS + Promise.all( + toPairs(context).map( + ([key, value]) => + tx.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, + ), + ), + // Inconveniently, the `query` function will not run inside an interactive transaction. + // We need to manually reconstruct the query, and attached the "secret" transaction ID. + // This ensures that the query will run inside the transaction AND that middlewares will not be re-applied + + // https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L1013 + (() => { + // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it + const txId = (tx as any)[ + Symbol.for("prisma.client.transaction.id") + ]; + + // See https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L860 + // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it + const __internalParams = (params as any).__internalParams; + return prisma._executeRequest({ + ...__internalParams, + transaction: { + kind: "itx", + id: txId, + }, + }); + })(), + ]).then((results) => results.pop()); }, { maxWait: txMaxWait, @@ -341,8 +345,6 @@ export const createClient = ( new_tx_id: txId, }, ); - - return queryResults; } catch (e) { // Normalize RLS errors to make them a bit more readable. if ( From ebca4d0077854584c92f4a04993bf5bb7c28b313 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 13:18:19 +0000 Subject: [PATCH 05/17] f --- src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index 81f73ed..5083be2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -242,7 +242,7 @@ export const createClient = ( name: "Yates client", query: { $allModels: { - async $allOperations(params) { + $allOperations(params) { const { model, args, query, operation } = params; if (!model) { // If the model is not defined, we can't apply RLS @@ -304,7 +304,7 @@ export const createClient = ( // See https://github.com/prisma/prisma/issues/18276 // @ts-ignore return prisma.$transaction( - async (tx) => { + (tx) => { return Promise.all([ // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE tx.$queryRawUnsafe(`SET ROLE ${pgRole}`), From c0d43f362aec28d94c5c01357c4ad96aef80c50c Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 15:43:14 +0000 Subject: [PATCH 06/17] f --- src/index.ts | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 5083be2..2682790 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ import * as crypto from "crypto"; -import { Prisma, PrismaClient } from "@prisma/client"; +import { Prisma, PrismaClient, PrismaPromise } from "@prisma/client"; import logger from "debug"; import difference from "lodash/difference"; import flatMap from "lodash/flatMap"; @@ -11,6 +11,100 @@ const VALID_OPERATIONS = ["SELECT", "UPDATE", "INSERT", "DELETE"] as const; const debug = logger("yates"); +const BatchTxIdCounter = { + id: 0, + nextId() { + return ++this.id; + }, +}; +export interface ErrorWithBatchIndex { + batchRequestIdx?: number; +} + +export function hasBatchIndex( + value: object, +): value is Required { + // @ts-ignore + return typeof value["batchRequestIdx"] === "number"; +} +export function waitForBatch[]>( + promises: T, +): Promise<{ [K in keyof T]: Awaited }> { + if (promises.length === 0) { + return Promise.resolve([] as { [K in keyof T]: Awaited }); + } + return new Promise((resolve, reject) => { + const successfulResults = new Array(promises.length) as { + [K in keyof T]: Awaited; + }; + let bestError: unknown = null; + let done = false; + let settledPromisesCount = 0; + + const settleOnePromise = () => { + if (done) { + return; + } + settledPromisesCount++; + if (settledPromisesCount === promises.length) { + done = true; + if (bestError) { + reject(bestError); + } else { + resolve(successfulResults); + } + } + }; + + const immediatelyReject = (error: unknown) => { + if (!done) { + done = true; + reject(error); + } + }; + + for (let i = 0; i < promises.length; i++) { + promises[i].then( + (result) => { + successfulResults[i] = result; + settleOnePromise(); + }, + (error) => { + if (!hasBatchIndex(error)) { + immediatelyReject(error); + return; + } + + if (error.batchRequestIdx === i) { + immediatelyReject(error); + } else { + if (!bestError) { + bestError = error; + } + settleOnePromise(); + } + }, + ); + } + }); +} + +export function getLockCountPromise( + knock: number, + cb: () => V | void = () => {}, +) { + let resolve: (v: V | void) => void; + const lock = new Promise((res) => (resolve = res)); + + return { + then(onFulfilled) { + if (--knock === 0) resolve(cb()); + + return onFulfilled?.(lock as unknown as V | void); + }, + } as PromiseLike; +} + type Operation = (typeof VALID_OPERATIONS)[number]; export type Models = Prisma.ModelName; @@ -189,6 +283,42 @@ export const createClient = ( // Set default options const { txMaxWait = 30000, txTimeout = 30000 } = options; + // biome-ignore lint/suspicious/noExplicitAny: TODO fix this + (prisma as any)._transactionWithArray = async function ({ + promises, + options, + }: { + promises: Array>; + options?: any; + }): Promise { + const id = BatchTxIdCounter.nextId(); + const lock = getLockCountPromise(promises.length); + + const requests = promises.map((request, index) => { + if (request?.[Symbol.toStringTag] !== "PrismaPromise") { + throw new Error( + `All elements of the array need to be Prisma Client promises. Hint: Please make sure you are not awaiting the Prisma client calls you intended to pass in the $transaction function.`, + ); + } + + const isolationLevel = + options?.isolationLevel ?? + this._engineConfig.transactionOptions.isolationLevel; + const transaction = { + kind: "batch", + id, + index, + isolationLevel, + lock, + yates_id: options?.new_tx_id, + } as const; + // @ts-ignore + return request.requestTransaction?.(transaction) ?? request; + }); + + return waitForBatch(requests); + }; + // biome-ignore lint/suspicious/noExplicitAny: TODO fix this (prisma as any)._transactionWithCallback = async function ({ callback, From 6525f16b4f2c92bdad4ed01d0f55dee2566d0d63 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 15:47:07 +0000 Subject: [PATCH 07/17] f --- src/index.ts | 53 ++++++++++++++++------------------------------------ 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/src/index.ts b/src/index.ts index 2682790..81b46d1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -372,7 +372,7 @@ export const createClient = ( name: "Yates client", query: { $allModels: { - $allOperations(params) { + async $allOperations(params) { const { model, args, query, operation } = params; if (!model) { // If the model is not defined, we can't apply RLS @@ -433,48 +433,27 @@ export const createClient = ( // main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request. // See https://github.com/prisma/prisma/issues/18276 // @ts-ignore - return prisma.$transaction( - (tx) => { - return Promise.all([ - // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE - tx.$queryRawUnsafe(`SET ROLE ${pgRole}`), - // Now set all the context variables using `set_config` so that they can be used in RLS - Promise.all( - toPairs(context).map( - ([key, value]) => - tx.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, - ), - ), - // Inconveniently, the `query` function will not run inside an interactive transaction. - // We need to manually reconstruct the query, and attached the "secret" transaction ID. - // This ensures that the query will run inside the transaction AND that middlewares will not be re-applied - - // https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L1013 - (() => { - // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it - const txId = (tx as any)[ - Symbol.for("prisma.client.transaction.id") - ]; - - // See https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L860 - // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it - const __internalParams = (params as any).__internalParams; - return prisma._executeRequest({ - ...__internalParams, - transaction: { - kind: "itx", - id: txId, - }, - }); - })(), - ]).then((results) => results.pop()); - }, + const results = await prisma.$transaction( + [ + // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE + prisma.$queryRawUnsafe(`SET ROLE ${pgRole}`), + // Now set all the context variables using `set_config` so that they can be used in RLS + ...toPairs(context).map( + ([key, value]) => + prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, + ), + // Inconveniently, the `query` function will not run inside an interactive transaction. + // We need to manually reconstruct the query, and attached the "secret" transaction ID. + // This ensures that the query will run inside the transaction AND that middlewares will not be re-applied + query(args), + ], { maxWait: txMaxWait, timeout: txTimeout, new_tx_id: txId, }, ); + return results.pop(); } catch (e) { // Normalize RLS errors to make them a bit more readable. if ( From 1eaba09b3bb53978ed0be55f6f96692758860bcd Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 16:02:39 +0000 Subject: [PATCH 08/17] f --- src/index.ts | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/index.ts b/src/index.ts index 81b46d1..b073380 100644 --- a/src/index.ts +++ b/src/index.ts @@ -428,6 +428,31 @@ export const createClient = ( const txId = ctx.transactionId ?? hashWithPrefix("yates_tx_", JSON.stringify(ctx)); + + // @ts-ignore + if (!globalThis.txIdSet) { + // @ts-ignore + globalThis.txIdSet = new Set(); + } + + let txInitiatedInTick = false; + + // @ts-ignore + if (globalThis.txIdSet.has(txId)) { + txInitiatedInTick = true; + } else { + // @ts-ignore + globalThis.txIdSet.add(txId); + } + if (txInitiatedInTick) { + // @ts-ignore + const results = await prisma.$transaction([query(args)], { + maxWait: txMaxWait, + timeout: txTimeout, + new_tx_id: txId, + }); + return results.pop(); + } // Because batch transactions inside a prisma client query extension can run out of order if used with async middleware, // we need to run the logic inside an interactive transaction, however this brings a different set of problems in that the // main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request. From 57811c026e8e34e43e1ceaecd4282178f8f99779 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 16:09:55 +0000 Subject: [PATCH 09/17] f --- src/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/index.ts b/src/index.ts index b073380..72022bf 100644 --- a/src/index.ts +++ b/src/index.ts @@ -443,6 +443,10 @@ export const createClient = ( } else { // @ts-ignore globalThis.txIdSet.add(txId); + process.nextTick(() => { + // @ts-ignore + globalThis.txIdSet.delete(txId); + }); } if (txInitiatedInTick) { // @ts-ignore From c5a74b8ebb3acd9a14ef5b45eb02920f4e9b4a0e Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 16:45:06 +0000 Subject: [PATCH 10/17] f --- src/index.ts | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/index.ts b/src/index.ts index 72022bf..3bc9977 100644 --- a/src/index.ts +++ b/src/index.ts @@ -274,6 +274,38 @@ export const createRoleName = (name: string) => { return sanitizeSlug(hashWithPrefix("yates_role_", `${name}`)); }; +// @ts-ignore +export function getBatchId(query: any): string | undefined { + if (query.action !== "findUnique" && query.action !== "findUniqueOrThrow") { + return undefined; + } + const parts: string[] = []; + if (query.modelName) { + parts.push(query.modelName); + } + + if (query.query.arguments) { + parts.push(buildKeysString(query.query.arguments)); + } + parts.push(buildKeysString(query.query.selection)); + + return parts.join(""); +} +function buildKeysString(obj: object): string { + const keysArray = Object.keys(obj) + .sort() + .map((key) => { + // @ts-ignore + const value = obj[key]; + if (typeof value === "object" && value !== null) { + return `(${key} ${buildKeysString(value)})`; + } + return key; + }); + + return `(${keysArray.join(" ")})`; +} + // This uses client extensions to set the role and context for the current user so that RLS can be applied export const createClient = ( prisma: PrismaClient, @@ -283,6 +315,16 @@ export const createClient = ( // Set default options const { txMaxWait = 30000, txTimeout = 30000 } = options; + (prisma as any)._requestHandler.batchBy = (n) => { + console.log("batch by yates id?", n.transaction?.yates_id); + console.log("pq", getBatchId(n.protocolQuery)); + return n.transaction?.yates_id + ? n.transaction.yates_id + (getBatchId(n.protocolQuery) || "") + : n.transaction?.id + ? `transaction-${n.transaction.id}` + : getBatchId(n.protocolQuery); + }; + // biome-ignore lint/suspicious/noExplicitAny: TODO fix this (prisma as any)._transactionWithArray = async function ({ promises, From 6c5653661f38eea1dd2eb4c10916e8ca99675944 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Thu, 31 Oct 2024 16:46:17 +0000 Subject: [PATCH 11/17] f --- src/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/index.ts b/src/index.ts index 3bc9977..d7d10e5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -315,6 +315,7 @@ export const createClient = ( // Set default options const { txMaxWait = 30000, txTimeout = 30000 } = options; + // @ts-ignore (prisma as any)._requestHandler.batchBy = (n) => { console.log("batch by yates id?", n.transaction?.yates_id); console.log("pq", getBatchId(n.protocolQuery)); From c3dadbc5658205c351240d56bc502b96e20db547 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Fri, 1 Nov 2024 21:48:11 +0000 Subject: [PATCH 12/17] f --- src/index.ts | 128 +++++++++++++++++++++++++++++---------------------- 1 file changed, 72 insertions(+), 56 deletions(-) diff --git a/src/index.ts b/src/index.ts index d7d10e5..ff13a47 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,17 @@ const VALID_OPERATIONS = ["SELECT", "UPDATE", "INSERT", "DELETE"] as const; const debug = logger("yates"); +interface Batch { + pgRole: string; + context?: { [x: string]: string | number | string[] }; + requests: Array<{ + query: (args: unknown[]) => PrismaPromise; + args: unknown; + resolve: (result: unknown) => void; + reject: (error: unknown) => void; + }>; +} + const BatchTxIdCounter = { id: 0, nextId() { @@ -316,7 +327,7 @@ export const createClient = ( const { txMaxWait = 30000, txTimeout = 30000 } = options; // @ts-ignore - (prisma as any)._requestHandler.batchBy = (n) => { + (prisma as any)._requestHandler.dataloader.options.batchBy = (n) => { console.log("batch by yates id?", n.transaction?.yates_id); console.log("pq", getBatchId(n.protocolQuery)); return n.transaction?.yates_id @@ -411,6 +422,41 @@ export const createClient = ( return result; }; + let tickActive = false; + const batches: Record = {}; + + const dispatchBatches = () => { + for (const [key, batch] of Object.entries(batches)) { + console.log(key, batch); + prisma + .$transaction([ + prisma.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`), + // Now set all the context variables using `set_config` so that they can be used in RLS + ...toPairs(batch.context).map( + ([key, value]) => + prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, + ), + ...batch.requests.map((request) => + request.query(request.args as unknown[]), + ), + // Switch role back to admin user + prisma.$queryRawUnsafe("SET ROLE none"), + ]) + .then((results) => { + const n = toPairs(batch.context).length + 1; + const slicedResults = results.slice(0, n - 1); + slicedResults.forEach((result, index) => { + batch.requests[index].resolve(result); + }); + delete batches[key]; + }) + .catch((e) => { + batch.requests.forEach((request) => request.reject(e)); + delete batches[key]; + }); + } + }; + const client = prisma.$extends({ name: "Yates client", query: { @@ -468,64 +514,34 @@ export const createClient = ( } try { - const txId = - ctx.transactionId ?? - hashWithPrefix("yates_tx_", JSON.stringify(ctx)); - - // @ts-ignore - if (!globalThis.txIdSet) { - // @ts-ignore - globalThis.txIdSet = new Set(); + const txId = hashWithPrefix("yates_tx_", JSON.stringify(ctx)); + + const hash = txId; + if (!batches[hash]) { + batches[hash] = { + pgRole, + context, + requests: [], + }; + + // make sure, that we only tick once at a time + if (tickActive) { + tickActive = true; + process.nextTick(() => { + dispatchBatches(); + tickActive = false; + }); + } } - let txInitiatedInTick = false; - - // @ts-ignore - if (globalThis.txIdSet.has(txId)) { - txInitiatedInTick = true; - } else { - // @ts-ignore - globalThis.txIdSet.add(txId); - process.nextTick(() => { - // @ts-ignore - globalThis.txIdSet.delete(txId); + return new Promise((resolve, reject) => { + batches[hash].requests.push({ + query, + args, + resolve, + reject, }); - } - if (txInitiatedInTick) { - // @ts-ignore - const results = await prisma.$transaction([query(args)], { - maxWait: txMaxWait, - timeout: txTimeout, - new_tx_id: txId, - }); - return results.pop(); - } - // Because batch transactions inside a prisma client query extension can run out of order if used with async middleware, - // we need to run the logic inside an interactive transaction, however this brings a different set of problems in that the - // main query will no longer automatically run inside the transaction. We resolve this issue by manually executing the prisma request. - // See https://github.com/prisma/prisma/issues/18276 - // @ts-ignore - const results = await prisma.$transaction( - [ - // Switch to the user role, We can't use a prepared statement here, due to limitations in PG not allowing prepared statements to be used in SET ROLE - prisma.$queryRawUnsafe(`SET ROLE ${pgRole}`), - // Now set all the context variables using `set_config` so that they can be used in RLS - ...toPairs(context).map( - ([key, value]) => - prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, - ), - // Inconveniently, the `query` function will not run inside an interactive transaction. - // We need to manually reconstruct the query, and attached the "secret" transaction ID. - // This ensures that the query will run inside the transaction AND that middlewares will not be re-applied - query(args), - ], - { - maxWait: txMaxWait, - timeout: txTimeout, - new_tx_id: txId, - }, - ); - return results.pop(); + }); } catch (e) { // Normalize RLS errors to make them a bit more readable. if ( From 9bf83363ceb23650dee0c562efe44b6bee361e93 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Fri, 1 Nov 2024 22:19:03 +0000 Subject: [PATCH 13/17] f --- src/index.ts | 52 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/src/index.ts b/src/index.ts index ff13a47..3539413 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ interface Batch { pgRole: string; context?: { [x: string]: string | number | string[] }; requests: Array<{ + params: object; query: (args: unknown[]) => PrismaPromise; args: unknown; resolve: (result: unknown) => void; @@ -428,27 +429,41 @@ export const createClient = ( const dispatchBatches = () => { for (const [key, batch] of Object.entries(batches)) { console.log(key, batch); + delete batches[key]; + prisma - .$transaction([ - prisma.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`), - // Now set all the context variables using `set_config` so that they can be used in RLS - ...toPairs(batch.context).map( - ([key, value]) => - prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, - ), - ...batch.requests.map((request) => - request.query(request.args as unknown[]), - ), + .$transaction(async (tx) => { + await tx.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`), + // Now set all the context variables using `set_config` so that they can be used in RLS + await Promise.all( + toPairs(batch.context).map( + ([key, value]) => + prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, + ), + ); + // https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L1013 + // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it + const txId = (tx as any)[Symbol.for("prisma.client.transaction.id")]; + const results = await Promise.all( + batch.requests.map((request) => + prisma._executeRequest({ + ...request.params, + transaction: { + kind: "itx", + id: txId, + }, + }), + ), + ); // Switch role back to admin user - prisma.$queryRawUnsafe("SET ROLE none"), - ]) + await prisma.$queryRawUnsafe("SET ROLE none"); + + return results; + }) .then((results) => { - const n = toPairs(batch.context).length + 1; - const slicedResults = results.slice(0, n - 1); - slicedResults.forEach((result, index) => { + results.forEach((result, index) => { batch.requests[index].resolve(result); }); - delete batches[key]; }) .catch((e) => { batch.requests.forEach((request) => request.reject(e)); @@ -534,8 +549,13 @@ export const createClient = ( } } + // See https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L860 + // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it + const __internalParams = (params as any).__internalParams; + return new Promise((resolve, reject) => { batches[hash].requests.push({ + params: __internalParams, query, args, resolve, From 53c1af344e7c458e9dcf5b680064c619956ae02c Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Fri, 1 Nov 2024 22:23:21 +0000 Subject: [PATCH 14/17] f --- src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 3539413..85afb7d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -540,7 +540,7 @@ export const createClient = ( }; // make sure, that we only tick once at a time - if (tickActive) { + if (!tickActive) { tickActive = true; process.nextTick(() => { dispatchBatches(); From 2e349ae01a319e1833001651a15b37b339a9f879 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Fri, 1 Nov 2024 22:28:11 +0000 Subject: [PATCH 15/17] f --- src/index.ts | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/index.ts b/src/index.ts index 85afb7d..e86eaa4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -433,14 +433,11 @@ export const createClient = ( prisma .$transaction(async (tx) => { - await tx.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`), - // Now set all the context variables using `set_config` so that they can be used in RLS - await Promise.all( - toPairs(batch.context).map( - ([key, value]) => - prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`, - ), - ); + await tx.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`); + // Now set all the context variables using `set_config` so that they can be used in RLS + for (const [key, value] of toPairs(batch.context)) { + await prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`; + } // https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L1013 // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it const txId = (tx as any)[Symbol.for("prisma.client.transaction.id")]; From d0a48e35955ad3ef7b7ba1d1a693f24f3ff4022b Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Fri, 1 Nov 2024 22:45:30 +0000 Subject: [PATCH 16/17] f --- src/index.ts | 88 +--------------------------------------------------- 1 file changed, 1 insertion(+), 87 deletions(-) diff --git a/src/index.ts b/src/index.ts index e86eaa4..8d57f33 100644 --- a/src/index.ts +++ b/src/index.ts @@ -329,8 +329,6 @@ export const createClient = ( // @ts-ignore (prisma as any)._requestHandler.dataloader.options.batchBy = (n) => { - console.log("batch by yates id?", n.transaction?.yates_id); - console.log("pq", getBatchId(n.protocolQuery)); return n.transaction?.yates_id ? n.transaction.yates_id + (getBatchId(n.protocolQuery) || "") : n.transaction?.id @@ -338,91 +336,6 @@ export const createClient = ( : getBatchId(n.protocolQuery); }; - // biome-ignore lint/suspicious/noExplicitAny: TODO fix this - (prisma as any)._transactionWithArray = async function ({ - promises, - options, - }: { - promises: Array>; - options?: any; - }): Promise { - const id = BatchTxIdCounter.nextId(); - const lock = getLockCountPromise(promises.length); - - const requests = promises.map((request, index) => { - if (request?.[Symbol.toStringTag] !== "PrismaPromise") { - throw new Error( - `All elements of the array need to be Prisma Client promises. Hint: Please make sure you are not awaiting the Prisma client calls you intended to pass in the $transaction function.`, - ); - } - - const isolationLevel = - options?.isolationLevel ?? - this._engineConfig.transactionOptions.isolationLevel; - const transaction = { - kind: "batch", - id, - index, - isolationLevel, - lock, - yates_id: options?.new_tx_id, - } as const; - // @ts-ignore - return request.requestTransaction?.(transaction) ?? request; - }); - - return waitForBatch(requests); - }; - - // biome-ignore lint/suspicious/noExplicitAny: TODO fix this - (prisma as any)._transactionWithCallback = async function ({ - callback, - options, - }: { - // biome-ignore lint/suspicious/noExplicitAny: This is a private API - callback: (client: any) => Promise; - // biome-ignore lint/suspicious/noExplicitAny: This is a private API - options?: any; - }) { - const headers = { traceparent: this._tracingHelper.getTraceParent() }; - - const optionsWithDefaults = { - maxWait: - options?.maxWait ?? this._engineConfig.transactionOptions.maxWait, - timeout: - options?.timeout ?? this._engineConfig.transactionOptions.timeout, - isolationLevel: - options?.isolationLevel ?? - this._engineConfig.transactionOptions.isolationLevel, - new_tx_id: options?.new_tx_id ?? undefined, - }; - const info = await this._engine.transaction( - "start", - headers, - optionsWithDefaults, - ); - - let result: unknown; - try { - // execute user logic with a proxied the client - const transaction = { kind: "itx", ...info } as const; - - transaction.yates_id = optionsWithDefaults.new_tx_id; - - result = await callback(this._createItxClient(transaction)); - - // it went well, then we commit the transaction - await this._engine.transaction("commit", headers, info); - } catch (e: unknown) { - // it went bad, then we rollback the transaction - await this._engine.transaction("rollback", headers, info).catch(() => {}); - - throw e; // silent rollback, throw original error - } - - return result; - }; - let tickActive = false; const batches: Record = {}; @@ -448,6 +361,7 @@ export const createClient = ( transaction: { kind: "itx", id: txId, + yates_id: key, }, }), ), From a4055e7f749f77c4391846dcfad9ed8415190a79 Mon Sep 17 00:00:00 2001 From: Lucian Buzzo Date: Fri, 1 Nov 2024 22:48:10 +0000 Subject: [PATCH 17/17] f --- src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index 8d57f33..bfc3f18 100644 --- a/src/index.ts +++ b/src/index.ts @@ -349,7 +349,7 @@ export const createClient = ( await tx.$queryRawUnsafe(`SET ROLE ${batch.pgRole}`); // Now set all the context variables using `set_config` so that they can be used in RLS for (const [key, value] of toPairs(batch.context)) { - await prisma.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`; + await tx.$queryRaw`SELECT set_config(${key}, ${value.toString()}, true);`; } // https://github.com/prisma/prisma/blob/4.11.0/packages/client/src/runtime/getPrismaClient.ts#L1013 // biome-ignore lint/suspicious/noExplicitAny: This is a private API, so not much we can do about it @@ -367,7 +367,7 @@ export const createClient = ( ), ); // Switch role back to admin user - await prisma.$queryRawUnsafe("SET ROLE none"); + await tx.$queryRawUnsafe("SET ROLE none"); return results; })