Skip to content

Commit

Permalink
Merge pull request #163 from TomChv/master
Browse files Browse the repository at this point in the history
Handle invalid Transport and fix problem on async loop during error
  • Loading branch information
vanthome authored Nov 3, 2020
2 parents 601dddf + ad9a992 commit 1fc1dfc
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
35 changes: 28 additions & 7 deletions bulk_writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ BulkWriter.prototype.append = function append(index, type, doc) {

BulkWriter.prototype.write = function write(body) {
const thiz = this;

debug('writing to ES');
return this.client
.bulk({
Expand All @@ -141,7 +142,20 @@ BulkWriter.prototype.write = function write(body) {
.catch((e) => {
// rollback this.bulk array
const newBody = [];
for (let i = 0; i < body.length; i += 2) {
body.forEach((chunk, index, chunks) => {
const { attempts } = chunk;
if (attempts < thiz.retryLimit) {
newBody.push({
index: chunk.index._index,
type: chunk.index._type,
doc: chunks[index + 1],
attempts: attempts + 1,
});
} else {
debug('retry attempts exceeded');
}
});
/* for (let i = 0; i < body.length; i += 2) {
// eslint-disable-next-line prefer-destructuring
const attempts = body[i].attempts;
if (attempts < thiz.retryLimit) {
Expand All @@ -154,7 +168,7 @@ BulkWriter.prototype.write = function write(body) {
} else {
debug('retry attempts exceeded');
}
}
} */

const lenSum = thiz.bulk.length + newBody.length;
if (thiz.options.bufferLimit && lenSum >= thiz.options.bufferLimit) {
Expand All @@ -166,8 +180,15 @@ BulkWriter.prototype.write = function write(body) {
}
debug('error occurred during writing', e);
this.stop();
this.checkEsConnection();
this.checkEsConnection()
.catch((err) => thiz.transport.emit('error', err));
thiz.transport.emit('warning', e);

thiz.bulk.forEach((bulk) => {
if (bulk.attempts === thiz.retryLimit) {
this.transport.emit('error', e);
}
});
});
};

Expand All @@ -176,11 +197,11 @@ BulkWriter.prototype.checkEsConnection = function checkEsConnection() {
thiz.esConnection = false;

const operation = retry.operation({
forever: true,
retries: 1,
forever: false,
retries: 3,
factor: 1,
minTimeout: 1 * 1000,
maxTimeout: 60 * 1000,
minTimeout: 1000,
maxTimeout: 10 * 1000,
randomize: false
});
return new Promise((fulfill, reject) => {
Expand Down
2 changes: 1 addition & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Agent from 'elastic-apm-node';
import * as Agent from 'elastic-apm-node';
import { Client, ClientOptions, ApiResponse } from '@elastic/elasticsearch';
import TransportStream = require('winston-transport');

Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 9 additions & 10 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,19 @@ describe('a buffering logger', () => {
const transport = logger.transports[0];
transport.bulkWriter.bulk.should.have.lengthOf(0);

// mock client.bulk to throw an error
transport.client.bulk = () => {
return Promise.reject(new Error('Test Error'));
};
logger.info('test');

logger.on('error', (err) => {
should.exist(err);
transport.bulkWriter.bulk.should.have.lengthOf(1);
// manually clear the buffer of stop transport from attempting to flush logs
transport.bulkWriter.bulk = [];
done();
});
// mock client.bulk to throw an error
logger.info('test');
transport.client.bulk = () => {
return Promise.reject(new Error('Test Error'));
};
logger.info('test');
logger.end();
});

Expand Down Expand Up @@ -209,13 +209,12 @@ describe('a non buffering logger', () => {
});
});

/*
describe('a defective log transport', () => {
it('emits an error', function (done) {
this.timeout(40000);
const transport = new (winston.transports.Elasticsearch)({
clientOpts: {
host: 'http://does-not-exist.test:9200',
node: 'http://does-not-exist.test:9200',
log: NullLogger
}
});
Expand All @@ -225,15 +224,15 @@ describe('a defective log transport', () => {
done();
});

// eslint-disable-next-line no-unused-vars
const defectiveLogger = winston.createLogger({
transports: [
transport
]
});

defectiveLogger.info('test');
});
});
*/

// Manual test which allows to test re-connection of the ES client for unavailable ES instance.
// Must be combined with --no-timeouts option for mocha
Expand Down

0 comments on commit 1fc1dfc

Please sign in to comment.