From 16981a7bc867d4a699f33a147fd8e01415d9ef4c Mon Sep 17 00:00:00 2001 From: noamdoron Date: Wed, 18 Dec 2019 13:53:30 +0200 Subject: [PATCH 1/3] Support date and origin in stored logs --- taskLogger/StepLogger.js | 18 ++++++++++++++---- taskLogger/TaskLogger.js | 5 ++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/taskLogger/StepLogger.js b/taskLogger/StepLogger.js index 5b5a66c..bbb86a6 100644 --- a/taskLogger/StepLogger.js +++ b/taskLogger/StepLogger.js @@ -25,6 +25,8 @@ class StepLogger extends EventEmitter { this.name = name; this.fatal = false; + + this.origin = opts.origin || 'unknown'; } start(eventReporting) { @@ -63,25 +65,33 @@ class StepLogger extends EventEmitter { } write(message) { - this._reportLog(message); + this._reportLog(this.createLogContext(message)); this.updateLastUpdate(); } debug(message) { - this._reportLog(`${message}\r\n`); + this._reportLog(this.createLogContext(`${message}\r\n`)); this.updateLastUpdate(); } warn(message) { - this._reportLog(`\x1B[01;93m${message}\x1B[0m\r\n`); + this._reportLog(this.createLogContext(`\x1B[01;93m${message}\x1B[0m\r\n`)); this.updateLastUpdate(); } info(message) { - this._reportLog(`${message}\r\n`); + this._reportLog(this.createLogContext(`${message}\r\n`)); this.updateLastUpdate(); } + createLogContext(message) { + return { + message, + origin: this.origin, + timestamp: new Date().toISOString() + }; + } + finish(err, skip, finishTime) { if (this.status === STATUS.PENDING && !skip) { // do not close a pending step that should not be skipped return; diff --git a/taskLogger/TaskLogger.js b/taskLogger/TaskLogger.js index fd791f4..f0f69e0 100644 --- a/taskLogger/TaskLogger.js +++ b/taskLogger/TaskLogger.js @@ -31,6 +31,8 @@ class TaskLogger extends EventEmitter { this.fatal = false; this.finished = false; this.steps = {}; + + this.origin = opts.origin || 'unknown'; } create(name, resetStatus, runCreationLogic) { @@ -75,7 +77,8 @@ class TaskLogger extends EventEmitter { const step = new StepClass({ accountId: this.accountId, jobId: this.jobId, - name + name, + origin: this.origin }, { ...opts }, this); From 6067e15884dcccba790bb0dda97135dc9999d51a Mon Sep 17 00:00:00 2001 From: noamdoron Date: Mon, 6 Jan 2020 12:21:45 +0200 Subject: [PATCH 2/3] added firebase writable stream with rate limit options --- taskLogger/StepLogger.js | 9 +- taskLogger/TaskLogger.js | 2 + taskLogger/firebase/FirebaseWritableStream.js | 106 ++++++++++++++++++ taskLogger/firebase/StepLogger.js | 29 +++++ taskLogger/firebase/TaskLogger.js | 3 +- taskLogger/test/StepLogger.unit.spec.js | 2 +- 6 files changed, 148 insertions(+), 3 deletions(-) create mode 100644 taskLogger/firebase/FirebaseWritableStream.js diff --git a/taskLogger/StepLogger.js b/taskLogger/StepLogger.js index bbb86a6..d43231c 100644 --- a/taskLogger/StepLogger.js +++ b/taskLogger/StepLogger.js @@ -26,7 +26,9 @@ class StepLogger extends EventEmitter { this.fatal = false; - this.origin = opts.origin || 'unknown'; + const { origin, logWriteStrategyOptions } = opts; + this.origin = origin || 'unknown'; + this.logWriteStrategyOptions = logWriteStrategyOptions || { logWriteStrategy: 'singleLogWrite' }; } start(eventReporting) { @@ -69,6 +71,11 @@ class StepLogger extends EventEmitter { this.updateLastUpdate(); } + writeStream() { + return this.streamLog(); + // TODO: this.updateLastUpdate(); ? should we pass it into stream log? + } + debug(message) { this._reportLog(this.createLogContext(`${message}\r\n`)); this.updateLastUpdate(); diff --git a/taskLogger/TaskLogger.js b/taskLogger/TaskLogger.js index f0f69e0..9c235f4 100644 --- a/taskLogger/TaskLogger.js +++ b/taskLogger/TaskLogger.js @@ -32,6 +32,8 @@ class TaskLogger extends EventEmitter { this.finished = false; this.steps = {}; + this.rateLimitOptions = opts.rateLimitOptions; + this.origin = opts.origin || 'unknown'; } diff --git a/taskLogger/firebase/FirebaseWritableStream.js b/taskLogger/firebase/FirebaseWritableStream.js new file mode 100644 index 0000000..1ac8de7 --- /dev/null +++ b/taskLogger/firebase/FirebaseWritableStream.js @@ -0,0 +1,106 @@ +const { Writable } = require('stream'); +const _ = require('lodash'); + +const FIREBASE_MESSAGE_SIZE_LIMIT = 10 * 1024 * 1024; // 10 MB Maximum size of a string + + +class FirebaseWritableStream extends Writable { + constructor(_firebaseClient, options) { + super(options); + this._firebaseClient = _firebaseClient; + this._timeUnitLimitMs = options.timeUnitLimitMs; + this._debounceDelay = options.debounceDelay; + this._messageSizeLimitPerTimeUnit = options.messageSizeLimitPerTimeUnit; + this._batchSize = options.batchSize; + } + + _write(chunk, encoding, next) { + clearTimeout(this._debounceTimeout); + const newLogKey = `${this._firebaseClient.child('logs').push().key()}`; + const currentMessageSize = Buffer.byteLength(chunk); + + this._previousTimestamp = this._previousTimestamp || new Date().getTime(); + + const now = new Date().getTime(); + const msDelta = now - this._previousTimestamp; + console.log(`${new Date().toISOString()} streamLog.updateBatch: ms passed ${msDelta} / ${this._timeUnitLimitMs}`); + // minute as passed, reset current byte size + if (msDelta > this._timeUnitLimitMs) { + console.log(`${new Date().toISOString()} streamLog.updateBatch: ${this._timeUnitLimitMs} + has passed resetting current log byte size to 0`); + this._currentLogByteSize = 0; + this._previousTimestamp = now; + } + + // current logs size during timeUnit exceeds limit (1MB/Second) + if (currentMessageSize + this._currentLogByteSize > this._messageSizeLimitPerTimeUnit) { + console.log(`${new Date().toISOString()} streamLog.updateBatch: current log size + message + [${currentMessageSize + this._currentLogByteSize} / ${this._messageSizeLimitPerTimeUnit}] exceeded, flushing...`); + + this._firebaseClient.update(this._logsBatch, (err) => { + if (err) { + next(err); + return; + } + this._logBatchByteSize = 0; + this._logsBatch = Object.create(null); + + const waitMs = (this._timeUnitLimitMs - msDelta) + 5; + // lets wait till time unit limit will pass in order to continue + this._debounceTimeout = setTimeout(this._write.bind(this, chunk, encoding, next), waitMs); + console.log(`${new Date().toISOString()} streamLog.updateBatch: successfully flushed to firebase, + waiting ${waitMs} ms for logs byte size reset`); + }); + return; + } + + this._logBatchByteSize += currentMessageSize; + this._currentLogByteSize += this._logBatchByteSize; + this._logsBatch[`${newLogKey}`] = chunk.toString(); + console.log(`${new Date().toISOString()} streamLog.updateBatch: updated logs batch with new key + '${newLogKey}', current logs byte size ${this._currentLogByteSize / 1024} KB`); + + if (_.size(this._logsBatch) < this._batchSize) { + console.log(`${new Date().toISOString()} streamLog.updateBatch: batch capacity is still + available [${_.keys(this._logsBatch).length}/${this._batchSize}], resetting debounce flush and continue`); + this._setBatchFlushTimeout(this._debounceDelay); + next(); + return; + } + + console.log(`${new Date().toISOString()} streamLog.updateBatch: logs batch size has been met [${this._batchSize}] flushing...`); + this._firebaseClient.update(this._logsBatch, (err) => { + if (err) { + next(err); + return; + } + console.log(`${new Date().toISOString()} streamLog.updateBatch: flushed successfully, + resetting logs batch and debounce flush`); + this._logsBatch = Object.create(null); + this._setBatchFlushTimeout(this._debounceDelay); + next(); + }); + } + + _setBatchFlushTimeout(flushInterval) { + console.log(new Date().toISOString(), 'streamLog._setBatchFlushTimeout: setting flush timout', flushInterval); + this._debounceTimeout = setTimeout(() => { + if (_.isEmpty(this._logsBatch)) { + console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: logs batch is empty, no update is required`); + return; + } + console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: timeout + triggered, [${this._logBatchByteSize / 1024} KB /${FIREBASE_MESSAGE_SIZE_LIMIT / 1024} KB], flushing...`); + this._firebaseClient.update(this._logsBatch, (err) => { + if (err) { + console.error(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: flushed successfully`); + } else { + console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: flushed successfully`); + } + this._logsBatch = Object.create(null); + }); + }, flushInterval); + } +} + +module.exports = FirebaseWritableStream; diff --git a/taskLogger/firebase/StepLogger.js b/taskLogger/firebase/StepLogger.js index 0ba5a84..9ec40cf 100644 --- a/taskLogger/firebase/StepLogger.js +++ b/taskLogger/firebase/StepLogger.js @@ -5,8 +5,10 @@ const CFError = require('cf-errors'); const { STATUS } = require('../enums'); const BaseStepLogger = require('../StepLogger'); const { wrapWithRetry } = require('../helpers'); +const FirebaseWritableStream = require('./FirebaseWritableStream'); class FirebaseStepLogger extends BaseStepLogger { + constructor(step, opts) { super(step, opts); @@ -21,6 +23,12 @@ class FirebaseStepLogger extends BaseStepLogger { this.stepUrl = `${this.baseUrl}/steps/${this.name}`; this.stepRef = new Firebase(this.stepUrl); + + this.rateLimitOptions = opts.rateLimitOptions; + + /* this._reportLog = this._logWriteStrategyFactory(); + this._logsBatch = null; + this._logBatchFlushTimeout = null; */ } async restore() { @@ -95,6 +103,27 @@ class FirebaseStepLogger extends BaseStepLogger { this.stepRef.child('logs').set({}); } + streamLog() { + return new FirebaseWritableStream(this.stepRef, this.rateLimitOptions); + } + + _setBatchFlushTimeout(flushInterval) { + this._logBatchFlushTimeout = setTimeout(() => { + this.stepRef.update(this._logsBatch, err => console.log(err)); + this._logsBatch = {}; + }, flushInterval); + } + + _logWriteStrategyFactory(logWriteStrategy) { + switch (logWriteStrategy) { + case 'batchLogWrite': + this._logsBatch = {}; + return this.batchLogWrite.bind(this); + case 'singleLogWrite': + default: return this.singleLogWrite.bind(this); + } + } + async delete() { return this.stepRef.remove(); } diff --git a/taskLogger/firebase/TaskLogger.js b/taskLogger/firebase/TaskLogger.js index 5a336e3..6b3d23b 100644 --- a/taskLogger/firebase/TaskLogger.js +++ b/taskLogger/firebase/TaskLogger.js @@ -179,7 +179,8 @@ class FirebaseTaskLogger extends BaseTaskLogger { const step = new StepLogger({ accountId: this.accountId, jobId: this.jobId, - name: key + name: key, + rateLimitOptions: this.rateLimitOptions }, { ...this.opts }); diff --git a/taskLogger/test/StepLogger.unit.spec.js b/taskLogger/test/StepLogger.unit.spec.js index da532e7..ca2a9d5 100644 --- a/taskLogger/test/StepLogger.unit.spec.js +++ b/taskLogger/test/StepLogger.unit.spec.js @@ -161,7 +161,7 @@ describe('Base StepLogger tests', () => { const stepLogger = getStepLoggerInstance(); const message = 'message'; stepLogger.write(message); - expect(stepLogger._reportLog).to.have.been.calledWith(message); + expect(stepLogger._reportLog).to.have.been.calledWithMatch({ message, origin: 'unknwon' }); expect(stepLogger.updateLastUpdate).to.have.been.calledWith(); }); }); From 58fc3ccd9adf4baf2349bcee19683685b7774371 Mon Sep 17 00:00:00 2001 From: noamdoron Date: Mon, 6 Jan 2020 12:27:26 +0200 Subject: [PATCH 3/3] made step logger writable stream singleton --- taskLogger/firebase/StepLogger.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taskLogger/firebase/StepLogger.js b/taskLogger/firebase/StepLogger.js index 9ec40cf..91a2740 100644 --- a/taskLogger/firebase/StepLogger.js +++ b/taskLogger/firebase/StepLogger.js @@ -24,7 +24,7 @@ class FirebaseStepLogger extends BaseStepLogger { this.stepUrl = `${this.baseUrl}/steps/${this.name}`; this.stepRef = new Firebase(this.stepUrl); - this.rateLimitOptions = opts.rateLimitOptions; + this.writableStream = new FirebaseWritableStream(this.stepRef, opts.rateLimitOptions); /* this._reportLog = this._logWriteStrategyFactory(); this._logsBatch = null; @@ -104,7 +104,7 @@ class FirebaseStepLogger extends BaseStepLogger { } streamLog() { - return new FirebaseWritableStream(this.stepRef, this.rateLimitOptions); + return this.writableStream; } _setBatchFlushTimeout(flushInterval) {