-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support date and origin in stored logs #43
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,10 @@ class TaskLogger extends EventEmitter { | |
this.fatal = false; | ||
this.finished = false; | ||
this.steps = {}; | ||
|
||
this.rateLimitOptions = opts.rateLimitOptions; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove line 35 implemented in another change |
||
|
||
this.origin = opts.origin || 'unknown'; | ||
} | ||
|
||
create(name, resetStatus, runCreationLogic) { | ||
|
@@ -75,7 +79,8 @@ class TaskLogger extends EventEmitter { | |
const step = new StepClass({ | ||
accountId: this.accountId, | ||
jobId: this.jobId, | ||
name | ||
name, | ||
origin: this.origin | ||
}, { | ||
...opts | ||
}, this); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
const { Writable } = require('stream'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delete file - implemented in another change |
||
const _ = require('lodash'); | ||
|
||
const FIREBASE_MESSAGE_SIZE_LIMIT = 10 * 1024 * 1024; // 10 MB Maximum size of a string | ||
|
||
|
||
class FirebaseWritableStream extends Writable { | ||
constructor(_firebaseClient, options) { | ||
super(options); | ||
this._firebaseClient = _firebaseClient; | ||
this._timeUnitLimitMs = options.timeUnitLimitMs; | ||
this._debounceDelay = options.debounceDelay; | ||
this._messageSizeLimitPerTimeUnit = options.messageSizeLimitPerTimeUnit; | ||
this._batchSize = options.batchSize; | ||
} | ||
|
||
_write(chunk, encoding, next) { | ||
clearTimeout(this._debounceTimeout); | ||
const newLogKey = `${this._firebaseClient.child('logs').push().key()}`; | ||
const currentMessageSize = Buffer.byteLength(chunk); | ||
|
||
this._previousTimestamp = this._previousTimestamp || new Date().getTime(); | ||
|
||
const now = new Date().getTime(); | ||
const msDelta = now - this._previousTimestamp; | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: ms passed ${msDelta} / ${this._timeUnitLimitMs}`); | ||
// minute as passed, reset current byte size | ||
if (msDelta > this._timeUnitLimitMs) { | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: ${this._timeUnitLimitMs} | ||
has passed resetting current log byte size to 0`); | ||
this._currentLogByteSize = 0; | ||
this._previousTimestamp = now; | ||
} | ||
|
||
// current logs size during timeUnit exceeds limit (1MB/Second) | ||
if (currentMessageSize + this._currentLogByteSize > this._messageSizeLimitPerTimeUnit) { | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: current log size + message | ||
[${currentMessageSize + this._currentLogByteSize} / ${this._messageSizeLimitPerTimeUnit}] exceeded, flushing...`); | ||
|
||
this._firebaseClient.update(this._logsBatch, (err) => { | ||
if (err) { | ||
next(err); | ||
return; | ||
} | ||
this._logBatchByteSize = 0; | ||
this._logsBatch = Object.create(null); | ||
|
||
const waitMs = (this._timeUnitLimitMs - msDelta) + 5; | ||
// lets wait till time unit limit will pass in order to continue | ||
this._debounceTimeout = setTimeout(this._write.bind(this, chunk, encoding, next), waitMs); | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: successfully flushed to firebase, | ||
waiting ${waitMs} ms for logs byte size reset`); | ||
}); | ||
return; | ||
} | ||
|
||
this._logBatchByteSize += currentMessageSize; | ||
this._currentLogByteSize += this._logBatchByteSize; | ||
this._logsBatch[`${newLogKey}`] = chunk.toString(); | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: updated logs batch with new key | ||
'${newLogKey}', current logs byte size ${this._currentLogByteSize / 1024} KB`); | ||
|
||
if (_.size(this._logsBatch) < this._batchSize) { | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: batch capacity is still | ||
available [${_.keys(this._logsBatch).length}/${this._batchSize}], resetting debounce flush and continue`); | ||
this._setBatchFlushTimeout(this._debounceDelay); | ||
next(); | ||
return; | ||
} | ||
|
||
console.log(`${new Date().toISOString()} streamLog.updateBatch: logs batch size has been met [${this._batchSize}] flushing...`); | ||
this._firebaseClient.update(this._logsBatch, (err) => { | ||
if (err) { | ||
next(err); | ||
return; | ||
} | ||
console.log(`${new Date().toISOString()} streamLog.updateBatch: flushed successfully, | ||
resetting logs batch and debounce flush`); | ||
this._logsBatch = Object.create(null); | ||
this._setBatchFlushTimeout(this._debounceDelay); | ||
next(); | ||
}); | ||
} | ||
|
||
_setBatchFlushTimeout(flushInterval) { | ||
console.log(new Date().toISOString(), 'streamLog._setBatchFlushTimeout: setting flush timout', flushInterval); | ||
this._debounceTimeout = setTimeout(() => { | ||
if (_.isEmpty(this._logsBatch)) { | ||
console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: logs batch is empty, no update is required`); | ||
return; | ||
} | ||
console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: timeout | ||
triggered, [${this._logBatchByteSize / 1024} KB /${FIREBASE_MESSAGE_SIZE_LIMIT / 1024} KB], flushing...`); | ||
this._firebaseClient.update(this._logsBatch, (err) => { | ||
if (err) { | ||
console.error(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: flushed successfully`); | ||
} else { | ||
console.log(`${new Date().toISOString()} streamLog._setBatchFlushTimeout: flushed successfully`); | ||
} | ||
this._logsBatch = Object.create(null); | ||
}); | ||
}, flushInterval); | ||
} | ||
} | ||
|
||
module.exports = FirebaseWritableStream; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,10 @@ const CFError = require('cf-errors'); | |
const { STATUS } = require('../enums'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. revert this file - implemented in another change |
||
const BaseStepLogger = require('../StepLogger'); | ||
const { wrapWithRetry } = require('../helpers'); | ||
const FirebaseWritableStream = require('./FirebaseWritableStream'); | ||
|
||
class FirebaseStepLogger extends BaseStepLogger { | ||
|
||
constructor(step, opts) { | ||
super(step, opts); | ||
|
||
|
@@ -21,6 +23,12 @@ class FirebaseStepLogger extends BaseStepLogger { | |
|
||
this.stepUrl = `${this.baseUrl}/steps/${this.name}`; | ||
this.stepRef = new Firebase(this.stepUrl); | ||
|
||
this.writableStream = new FirebaseWritableStream(this.stepRef, opts.rateLimitOptions); | ||
|
||
/* this._reportLog = this._logWriteStrategyFactory(); | ||
this._logsBatch = null; | ||
this._logBatchFlushTimeout = null; */ | ||
} | ||
|
||
async restore() { | ||
|
@@ -95,6 +103,27 @@ class FirebaseStepLogger extends BaseStepLogger { | |
this.stepRef.child('logs').set({}); | ||
} | ||
|
||
streamLog() { | ||
return this.writableStream; | ||
} | ||
|
||
_setBatchFlushTimeout(flushInterval) { | ||
this._logBatchFlushTimeout = setTimeout(() => { | ||
this.stepRef.update(this._logsBatch, err => console.log(err)); | ||
this._logsBatch = {}; | ||
}, flushInterval); | ||
} | ||
|
||
_logWriteStrategyFactory(logWriteStrategy) { | ||
switch (logWriteStrategy) { | ||
case 'batchLogWrite': | ||
this._logsBatch = {}; | ||
return this.batchLogWrite.bind(this); | ||
case 'singleLogWrite': | ||
default: return this.singleLogWrite.bind(this); | ||
} | ||
} | ||
|
||
async delete() { | ||
return this.stepRef.remove(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -179,7 +179,8 @@ class FirebaseTaskLogger extends BaseTaskLogger { | |
const step = new StepLogger({ | ||
accountId: this.accountId, | ||
jobId: this.jobId, | ||
name: key | ||
name: key, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. revert |
||
rateLimitOptions: this.rateLimitOptions | ||
}, { | ||
...this.opts | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove line 31 ( this.logWriteStrategyOptions = logWriteStrategyOptions || { logWriteStrategy: 'singleLogWrite' };)