Skip to content

Commit

Permalink
#1 array decode
Browse files Browse the repository at this point in the history
  • Loading branch information
exe-dealer committed Mar 24, 2019
1 parent 59c28c3 commit 09c9538
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 48 deletions.
4 changes: 2 additions & 2 deletions doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
value: 'hello',
}],
limit: 10,
stdin: fs.createFileStream(...)
stdin: fs.createReadStream(...),
}, {
// creates prepared statement
op: 'parse',
Expand Down Expand Up @@ -66,7 +66,7 @@
// if limit is exceeded then result will have `suspeded: true` flag
limit: 10,
// optional stdin source stream for `COPY target FROM STDIN` queries
stdin: fs.createFileStream(...),
stdin: fs.createReadStream(...),
}],
});
```
90 changes: 86 additions & 4 deletions lib/datatypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ define({
decodeBin: buf => buf,
decodeText: str => Buffer.from(str.slice(2 /* skip \x */), 'hex'),
encode: buf => Buffer.isBuffer(buf) ? buf : Buffer.from(buf),
encodeBin: true,
});

define({
Expand Down Expand Up @@ -102,15 +103,96 @@ define({
encode: String,
});

function define({ name, id, arrayid, decodeText, decodeBin, encode }) {
function define({ name, id, arrayid, decodeText, decodeBin, encode, encodeBin }) {
module.exports.datatypesByName[name] =
module.exports.datatypesById[id] = { id, decodeText, decodeBin, encode };
module.exports.datatypesByName[name + '[]'] =
module.exports.datatypesById[arrayid] = {
id: arrayid,
decodeBin: noop => noop,
decodeText: noop => noop,
encode: arr => JSON.stringify(arr.map(encode)).replace(/^\[(.*)]$/, '{$1}'),
decodeBin: buf => decodeBinArray(buf, decodeBin),
decodeText: str => decodeTextArray(str, decodeText),
encode: encodeBin
? arr => encodeBinArray(arr, encode, id)
: arr => encodeTextArray(arr, encode),
};
}

function decodeTextArray(inp, decodeElem) {
inp = inp.replace(/^\[.+=/, ''); // skip dimensions
const jsonArray = inp.replace(/{|}|,|"(?:[^"\\]|\\.)*"|[^,}]+/gy, token => (
token == '{' ? '[' :
token == '}' ? ']' :
token == 'NULL' ? 'null' :
token == ',' || token[0] == '"' ? token :
JSON.stringify(token)
));
return JSON.parse(jsonArray,
(_, elem) => typeof elem == 'string' ? decodeElem(elem) : elem
);
}

function decodeBinArray(buf, decodeElem) {
const ndim = buf.readInt32BE();
let cardinality = 0;
for (let di = ndim - 1; di >= 0; di--) {
cardinality += buf.readInt32BE(12 + di * 8);
}
let result = Array(cardinality);
for (let pos = 12 + ndim * 8, i = 0; pos < buf.length; i++) {
const len = buf.readInt32BE(pos);
pos += 4;
if (len < 0) {
result[i] = null;
} else {
result[i] = decodeElem(buf.slice(pos, pos += len));
}
}
for (let di = ndim - 1; di > 0; di--) {
const dimlen = buf.readInt32BE(12 + di * 8);
const reshaped = Array(result.length / dimlen);
for (let i = 0; i < reshaped.length; i++) {
reshaped[i] = result.slice(i * dimlen, (i + 1) * dimlen);
}
result = reshaped;
}
return result;
}

// FIXME: one dimension only
function encodeTextArray(arr, encodeElem) {
return JSON.stringify(arr.map(encodeElem)).replace(/^\[(.*)]$/, '{$1}');
}

// FIXME: one dimension only
function encodeBinArray(array, encodeElem, elemTypeid) {
const ndim = 1;
const encodedArray = Array(array.length);
let size = 4 + 4 + 4 + ndim * (4 + 4) + array.length * 4;
let hasNull = 0;
for (let i = 0; i < array.length; i++) {
if (array[i] == null) {
hasNull = 1;
} else {
const elBuf = encodeElem(array[i]);
size += elBuf.length;
encodedArray[i] = elBuf;
}
}
const result = Buffer.allocUnsafe(size);
let pos = 0;
pos = result.writeInt32BE(1, pos);
pos = result.writeInt32BE(hasNull, pos);
pos = result.writeInt32BE(elemTypeid, pos);
pos = result.writeInt32BE(array.length, pos);
const lb = 1;
pos = result.writeInt32BE(lb, pos);
for (const elBuf of encodedArray) {
if (elBuf) {
pos = result.writeInt32BE(elBuf.length, pos);
pos += elBuf.copy(result, pos);
} else {
pos += result.writeInt32BE(-1, pos);
}
}
return result;
}
88 changes: 46 additions & 42 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,18 @@ it('row decode simple', async () => {
'\\xdeadbeaf'::bytea,
42::int2, -42::int2,
42::int4, -42::int4,
42::int8, -42::int8,
36.6::float4, -36.6::float4,
36.6::float8, -36.6::float8,
jsonb_build_object('hello', 'world', 'num', 1),
json_build_object('hello', 'world', 'num', 1),
'1/2'::pg_lsn
'1/2'::pg_lsn,
ARRAY[1, 2, 3]::int[],
ARRAY[1, 2, 3]::text[],
ARRAY['"quoted"', '{string}', '"{-,-}"'],
ARRAY[[1, 2], [3, 4]],
'[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}'::int[],
ARRAY[1, NULL, 2]
`);
assert.deepStrictEqual(rows, [[
null,
Expand All @@ -231,23 +238,18 @@ it('row decode simple', async () => {
Buffer.from('deadbeaf', 'hex'),
42, -42,
42, -42,
BigInt(42), BigInt(-42),
36.6, -36.6,
36.6, -36.6,
{ hello: 'world', num: 1 },
{ hello: 'world', num: 1 },
'00000001/00000002',
]]);
});

