Skip to content

Commit

Permalink
Merge pull request #150 from usegraffy/multi-insert
Browse files Browse the repository at this point in the history
Insert mulpitle rows with one statement, where possible
  • Loading branch information
aravindet authored Oct 27, 2023
2 parents 02a39ef + 6de0a48 commit 21b9e0c
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 123 deletions.
29 changes: 16 additions & 13 deletions src/core/shift.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {
wrap,
unwrap,
remove,
merge,
mergeStreams,
decodeGraph,
decodeQuery,
encodeGraph,
encodePath,
encodeQuery,
encodeGraph,
decodeQuery,
decodeGraph,
finalize,
merge,
mergeStreams,
remove,
unwrap,
unwrapObject,
wrap,
wrapObject,
finalize,
} from '@graffy/common';
import { makeStream } from '@graffy/stream';

Expand All @@ -32,13 +32,13 @@ export function wrapProvider(fn, decodedPath, isRead) {
const remainingPayload = remove(payload, path) || [];

// This next function is offered to the provider function.
async function shiftedNext(porcelainNextPayload) {
async function shiftedNext(porcelainNextPayload, nextOptions) {
nextCalled = true;
const nextPayload = encodePayload(
wrapObject(porcelainNextPayload, decodedPath),
);
if (remainingPayload.length) merge(nextPayload, remainingPayload);
const nextResult = await next(nextPayload);
const nextResult = await next(nextPayload, nextOptions);

// Remember the next() results that are not returned to this provider.
// These will be merged into the result later.
Expand Down Expand Up @@ -83,7 +83,10 @@ export function shiftGen(fn, path) {
const remainingPayload = remove(payload, path) || [];

// TODO: This should probably use makeStream and propagate returns.
const shiftedNext = async function* shiftedNextFn(unwrappedNextPayload) {
const shiftedNext = async function* shiftedNextFn(
unwrappedNextPayload,
nextOptions,
) {
nextCalled = true;
const nextPayload = wrap(unwrappedNextPayload, path);
if (remainingPayload.length) merge(nextPayload, remainingPayload);
Expand All @@ -94,7 +97,7 @@ export function shiftGen(fn, path) {
pushRemaining = push;
});

for await (const value of next(nextPayload)) {
for await (const value of next(nextPayload, nextOptions)) {
const unwrappedValue = unwrap(value, path);
const remainingValue = remove(value, path);
if (remainingValue) pushRemaining(remainingValue);
Expand Down
18 changes: 16 additions & 2 deletions src/core/test/porcelain.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { jest } from '@jest/globals';
import Graffy from '../Graffy.js';
import GraffyFill from '@graffy/fill';
import { page, ref } from '@graffy/testing';
import { jest } from '@jest/globals';
import Graffy from '../Graffy.js';

test('Porcelain read', async () => {
const store = new Graffy();
Expand Down Expand Up @@ -403,3 +403,17 @@ test('onReadWithNext', async () => {
},
});
});

test('modified_next_options', async () => {
// @ts-ignore bad jest mockResolvedValue definitions?
const mockOnRead = jest.fn().mockResolvedValue({ 123: { name: 'Alice' } });
const query = { 123: { name: true } };
const store = new Graffy();
store.use(GraffyFill());
store.onRead('user', (query, _options, next) => {
return next(query, { bar: true });
});
store.onRead('user', mockOnRead);
await store.read('user', query, { foo: 2 });
expect(mockOnRead).toBeCalledWith(query, { bar: true }, expect.any(Function));
});
57 changes: 34 additions & 23 deletions src/pg/Db.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import pg from 'pg';
import sqlTag from 'sql-template-tag';
import {
isPlainObject,
cmp,
decodeArgs,
wrap,
unwrap,
finalize,
merge,
wrapObject,
decodeGraph,
decodeQuery,
encodeGraph,
encodePath,
finalize,
isEmpty,
isPlainObject,
isRange,
decodeGraph,
merge,
mergeObject,
decodeQuery,
cmp,
encodePath,
unwrap,
wrap,
wrapObject,
} from '@graffy/common';
import { selectByArgs, selectByIds, put, patch, del } from './sql/index.js';
import debug from 'debug';
import pg from 'pg';
import sqlTag from 'sql-template-tag';
import { del, patch, put, selectByArgs, selectByIds } from './sql/index.js';
const log = debug('graffy:pg:db');
const { Pool, Client, types } = pg;

Expand Down Expand Up @@ -88,7 +88,7 @@ export default class Db {
if (!res.rowCount) {
throw Error(`pg.nothing_written ${sql.text} with ${sql.values}`);
}
return res.rows[0];
return res.rows;
}

/*
Expand Down Expand Up @@ -183,11 +183,11 @@ export default class Db {
selectByIds(Object.keys(idQueries), null, tableOptions),
tableOptions,
);
result.forEach((object) => {
for (const object of result) {
const wrappedGraph = encodeGraph(wrapObject(object, rawPrefix));
log('getByIds', wrappedGraph);
merge(results, wrappedGraph);
});
}
};

const query = unwrap(rootQuery, prefix);
Expand Down Expand Up @@ -227,34 +227,45 @@ export default class Db {

const change = unwrap(rootChange, prefix);

const sqls = change.map((node) => {
const puts = [];
const sqls = [];
for (const node of change) {
const arg = decodeArgs(node);

if (isRange(node)) {
if (cmp(node.key, node.end) === 0) return del(arg, tableOptions);
if (cmp(node.key, node.end) === 0) {
log('delete', node);
sqls.push(del(arg, tableOptions));
continue;
}
throw Error('pg_write.write_range_unsupported');
}

const object = decodeGraph(node.children);
if (isPlainObject(arg)) {
mergeObject(object, arg);
} else {
object.id = arg;
object[tableOptions.idCol] = arg;
}

if (object.$put && object.$put !== true) {
throw Error('pg_write.partial_put_unsupported');
}

return object.$put
? put(object, arg, tableOptions)
: patch(object, arg, tableOptions);
});
if (object.$put) {
puts.push([object, arg]);
} else {
sqls.push(patch(object, arg, tableOptions));
}
}

if (puts.length) sqls.push(...put(puts, tableOptions));

const result = [];
await Promise.all(
sqls.map((sql) =>
this.writeSql(sql, tableOptions).then((object) => {
log('returned_object_wrapped', wrapObject(object, rawPrefix));
merge(result, encodeGraph(wrapObject(object, rawPrefix)));
}),
),
Expand Down
50 changes: 40 additions & 10 deletions src/pg/sql/clauses.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sql, { Sql, raw, join, empty } from 'sql-template-tag';
import { isEmpty } from '@graffy/common';
import sql, { Sql, empty, join, raw } from 'sql-template-tag';

/*
Important: This function assumes that the object's keys are from
Expand Down Expand Up @@ -110,19 +110,49 @@ function castValue(value, type, name, isPut) {
return value;
}

export const getInsert = (row, options) => {
export const getInsert = (rows, options) => {
const { verCol, schema } = options;
const cols = [];
const colSqls = [];
const colIx = {};
const colUsed = [];

for (const col of Object.keys(options.schema.types)) {
colIx[col] = cols.length;
colUsed[cols.length] = false;
cols.push(col);
colSqls.push(sql`"${raw(col)}"`);
}

colUsed[colIx[verCol]] = true;

const vals = [];
for (const row of rows) {
const rowVals = Array(cols.length).fill(sql`default`);

for (const col of cols) {
if (col === verCol || !(col in row)) continue;
const ix = colIx[col];
colUsed[ix] = true;
rowVals[ix] = castValue(row[col], schema.types[col], col, row.$put);
}

Object.entries(row)
.filter(([col]) => col !== options.verCol && col[0] !== '$')
.concat([[options.verCol, sql`default`]])
.forEach(([col, val]) => {
cols.push(sql`"${raw(col)}"`);
vals.push(castValue(val, options.schema.types[col], col, row.$put));
});
vals.push(rowVals);
}

const isUsed = (_, ix) => colUsed[ix];

return { cols: join(cols, ', '), vals: join(vals, ', ') };
return {
cols: join(colSqls.filter(isUsed), ', '),
vals: join(
vals.map((rowVals) => sql`(${join(rowVals.filter(isUsed), ', ')})`),
', ',
),
updates: join(
colSqls.map((col, ix) => sql`${col} = "excluded".${col}`).filter(isUsed),
', ',
),
};
};

export const getUpdates = (row, options) => {
Expand Down
50 changes: 32 additions & 18 deletions src/pg/sql/upsert.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import sql, { raw, join } from 'sql-template-tag';
import { isPlainObject } from '@graffy/common';
import sql, { join, raw } from 'sql-template-tag';
import { getInsert, getSelectCols, getUpdates } from './clauses.js';
import getArgSql from './getArgSql.js';
import { getIdMeta } from './getMeta.js';
import { getInsert, getSelectCols, getUpdates } from './clauses.js';

function getSingleSql(arg, options) {
const { table, idCol } = options;
Expand Down Expand Up @@ -33,34 +33,48 @@ export function patch(object, arg, options) {
const { table } = options;
const { where, meta } = getSingleSql(arg, options);

const row = object; // objectToRow(object, options);
const row = object;

return sql`
UPDATE "${raw(table)}" SET ${getUpdates(row, options)}
WHERE ${where}
RETURNING ${getSelectCols(options)}, ${meta}`;
}

export function put(object, arg, options) {
export function put(puts, options) {
const { idCol, table } = options;
const row = object; // objectToRow(object, options);
const sqls = [];

const addSql = (rows, meta, conflictTarget) => {
const { cols, vals, updates } = getInsert(rows, options);
sqls.push(sql`
INSERT INTO "${raw(table)}" (${cols}) VALUES ${vals}
ON CONFLICT (${conflictTarget}) DO UPDATE SET ${updates}
RETURNING ${getSelectCols(options)}, ${meta}`);
};

let meta;
let conflictTarget;
if (isPlainObject(arg)) {
({ meta } = getArgSql(arg, options));
conflictTarget = join(Object.keys(arg).map((col) => sql`"${raw(col)}"`));
} else {
meta = getIdMeta(options);
conflictTarget = sql`"${raw(idCol)}"`;
const idRows = [];
for (const put of puts) {
const [row, arg] = put;
if (!isPlainObject(arg)) {
idRows.push(row);
continue;
}

const { meta } = getArgSql(arg, options);
const conflictTarget = join(
Object.keys(arg).map((col) => sql`"${raw(col)}"`),
);
addSql([row], meta, conflictTarget);
}

const { cols, vals } = getInsert(row, options);
if (idRows.length) {
const meta = getIdMeta(options);
const conflictTarget = sql`"${raw(idCol)}"`;
addSql(idRows, meta, conflictTarget);
}

return sql`
INSERT INTO "${raw(table)}" (${cols}) VALUES (${vals})
ON CONFLICT (${conflictTarget}) DO UPDATE SET (${cols}) = (${vals})
RETURNING ${getSelectCols(options)}, ${meta}`;
return sqls;
}

export function del(arg, options) {
Expand Down
Loading

0 comments on commit 21b9e0c

Please sign in to comment.