Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support date and origin in stored logs #43

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions taskLogger/StepLogger.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class StepLogger extends EventEmitter {
this.name = name;

this.fatal = false;

const { origin, logWriteStrategyOptions } = opts;
this.origin = origin || 'unknown';
this.logWriteStrategyOptions = logWriteStrategyOptions || { logWriteStrategy: 'singleLogWrite' };
Copy link
Contributor Author

@noamt-codefresh noamt-codefresh Jan 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove line 31 ( this.logWriteStrategyOptions = logWriteStrategyOptions || { logWriteStrategy: 'singleLogWrite' };)

}

start(eventReporting) {
Expand Down Expand Up @@ -63,25 +67,38 @@ class StepLogger extends EventEmitter {
}

write(message) {
this._reportLog(message);
this._reportLog(this.createLogContext(message));
this.updateLastUpdate();
}

writeStream() {
return this.streamLog();
// TODO: this.updateLastUpdate(); ? should we pass it into stream log?
}

debug(message) {
this._reportLog(`${message}\r\n`);
this._reportLog(this.createLogContext(`${message}\r\n`));
this.updateLastUpdate();
}

warn(message) {
this._reportLog(`\x1B[01;93m${message}\x1B[0m\r\n`);
this._reportLog(this.createLogContext(`\x1B[01;93m${message}\x1B[0m\r\n`));
this.updateLastUpdate();
}

info(message) {
this._reportLog(`${message}\r\n`);
this._reportLog(this.createLogContext(`${message}\r\n`));
this.updateLastUpdate();
}

createLogContext(message) {
return {
message,
origin: this.origin,
timestamp: new Date().toISOString()
};
}

finish(err, skip, finishTime) {
if (this.status === STATUS.PENDING && !skip) { // do not close a pending step that should not be skipped
return;
Expand Down
7 changes: 6 additions & 1 deletion taskLogger/TaskLogger.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class TaskLogger extends EventEmitter {
this.fatal = false;
this.finished = false;
this.steps = {};

this.rateLimitOptions = opts.rateLimitOptions;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove line 35 implemented in another change


this.origin = opts.origin || 'unknown';
}

create(name, resetStatus, runCreationLogic) {
Expand Down Expand Up @@ -75,7 +79,8 @@ class TaskLogger extends EventEmitter {
const step = new StepClass({
accountId: this.accountId,
jobId: this.jobId,
name
name,
origin: this.origin
}, {
...opts
}, this);
Expand Down
106 changes: 106 additions & 0 deletions taskLogger/firebase/FirebaseWritableStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
const { Writable } = require('stream');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete file - implemented in another change

const _ = require('lodash');

const FIREBASE_MESSAGE_SIZE_LIMIT = 10 * 1024 * 1024; // 10 MB Maximum size of a string


class FirebaseWritableStream extends Writable {
constructor(_firebaseClient, options) {
super(options);
this._firebaseClient = _firebaseClient;
this._timeUnitLimitMs = options.timeUnitLimitMs;
this._debounceDelay = options.debounceDelay;
this._messageSizeLimitPerTimeUnit = options.messageSizeLimitPerTimeUnit;
this._batchSize = options.batchSize;
}

_write(chunk, encoding, next) {
clearTimeout(this._debounceTimeout);
const newLogKey = `${this._firebaseClient.child('logs').push().key()}`;
const currentMessageSize = Buffer.byteLength(chunk);

this._previousTimestamp = this._previousTimestamp || new Date().getTime();

const now = new Date().getTime();
const msDelta = now - this._previousTimestamp;
console.log(`${new Date().toISOString()} streamLog.updateBatch: ms passed ${msDelta} / ${this._timeUnitLimitMs}`);
// minute as passed, reset current byte size
if (msDelta > this._timeUnitLimitMs) {
console.log(`${new Date().toISOString()} streamLog.updateBatch: ${this._timeUnitLimitMs}
has passed resetting current log byte size to 0`);
this._currentLogByteSize = 0;
this._previousTimestamp = now;
}

// current logs size during timeUnit exceeds limit (1MB/Second)
if (currentMessageSize + this._currentLogByteSize > this._messageSizeLimitPerTimeUnit) {
console.log(`${new Date().toISOString()} streamLog.updateBatch: current log size + message
[${currentMessageSize + this._currentLogByteSize} / ${this._messageSizeLimitPerTimeUnit}] exceeded, flushing...`);

this._firebaseClient.update(this._logsBatch, (err) => {
if (err) {
next(err);
return;
}
this._logBatchByteSize = 0;
this._logsBatch = Object.create(null);

const waitMs = (this._timeUnitLimitMs - msDelta) + 5;
// lets wait till time unit limit will pass in order to continue
this._debounceTimeout = setTimeout(this._write.bind(this, chunk, encoding, next), waitMs);
console.log(`${new Date().toISOString()} streamLog.updateBatch: successfully flushed to firebase,
waiting ${waitMs} ms for logs byte size reset`);
});
return;
}

this._logBatchByteSize += currentMessageSize;
this._currentLogByteSize += this._logBatchByteSize;
this._logsBatch[`${newLogKey}`] = chunk.toString();
console.log(`${new Date().toISOString()} streamLog.updateBatch: updated logs batch with new key
'${newLogKey}', current logs byte size ${this._currentLogByteSize / 1024} KB`);

if (_.size(this._logsBatch) < this._batchSize) {
console.log(`${new Date().toISOString()} streamLog.updateBatch: batch capacity is still
available [${_.keys(this._logsBatch).length}/${this._batchSize}], resetting debounce flush and continue`);
this._setBatchFlushTimeout(this._debounceDelay);
next();
return;
}

console.log(`${new Date().toISOString()} streamLog.updateBatch: logs batch size has been met [${this._batchSize}] flushing...`);
this._firebaseClient.update(this._logsBatch, (err) => {
if (err) {
next(err);
return;
}
console.log(`${new Date().toISOString()} streamLog.updateBatch: flushed successfully,
resetting logs batch and debounce flush`);
this._logsBatch = Object.create(null);
this._setBatchFlushTimeout(this._debounceDelay);
next();
});
}

_setBatchFlushTimeout(flushInterval) {
console.log(new Date().toISOString(), 'streamLog._setBatchFlushTimeout: setting flush timout', flushInterval);
this._debounceTimeout = setTimeout(() => {
if (_.isEmpty(this._logsBatch)) {
console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: logs batch is empty, no update is required`);
return;
}
console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: timeout
triggered, [${this._logBatchByteSize / 1024} KB /${FIREBASE_MESSAGE_SIZE_LIMIT / 1024} KB], flushing...`);
this._firebaseClient.update(this._logsBatch, (err) => {
if (err) {
console.error(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: flushed successfully`);
} else {
console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: flushed successfully`);
}
this._logsBatch = Object.create(null);
});
}, flushInterval);
}
}

module.exports = FirebaseWritableStream;
29 changes: 29 additions & 0 deletions taskLogger/firebase/StepLogger.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ const CFError = require('cf-errors');
const { STATUS } = require('../enums');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this file - implemented in another change

const BaseStepLogger = require('../StepLogger');
const { wrapWithRetry } = require('../helpers');
const FirebaseWritableStream = require('./FirebaseWritableStream');

class FirebaseStepLogger extends BaseStepLogger {

constructor(step, opts) {
super(step, opts);

Expand All @@ -21,6 +23,12 @@ class FirebaseStepLogger extends BaseStepLogger {

this.stepUrl = `${this.baseUrl}/steps/${this.name}`;
this.stepRef = new Firebase(this.stepUrl);

this.writableStream = new FirebaseWritableStream(this.stepRef, opts.rateLimitOptions);

/* this._reportLog = this._logWriteStrategyFactory();
this._logsBatch = null;
this._logBatchFlushTimeout = null; */
}

async restore() {
Expand Down Expand Up @@ -95,6 +103,27 @@ class FirebaseStepLogger extends BaseStepLogger {
this.stepRef.child('logs').set({});
}

streamLog() {
return this.writableStream;
}

_setBatchFlushTimeout(flushInterval) {
this._logBatchFlushTimeout = setTimeout(() => {
this.stepRef.update(this._logsBatch, err => console.log(err));
this._logsBatch = {};
}, flushInterval);
}

_logWriteStrategyFactory(logWriteStrategy) {
switch (logWriteStrategy) {
case 'batchLogWrite':
this._logsBatch = {};
return this.batchLogWrite.bind(this);
case 'singleLogWrite':
default: return this.singleLogWrite.bind(this);
}
}

async delete() {
return this.stepRef.remove();
}
Expand Down
3 changes: 2 additions & 1 deletion taskLogger/firebase/TaskLogger.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ class FirebaseTaskLogger extends BaseTaskLogger {
const step = new StepLogger({
accountId: this.accountId,
jobId: this.jobId,
name: key
name: key,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

rateLimitOptions: this.rateLimitOptions
}, {
...this.opts
});
Expand Down
2 changes: 1 addition & 1 deletion taskLogger/test/StepLogger.unit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ describe('Base StepLogger tests', () => {
const stepLogger = getStepLoggerInstance();
const message = 'message';
stepLogger.write(message);
expect(stepLogger._reportLog).to.have.been.calledWith(message);
expect(stepLogger._reportLog).to.have.been.calledWithMatch({ message, origin: 'unknwon' });
expect(stepLogger.updateLastUpdate).to.have.been.calledWith();
});
});
Expand Down