Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Support for script operations in the update action of the client.helpers.bulk method #210

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add guidelines on installing yarn, dependencies; instructions on running ESLint in developer_guide ([#439](https://github.com/opensearch-project/opensearch-js/issues/435))
- Added pull_request_template ([440](https://github.com/opensearch-project/opensearch-js/pull/440))
- Added guide for Search ([#473](https://github.com/opensearch-project/opensearch-js/pull/489))
- Added support script operations in the update action of the client.helpers.bulk ([#210](https://github.com/opensearch-project/opensearch-js/issues/209))

### Dependencies

Expand Down
17 changes: 16 additions & 1 deletion lib/Helpers.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ interface UpdateActionOperation {
};
}

interface UpdateActionDocOperation {
doc: {
[key: string]: any;
};
}

interface UpdateActionScriptOperation {
script: {
[key: string]: any;
};
}

interface DeleteAction {
delete: {
_index: string;
Expand All @@ -111,7 +123,10 @@ interface DeleteAction {

type CreateAction = CreateActionOperation | [CreateActionOperation, unknown];
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown];
type UpdateAction = [UpdateActionOperation, Record<string, any>];
nhtruong marked this conversation as resolved.
Show resolved Hide resolved
type UpdateAction = [
UpdateActionOperation,
UpdateActionDocOperation | UpdateActionScriptOperation
];
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction;
type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>;

Expand Down
5 changes: 1 addition & 4 deletions lib/Helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -539,10 +539,7 @@ class Helpers {
bulkBody.push(actionBody, payloadBody);
} else if (operation === 'update') {
actionBody = serializer.serialize(action);
payloadBody =
typeof chunk === 'string'
? `{"doc":${chunk}}`
: serializer.serialize({ doc: chunk, ...payload });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the reason why this breaks the bulk operation for regular use of this helper method?

(Also sorry the comment mentioning me got lost in the mail)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nhtruong
Yes. The old method only supported doc, so we modified it to support script as well, but that will not work with the previous method of input.

Copy link
Collaborator

@nhtruong nhtruong Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid breaking changes for existing users. Unless there are no other ways around this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huuyafwww have you found a work around to avoid breaking current use cases?

payloadBody = serializer.serialize(payload || chunk);
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody);
bulkBody.push(actionBody, payloadBody);
} else if (operation === 'delete') {
Expand Down
17 changes: 16 additions & 1 deletion test/types/helpers.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import {
ScrollSearchResponse,
OnDropDocument,
MsearchHelper,
UpdateActionDocOperation,
UpdateActionScriptOperation,
} from '../../lib/Helpers';
import { ApiResponse, ApiError, Context } from '../../lib/Transport';

Expand Down Expand Up @@ -107,7 +109,20 @@ expectError(
const options: BulkHelperOptions<Record<string, any>> = {
datasource: [],
onDocument(doc: Record<string, any>) {
return [{ update: { _index: 'test' } }, doc];
return [{ update: { _index: 'test' } }, doc as UpdateActionDocOperation];
},
};
expectAssignable<BulkHelperOptions<Record<string, any>>>(options);
}
// update
{
// without `:BulkHelperOptions` this test cannot pass
// but if we write these options inline inside
// a `.helper.bulk`, it works as expected
const options: BulkHelperOptions<Record<string, any>> = {
datasource: [],
onDocument(doc: Record<string, any>) {
return [{ update: { _index: 'test' } }, doc as UpdateActionScriptOperation];
},
};
expectAssignable<BulkHelperOptions<Record<string, any>>>(options);
Expand Down
221 changes: 217 additions & 4 deletions test/unit/helpers/bulk.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ test('bulk create', (t) => {
t.end();
});

test('bulk update', (t) => {
test('bulk update by doc operation', (t) => {
t.test('Should perform a bulk request', async (t) => {
let count = 0;
const MockConnection = connection.buildMockConnection({
Expand All @@ -971,14 +971,16 @@ test('bulk update', (t) => {
flushBytes: 1,
concurrency: 1,
onDocument() {
const currentId = id++;
return [
{
update: {
_index: 'test',
_id: id++,
_id: currentId,
},
},
{
doc: dataset[currentId],
doc_as_upsert: true,
},
];
Expand Down Expand Up @@ -1022,13 +1024,15 @@ test('bulk update', (t) => {
flushBytes: 1,
concurrency: 1,
onDocument() {
const currentId = id++;
return [
{
update: {
_index: 'test',
_id: id++,
_id: currentId,
},
},
{ doc: dataset[currentId] },
];
},
onDrop() {
Expand Down Expand Up @@ -1070,14 +1074,16 @@ test('bulk update', (t) => {
flushBytes: 1,
concurrency: 1,
onDocument() {
const currentId = id++;
return [
{
update: {
_index: 'test',
_id: id++,
_id: currentId,
},
},
{
doc: dataset[currentId],
doc_as_upsert: true,
},
];
Expand All @@ -1102,6 +1108,213 @@ test('bulk update', (t) => {
t.end();
});

test('bulk update by script operation', (t) => {
t.test('Should perform a bulk request', async (t) => {
let count = 0;
const MockConnection = connection.buildMockConnection({
onRequest(params) {
t.equal(params.path, '/_bulk');
t.match(params.headers, { 'content-type': 'application/x-ndjson' });
const [action, payload] = params.body.split('\n');
t.same(JSON.parse(action), { update: { _index: 'test', _id: count } });
t.same(JSON.parse(payload), {
script: {
source: 'ctx._source.user = params.user; ctx._source.age = params.age;',
lang: 'painless',
params: {
...dataset[count++],
},
},
scripted_upsert: true,
});
return { body: { errors: false, items: [{}] } };
},
});

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
});
let id = 0;
const result = await client.helpers.bulk({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument() {
const currentId = id++;
return [
{
update: {
_index: 'test',
_id: currentId,
},
},
{
script: {
source: 'ctx._source.user = params.user; ctx._source.age = params.age;',
lang: 'painless',
params: {
...dataset[currentId],
},
},
scripted_upsert: true,
},
];
},
onDrop() {
t.fail('This should never be called');
},
});

t.type(result.time, 'number');
t.type(result.bytes, 'number');
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false,
});
});

t.test('Should perform a bulk request dataset as string)', async (t) => {
let count = 0;
const MockConnection = connection.buildMockConnection({
onRequest(params) {
t.equal(params.path, '/_bulk');
t.match(params.headers, { 'content-type': 'application/x-ndjson' });
const [action, payload] = params.body.split('\n');
t.same(JSON.parse(action), { update: { _index: 'test', _id: count } });
t.same(JSON.parse(payload), {
script: {
source: 'ctx._source.user = params.user; ctx._source.age = params.age;',
lang: 'painless',
params: {
...dataset[count++],
},
},
});
return { body: { errors: false, items: [{}] } };
},
});

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
});
let id = 0;
const result = await client.helpers.bulk({
datasource: dataset.map((d) => JSON.stringify(d)),
flushBytes: 1,
concurrency: 1,
onDocument() {
const currentId = id++;
return [
{
update: {
_index: 'test',
_id: currentId,
},
},
{
script: {
source: 'ctx._source.user = params.user; ctx._source.age = params.age;',
lang: 'painless',
params: {
...dataset[currentId],
},
},
},
];
},
onDrop() {
t.fail('This should never be called');
},
});

t.type(result.time, 'number');
t.type(result.bytes, 'number');
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false,
});
});

t.test('Should track the number of noop results', async (t) => {
let count = 0;
const MockConnection = connection.buildMockConnection({
onRequest(params) {
t.equal(params.path, '/_bulk');
t.match(params.headers, { 'content-type': 'application/x-ndjson' });
const [action, payload] = params.body.split('\n');
t.same(JSON.parse(action), { update: { _index: 'test', _id: count } });
t.same(JSON.parse(payload), {
script: {
source: 'ctx._source.user = params.user; ctx._source.age = params.age;',
lang: 'painless',
params: {
...dataset[count++],
},
},
scripted_upsert: true,
});
return { body: { errors: false, items: [{ update: { result: 'noop' } }] } };
},
});

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
});
let id = 0;
const result = await client.helpers.bulk({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument() {
const currentId = id++;
return [
{
update: {
_index: 'test',
_id: currentId,
},
},
{
script: {
source: 'ctx._source.user = params.user; ctx._source.age = params.age;',
lang: 'painless',
params: {
...dataset[currentId],
},
},
scripted_upsert: true,
},
];
},
onDrop() {
t.fail('This should never be called');
},
});

t.type(result.time, 'number');
t.type(result.bytes, 'number');
t.match(result, {
total: 3,
successful: 3,
noop: 3,
retry: 0,
failed: 0,
aborted: false,
});
});

t.end();
});

test('bulk delete', (t) => {
t.test('Should perform a bulk request', async (t) => {
let count = 0;
Expand Down