xit('array decode simple', async () => {
const { rows } = await pg.query(/*sql*/ `
SELECT
array[true, false],
array[['a', 'b'], ['c', 'd']]
`);
assert.deepStrictEqual(rows, [[
[true, false],
[['a', 'b'], ['c', 'd']],
[1, 2, 3],
['1', '2', '3'],
['"quoted"', '{string}', '"{-,-}"'],
[[1, 2], [3, 4]],
[[[1, 2, 3], [4, 5, 6]]],
[1, null, 2],
]]);
});

Expand All @@ -261,9 +263,18 @@ it('row decode extended', async () => {
'\\xdeadbeaf'::bytea,
42::int2, -42::int2,
42::int4, -42::int4,
42::int8, -42::int8,
36.599998474121094::float4, -36.599998474121094::float4,
36.6::float8, -36.6::float8,
'1/2'::pg_lsn
jsonb_build_object('hello', 'world', 'num', 1),
json_build_object('hello', 'world', 'num', 1),
'1/2'::pg_lsn,
ARRAY[1, 2, 3]::int[],
ARRAY[1, 2, 3]::text[],
ARRAY['"quoted"', '{string}', '"{-,-}"'],
ARRAY[[1, 2], [3, 4]],
'[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}'::int[],
ARRAY[1, NULL, 2]
`,
}],
});
Expand All @@ -273,35 +284,28 @@ it('row decode extended', async () => {
Buffer.from('deadbeaf', 'hex'),
42, -42,
42, -42,
BigInt(42), BigInt(-42),
36.599998474121094, -36.599998474121094,
36.6, -36.6,
{ hello: 'world', num: 1 },
{ hello: 'world', num: 1 },
'00000001/00000002',
[1, 2, 3],
['1', '2', '3'],
['"quoted"', '{string}', '"{-,-}"'],
[[1, 2], [3, 4]],
[[[1, 2, 3], [4, 5, 6]]],
[1, null, 2],
]]);
});

it('bigint decode extended', async () => {
const { rows } = await pg.query({
extended: true,
script: [{
sql: /*sql*/ `SELECT 42::int8, -42::int8`,
}],
});
assert.deepStrictEqual(rows, [[BigInt(42), BigInt(-42)]]);
});

it('listen/notify', async () => {
const conn = await pg.connect(process.env.POSTGRES);
try {
const response = Promise.race([
new Promise((_resolve, reject) => setTimeout(
() => reject(Error('no notification received in 1s')),
1000,
)),
new Promise((resolve, reject) => {
conn.on('notify:test', ({ payload }) => resolve(payload));
conn.on('error', reject);
}),
]);
const response = new Promise((resolve, reject) => {
conn.on('notify:test', ({ payload }) => resolve(payload));
conn.on('error', reject);
});
await conn.query(/*sql*/ `LISTEN test`);
psql(/*sql*/ `NOTIFY test, 'hello'`);
assert.deepStrictEqual(await response, 'hello');
Expand Down Expand Up @@ -536,7 +540,8 @@ it('param explicit type', async () => {
SELECT pg_typeof($1)::text, $1,
pg_typeof($2)::text, $2,
pg_typeof($3)::text, $3->>'key',
pg_typeof($4)::text, array_to_string($4, ',')
pg_typeof($4)::text, $4::text,
pg_typeof($5)::text, $5::text
`,
params: [{
type: 'int4',
Expand All @@ -552,14 +557,18 @@ it('param explicit type', async () => {
}, {
type: 'text[]',
value: ['1', '2', '3'],
}, {
type: 'bytea[]',
value: ['x', 'y', 'z'],
}],
}],
});
assert.deepStrictEqual(row, [
'integer', 1,
'boolean', true,
'jsonb', 'hello',
'text[]', '1,2,3',
'text[]', '{1,2,3}',
'bytea[]', '{"\\\\x78","\\\\x79","\\\\x7a"}',
]);
});

Expand Down Expand Up @@ -681,12 +690,7 @@ it('idle timeout', async () => {
idleTimeout: 200,
});
try {
await Promise.race([
new Promise(resolve => conn.on('close', resolve)),
new Promise((_, reject) => setTimeout(reject, 400, Error(
'Connection was not closed after idleTimeout',
))),
]);
await new Promise(resolve => conn.on('close', resolve));
} finally {
conn.end(); // close manually if fail
}
Expand Down

0 comments on commit 09c9538

Please sign in to comment.