diff --git a/src/core/shift.js b/src/core/shift.js index 27066e5..b599b76 100644 --- a/src/core/shift.js +++ b/src/core/shift.js @@ -1,17 +1,17 @@ import { - wrap, - unwrap, - remove, - merge, - mergeStreams, + decodeGraph, + decodeQuery, + encodeGraph, encodePath, encodeQuery, - encodeGraph, - decodeQuery, - decodeGraph, + finalize, + merge, + mergeStreams, + remove, + unwrap, unwrapObject, + wrap, wrapObject, - finalize, } from '@graffy/common'; import { makeStream } from '@graffy/stream'; @@ -32,13 +32,13 @@ export function wrapProvider(fn, decodedPath, isRead) { const remainingPayload = remove(payload, path) || []; // This next function is offered to the provider function. - async function shiftedNext(porcelainNextPayload) { + async function shiftedNext(porcelainNextPayload, nextOptions) { nextCalled = true; const nextPayload = encodePayload( wrapObject(porcelainNextPayload, decodedPath), ); if (remainingPayload.length) merge(nextPayload, remainingPayload); - const nextResult = await next(nextPayload); + const nextResult = await next(nextPayload, nextOptions); // Remember the next() results that are not returned to this provider. // These will be merged into the result later. @@ -83,7 +83,10 @@ export function shiftGen(fn, path) { const remainingPayload = remove(payload, path) || []; // TODO: This should probably use makeStream and propagate returns. - const shiftedNext = async function* shiftedNextFn(unwrappedNextPayload) { + const shiftedNext = async function* shiftedNextFn( + unwrappedNextPayload, + nextOptions, + ) { nextCalled = true; const nextPayload = wrap(unwrappedNextPayload, path); if (remainingPayload.length) merge(nextPayload, remainingPayload); @@ -94,7 +97,7 @@ export function shiftGen(fn, path) { pushRemaining = push; }); - for await (const value of next(nextPayload)) { + for await (const value of next(nextPayload, nextOptions)) { const unwrappedValue = unwrap(value, path); const remainingValue = remove(value, path); if (remainingValue) pushRemaining(remainingValue); diff --git a/src/core/test/porcelain.test.js b/src/core/test/porcelain.test.js index 69d28b6..609afa4 100644 --- a/src/core/test/porcelain.test.js +++ b/src/core/test/porcelain.test.js @@ -1,7 +1,7 @@ -import { jest } from '@jest/globals'; -import Graffy from '../Graffy.js'; import GraffyFill from '@graffy/fill'; import { page, ref } from '@graffy/testing'; +import { jest } from '@jest/globals'; +import Graffy from '../Graffy.js'; test('Porcelain read', async () => { const store = new Graffy(); @@ -403,3 +403,17 @@ test('onReadWithNext', async () => { }, }); }); + +test('modified_next_options', async () => { + // @ts-ignore bad jest mockResolvedValue definitions? + const mockOnRead = jest.fn().mockResolvedValue({ 123: { name: 'Alice' } }); + const query = { 123: { name: true } }; + const store = new Graffy(); + store.use(GraffyFill()); + store.onRead('user', (query, _options, next) => { + return next(query, { bar: true }); + }); + store.onRead('user', mockOnRead); + await store.read('user', query, { foo: 2 }); + expect(mockOnRead).toBeCalledWith(query, { bar: true }, expect.any(Function)); +}); diff --git a/src/pg/Db.js b/src/pg/Db.js index 1f03239..2f2bd14 100644 --- a/src/pg/Db.js +++ b/src/pg/Db.js @@ -1,24 +1,24 @@ -import pg from 'pg'; -import sqlTag from 'sql-template-tag'; import { - isPlainObject, + cmp, decodeArgs, - wrap, - unwrap, - finalize, - merge, - wrapObject, + decodeGraph, + decodeQuery, encodeGraph, + encodePath, + finalize, isEmpty, + isPlainObject, isRange, - decodeGraph, + merge, mergeObject, - decodeQuery, - cmp, - encodePath, + unwrap, + wrap, + wrapObject, } from '@graffy/common'; -import { selectByArgs, selectByIds, put, patch, del } from './sql/index.js'; import debug from 'debug'; +import pg from 'pg'; +import sqlTag from 'sql-template-tag'; +import { del, patch, put, selectByArgs, selectByIds } from './sql/index.js'; const log = debug('graffy:pg:db'); const { Pool, Client, types } = pg; @@ -88,7 +88,7 @@ export default class Db { if (!res.rowCount) { throw Error(`pg.nothing_written ${sql.text} with ${sql.values}`); } - return res.rows[0]; + return res.rows; } /* @@ -183,11 +183,11 @@ export default class Db { selectByIds(Object.keys(idQueries), null, tableOptions), tableOptions, ); - result.forEach((object) => { + for (const object of result) { const wrappedGraph = encodeGraph(wrapObject(object, rawPrefix)); log('getByIds', wrappedGraph); merge(results, wrappedGraph); - }); + } }; const query = unwrap(rootQuery, prefix); @@ -227,11 +227,17 @@ export default class Db { const change = unwrap(rootChange, prefix); - const sqls = change.map((node) => { + const puts = []; + const sqls = []; + for (const node of change) { const arg = decodeArgs(node); if (isRange(node)) { - if (cmp(node.key, node.end) === 0) return del(arg, tableOptions); + if (cmp(node.key, node.end) === 0) { + log('delete', node); + sqls.push(del(arg, tableOptions)); + continue; + } throw Error('pg_write.write_range_unsupported'); } @@ -239,22 +245,27 @@ export default class Db { if (isPlainObject(arg)) { mergeObject(object, arg); } else { - object.id = arg; + object[tableOptions.idCol] = arg; } if (object.$put && object.$put !== true) { throw Error('pg_write.partial_put_unsupported'); } - return object.$put - ? put(object, arg, tableOptions) - : patch(object, arg, tableOptions); - }); + if (object.$put) { + puts.push([object, arg]); + } else { + sqls.push(patch(object, arg, tableOptions)); + } + } + + if (puts.length) sqls.push(...put(puts, tableOptions)); const result = []; await Promise.all( sqls.map((sql) => this.writeSql(sql, tableOptions).then((object) => { + log('returned_object_wrapped', wrapObject(object, rawPrefix)); merge(result, encodeGraph(wrapObject(object, rawPrefix))); }), ), diff --git a/src/pg/sql/clauses.js b/src/pg/sql/clauses.js index 3ba4b10..2f954e2 100644 --- a/src/pg/sql/clauses.js +++ b/src/pg/sql/clauses.js @@ -1,5 +1,5 @@ -import sql, { Sql, raw, join, empty } from 'sql-template-tag'; import { isEmpty } from '@graffy/common'; +import sql, { Sql, empty, join, raw } from 'sql-template-tag'; /* Important: This function assumes that the object's keys are from @@ -110,19 +110,49 @@ function castValue(value, type, name, isPut) { return value; } -export const getInsert = (row, options) => { +export const getInsert = (rows, options) => { + const { verCol, schema } = options; const cols = []; + const colSqls = []; + const colIx = {}; + const colUsed = []; + + for (const col of Object.keys(options.schema.types)) { + colIx[col] = cols.length; + colUsed[cols.length] = false; + cols.push(col); + colSqls.push(sql`"${raw(col)}"`); + } + + colUsed[colIx[verCol]] = true; + const vals = []; + for (const row of rows) { + const rowVals = Array(cols.length).fill(sql`default`); + + for (const col of cols) { + if (col === verCol || !(col in row)) continue; + const ix = colIx[col]; + colUsed[ix] = true; + rowVals[ix] = castValue(row[col], schema.types[col], col, row.$put); + } - Object.entries(row) - .filter(([col]) => col !== options.verCol && col[0] !== '$') - .concat([[options.verCol, sql`default`]]) - .forEach(([col, val]) => { - cols.push(sql`"${raw(col)}"`); - vals.push(castValue(val, options.schema.types[col], col, row.$put)); - }); + vals.push(rowVals); + } + + const isUsed = (_, ix) => colUsed[ix]; - return { cols: join(cols, ', '), vals: join(vals, ', ') }; + return { + cols: join(colSqls.filter(isUsed), ', '), + vals: join( + vals.map((rowVals) => sql`(${join(rowVals.filter(isUsed), ', ')})`), + ', ', + ), + updates: join( + colSqls.map((col, ix) => sql`${col} = "excluded".${col}`).filter(isUsed), + ', ', + ), + }; }; export const getUpdates = (row, options) => { diff --git a/src/pg/sql/upsert.js b/src/pg/sql/upsert.js index ba68758..96df04c 100644 --- a/src/pg/sql/upsert.js +++ b/src/pg/sql/upsert.js @@ -1,8 +1,8 @@ -import sql, { raw, join } from 'sql-template-tag'; import { isPlainObject } from '@graffy/common'; +import sql, { join, raw } from 'sql-template-tag'; +import { getInsert, getSelectCols, getUpdates } from './clauses.js'; import getArgSql from './getArgSql.js'; import { getIdMeta } from './getMeta.js'; -import { getInsert, getSelectCols, getUpdates } from './clauses.js'; function getSingleSql(arg, options) { const { table, idCol } = options; @@ -33,7 +33,7 @@ export function patch(object, arg, options) { const { table } = options; const { where, meta } = getSingleSql(arg, options); - const row = object; // objectToRow(object, options); + const row = object; return sql` UPDATE "${raw(table)}" SET ${getUpdates(row, options)} @@ -41,26 +41,40 @@ export function patch(object, arg, options) { RETURNING ${getSelectCols(options)}, ${meta}`; } -export function put(object, arg, options) { +export function put(puts, options) { const { idCol, table } = options; - const row = object; // objectToRow(object, options); + const sqls = []; + + const addSql = (rows, meta, conflictTarget) => { + const { cols, vals, updates } = getInsert(rows, options); + sqls.push(sql` + INSERT INTO "${raw(table)}" (${cols}) VALUES ${vals} + ON CONFLICT (${conflictTarget}) DO UPDATE SET ${updates} + RETURNING ${getSelectCols(options)}, ${meta}`); + }; - let meta; - let conflictTarget; - if (isPlainObject(arg)) { - ({ meta } = getArgSql(arg, options)); - conflictTarget = join(Object.keys(arg).map((col) => sql`"${raw(col)}"`)); - } else { - meta = getIdMeta(options); - conflictTarget = sql`"${raw(idCol)}"`; + const idRows = []; + for (const put of puts) { + const [row, arg] = put; + if (!isPlainObject(arg)) { + idRows.push(row); + continue; + } + + const { meta } = getArgSql(arg, options); + const conflictTarget = join( + Object.keys(arg).map((col) => sql`"${raw(col)}"`), + ); + addSql([row], meta, conflictTarget); } - const { cols, vals } = getInsert(row, options); + if (idRows.length) { + const meta = getIdMeta(options); + const conflictTarget = sql`"${raw(idCol)}"`; + addSql(idRows, meta, conflictTarget); + } - return sql` - INSERT INTO "${raw(table)}" (${cols}) VALUES (${vals}) - ON CONFLICT (${conflictTarget}) DO UPDATE SET (${cols}) = (${vals}) - RETURNING ${getSelectCols(options)}, ${meta}`; + return sqls; } export function del(arg, options) { diff --git a/src/pg/test/db/dbWrite.test.js b/src/pg/test/db/dbWrite.test.js index 43c6256..3cd755c 100644 --- a/src/pg/test/db/dbWrite.test.js +++ b/src/pg/test/db/dbWrite.test.js @@ -1,5 +1,5 @@ -import { jest } from '@jest/globals'; import Graffy from '@graffy/core'; +import { jest } from '@jest/globals'; import sql from 'sql-template-tag'; import expectSql from '../expectSql.js'; @@ -123,10 +123,10 @@ describe('postgres', () => { expect(mockQuery).toBeCalled(); const sqlQuery = sql` - INSERT INTO "user" ("name", "id", "updatedAt") - VALUES (${data.name}, ${'foo'}, default) + INSERT INTO "user" ("id", "name", "updatedAt") + VALUES (${'foo'}, ${data.name}, default) ON CONFLICT ("id") DO UPDATE SET - ("name", "id", "updatedAt") = (${data.name}, ${'foo'}, default) + "id" = "excluded"."id", "name" = "excluded"."name", "updatedAt" = "excluded"."updatedAt" RETURNING *, "id" AS "$key", current_timestamp AS "$ver"`; expectSql(mockQuery.mock.calls[0][0], sqlQuery); }); @@ -140,11 +140,10 @@ describe('postgres', () => { await store.write('googleSession', data); const sqlQuery = sql` - INSERT INTO "googleSession" ( "token", "userId", "version" ) - VALUES (${data.token}, ${'userId_01'}, default) + INSERT INTO "googleSession" ( "userId", "token", "version" ) + VALUES (${'userId_01'}, ${data.token}, default) ON CONFLICT ( "userId" ) - DO UPDATE SET ( "token", "userId", "version" ) = - (${data.token}, ${'userId_01'}, default) + DO UPDATE SET "userId" = "excluded"."userId", "token" = "excluded"."token", "version" = "excluded"."version" RETURNING *, ${`{"userId":"userId_01"}`}::jsonb AS "$key" , current_timestamp AS "$ver", @@ -159,12 +158,16 @@ describe('postgres', () => { $put: true, }; + mockQuery.mockResolvedValueOnce({ + rowCount: 1, + rows: [{ foo: true }], + }); + await store.write(['googleSession', { userId: 'userId_01' }], data); const sqlQuery = sql` - INSERT INTO "googleSession" ( "token" , "userId", "version" ) - VALUES ( ${data.token} , ${data.userId}, default) ON CONFLICT ("userId") - DO UPDATE SET ( "token" ,"userId", "version" ) = - (${data.token}, ${data.userId}, default) + INSERT INTO "googleSession" ( "userId", "token", "version" ) + VALUES ( ${data.userId}, ${data.token}, default) ON CONFLICT ("userId") + DO UPDATE SET "userId" = "excluded"."userId", "token" = "excluded"."token", "version" = "excluded"."version" RETURNING *, ${`{"userId":"userId_01"}`}::jsonb AS "$key" , current_timestamp AS "$ver", @@ -181,10 +184,9 @@ describe('postgres', () => { await store.write('email.e1', data); const sqlQuery = sql` - INSERT INTO "email" ("tenantId", "userId", "id", "version" ) - VALUES (${data.tenantId}, ${data.userId}, ${'e1'}, default) - ON CONFLICT ("id") DO UPDATE SET ("tenantId", "userId", "id", "version" ) = - (${data.tenantId} , ${data.userId} , ${'e1'}, default) + INSERT INTO "email" ("id", "userId", "tenantId", "version" ) + VALUES (${'e1'}, ${data.userId}, ${data.tenantId}, default) + ON CONFLICT ("id") DO UPDATE SET "id" = "excluded"."id", "userId" = "excluded"."userId", "tenantId" = "excluded"."tenantId", "version" = "excluded"."version" RETURNING *, "id" AS "$key", current_timestamp AS "$ver"`; expectSql(mockQuery.mock.calls[0][0], sqlQuery); }); diff --git a/src/pg/test/sql/clauses.test.js b/src/pg/test/sql/clauses.test.js index 61ff966..6e9c1f4 100644 --- a/src/pg/test/sql/clauses.test.js +++ b/src/pg/test/sql/clauses.test.js @@ -1,26 +1,38 @@ import sql from 'sql-template-tag'; -import expectSql from '../expectSql'; import { getInsert, - getUpdates, getJsonBuildTrusted, getSelectCols, + getUpdates, } from '../../sql/clauses'; +import expectSql from '../expectSql'; describe('clauses', () => { - const data = { a: 1, b: 1 }; - test('insert', () => { - const { cols, vals } = getInsert(data, { + const data = [ + { a: 1, b: 1 }, + { a: 2, b: 2 }, + ]; + + const { cols, vals, updates } = getInsert(data, { verCol: 'version', schema: { types: { a: 'int8', b: 'float', version: 'int8' } }, verDefault: 'current_timestamp', }); expectSql(cols, sql`"a", "b", "version"`); - expectSql(vals, sql`${data.a} , ${data.b} , default`); + expectSql( + vals, + sql`(${data[0].a} , ${data[0].b} , default), (${data[1].a} , ${data[1].b} , default)`, + ); + expectSql( + updates, + sql`"a" = "excluded"."a", "b" = "excluded"."b", "version" = "excluded"."version"`, + ); }); test('updates', () => { + const data = { a: 1, b: 1 }; + const options = { idCol: 'id', verCol: 'version', diff --git a/src/pg/test/sql/upsert.test.js b/src/pg/test/sql/upsert.test.js index 7e2ec5c..324804c 100644 --- a/src/pg/test/sql/upsert.test.js +++ b/src/pg/test/sql/upsert.test.js @@ -1,4 +1,4 @@ -import { put, patch } from '../../sql/upsert.js'; +import { patch, put } from '../../sql/upsert.js'; import sql from 'sql-template-tag'; import expectSql from '../expectSql.js'; @@ -25,27 +25,76 @@ const options = { describe('byId', () => { test('put', async () => { + const sqls = put( + [ + [ + { + $put: true, + id: 'post22', + type: 'post', + name: 'hello', + email: 'world', + config: { foo: 3 }, + tags: [1, 2], + }, + 'post22', + ], + ], + options, + ); expectSql( - put( - { - $put: true, - id: 'post22', - type: 'post', - name: 'hello', - email: 'world', - config: { foo: 3 }, - tags: [1, 2], - }, - 'post22', - options, - ), - sql`INSERT INTO "post" ("id", "type", "name", "email", "config", "tags", "version") - VALUES (${'post22'}, ${'post'}, ${'hello'},${'world'}, + sqls[0], + sql`INSERT INTO "post" ("id", "name", "type", "email", "config", "tags", "version") + VALUES (${'post22'}, ${'hello'}, ${'post'}, ${'world'}, ${JSON.stringify({ foo: 3 })}, ${JSON.stringify([1, 2])}, default) - ON CONFLICT ("id") DO UPDATE SET ("id", "type", "name", "email", "config", "tags", "version") - = (${'post22'}, ${'post'}, ${'hello'},${'world'}, - ${JSON.stringify({ foo: 3 })}, - ${JSON.stringify([1, 2])}, default) + ON CONFLICT ("id") DO UPDATE SET "id" = "excluded"."id", "name" = "excluded"."name", + "type" = "excluded"."type", "email" = "excluded"."email", "config" = "excluded"."config", + "tags" = "excluded"."tags", "version" = "excluded"."version" + RETURNING *, "id" AS "$key", current_timestamp AS "$ver" + `, + ); + }); + + test('put_multi', async () => { + const sqls = put( + [ + [ + { + $put: true, + id: 'post22', + type: 'post', + name: 'hello', + email: 'world', + config: { foo: 3 }, + tags: [1, 2], + }, + 'post22', + ], + [ + { + $put: true, + id: 'post24', + type: 'post', + name: 'hi there', + email: 'mars', + config: { foo: 8 }, + tags: [0, 9], + }, + 'post24', + ], + ], + options, + ); + expectSql( + sqls[0], + sql`INSERT INTO "post" ("id", "name", "type", "email", "config", "tags", "version") + VALUES (${'post22'}, ${'hello'}, ${'post'}, ${'world'}, + ${JSON.stringify({ foo: 3 })}, ${JSON.stringify([1, 2])}, default), + (${'post24'}, ${'hi there'}, ${'post'}, ${'mars'}, + ${JSON.stringify({ foo: 8 })}, ${JSON.stringify([0, 9])}, default) + ON CONFLICT ("id") DO UPDATE SET "id" = "excluded"."id", "name" = "excluded"."name", + "type" = "excluded"."type", "email" = "excluded"."email", "config" = "excluded"."config", + "tags" = "excluded"."tags", "version" = "excluded"."version" RETURNING *, "id" AS "$key", current_timestamp AS "$ver" `, ); @@ -84,22 +133,27 @@ describe('byId', () => { describe('byArg', () => { test('put', async () => { + const sqls = put( + [ + [ + { + $put: true, + id: 'post22', + type: 'post', + name: 'hello', + email: 'world', + }, + { email: 'world' }, + ], + ], + options, + ); expectSql( - put( - { - $put: true, - id: 'post22', - type: 'post', - name: 'hello', - email: 'world', - }, - { email: 'world' }, - options, - ), - sql`INSERT INTO "post" ("id", "type", "name", "email", "version") - VALUES (${'post22'}, ${'post'}, ${'hello'},${'world'}, default) - ON CONFLICT ("email") DO UPDATE SET ("id", "type", "name", "email", "version") - = (${'post22'}, ${'post'}, ${'hello'},${'world'}, default) + sqls[0], + sql`INSERT INTO "post" ("id", "name", "type", "email", "version") + VALUES (${'post22'}, ${'hello'}, ${'post'}, ${'world'}, default) + ON CONFLICT ("email") DO UPDATE SET "id" = "excluded"."id", "name" = "excluded"."name", + "type" = "excluded"."type", "email" = "excluded"."email", "version" = "excluded"."version" RETURNING *, ${'{"email":"world"}'}::jsonb AS "$key", current_timestamp AS "$ver",