Skip to content

Commit 0722d00

Browse files
container logger will wait for all logs to be sent (#41)
* container logger will wait for all logs to be sent * CONTAINER_LOGGER_SHOW_PROGRESS - if true will print progression info * added more info to health-check
1 parent b216a86 commit 0722d00

11 files changed

+962
-213
lines changed

gulpfile.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ gulp.task('no.onlys', function (callback) {
1919

2020

2121
gulp.task('lint', ['clean'], function () {
22-
return gulp.src(['**/*.js', '!**/node_modules/**', '!**/server/migration/**', '!coverage/**/*.js', '!test/logger.unit.spec.js'])
22+
return gulp.src(['**/*.js', '!**/node_modules/**', '!**/server/migration/**', '!coverage/**/*.js', '!test/*.unit.spec.js'])
2323
.pipe(jshint({lookup: true}))
2424
.pipe(jshint.reporter('default'))
2525
.pipe(jshint.reporter('fail'));

lib/ContainerLogger.js

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ class ContainerLogger extends EventEmitter {
2727
this.logSize = 0;
2828
this.isWorkflowLogSizeExceeded = isWorkflowLogSizeExceeded;
2929
this.stepFinished = false;
30+
this.finishedStreams = 0;
31+
this.handledStreams = 0;
3032
}
3133

3234
start() {
@@ -105,14 +107,24 @@ class ContainerLogger extends EventEmitter {
105107
stepLoggerWritableStream.on('error', err => logger.error(`stepLoggerWritableStream: ${err}`));
106108

107109
// Attention(!) all streams piped to step logger writable stream must be a new streams(!) in order to avoid message piping twice to writable stream.
108-
stdout.pipe(this._logSizeLimitStream(), {end: false}).pipe(this.stepLogger.stepNameTransformStream(), {end: false}).pipe(stepLoggerWritableStream, {end: false});
109-
110+
// { end = false } on the stepLoggerWritableStream because there is only one instance of it for all the steps.
111+
this.handledStreams++;
112+
stdout
113+
.pipe(this._logSizeLimitStream())
114+
.pipe(this.stepLogger.stepNameTransformStream().once('end', this._handleFinished.bind(this)))
115+
.pipe(stepLoggerWritableStream, {end: false});
116+
117+
110118
if (!stderr) {
111119
return;
112120
}
113121

114-
stderr.pipe(this._logSizeLimitStream(), {end: false})
115-
.pipe(this._errorTransformerStream(), {end: false}).pipe(this.stepLogger.stepNameTransformStream(), {end: false}).pipe(stepLoggerWritableStream, {end: false});
122+
this.handledStreams++;
123+
stderr
124+
.pipe(this._logSizeLimitStream())
125+
.pipe(this._errorTransformerStream())
126+
.pipe(this.stepLogger.stepNameTransformStream().once('end', this._handleFinished.bind(this)))
127+
.pipe(stepLoggerWritableStream, {end: false});
116128

117129
stderr.once('end', () => {
118130
this.stepFinished = true;
@@ -133,6 +145,8 @@ class ContainerLogger extends EventEmitter {
133145
}
134146

135147
_handleTtyStream(stream, isError) {
148+
this.handledStreams++;
149+
stream.on('end', this._handleFinished.bind(this));
136150
stream.on('data', (chunk) => {
137151
const buf = new Buffer(chunk);
138152
const message = buf.toString('utf8');
@@ -142,6 +156,7 @@ class ContainerLogger extends EventEmitter {
142156
}
143157

144158
_handleNonTtyStream(stream, isError) {
159+
this.handledStreams++;
145160
stream.on('readable', () => {
146161
let header = stream.read(8);
147162
while (header !== null) {
@@ -153,6 +168,7 @@ class ContainerLogger extends EventEmitter {
153168
header = stream.read(8);
154169
}
155170
});
171+
stream.on('end', this._handleFinished.bind(this));
156172
logger.info(`Listening on stream 'readable' event for container: ${this.containerId}`);
157173
}
158174

@@ -218,6 +234,14 @@ class ContainerLogger extends EventEmitter {
218234
});
219235
}
220236

237+
_handleFinished() {
238+
this.finishedStreams++;
239+
240+
if (this.finishedStreams === this.handledStreams) {
241+
// the emission of this event reflects the ending of all streams handled by this container logger
242+
this.emit('end');
243+
}
244+
}
221245
}
222246

223247
module.exports = ContainerLogger;

lib/index.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const path = require('path');
44
const cflogs = require('cf-logs');
5+
const Q = require('q');
56

67
const loggerOptions = {
78
filePath: path.join(__dirname, '../logs', 'logs.log'),
@@ -15,17 +16,25 @@ const loggerOptions = {
1516
cflogs.init(loggerOptions);
1617

1718
const Logger = require('./logger');
19+
const buildFinishedPromise = Q.defer();
1820

1921
const logger = new Logger({
2022
loggerId: process.env.LOGGER_ID,
2123
taskLoggerConfig: JSON.parse(process.env.TASK_LOGGER_CONFIG),
2224
findExistingContainers: process.env.LISTEN_ON_EXISTING,
2325
logSizeLimit: process.env.LOG_SIZE_LIMIT ? (parseInt(process.env.LOG_SIZE_LIMIT) * 1000000) : undefined,
26+
buildFinishedPromise: buildFinishedPromise.promise,
27+
showProgress: process.env.SHOW_PROGRESS === 'true' ? true : false,
2428
});
2529

2630
logger.validate();
2731
logger.start();
2832

33+
process.on('SIGUSR2', () => {
34+
console.log('GOT SIGUSR2 --- engine signaling build is finished');
35+
buildFinishedPromise.resolve();
36+
});
37+
2938
process.on('beforeExit', (code) => {
3039
console.log(`beforeExit: ${code}`);
3140
logger.state.beforeExitCode = code;

lib/logger.js

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
'use strict';
22

33
const fs = require('fs');
4+
const { EventEmitter } = require('events');
45
const _ = require('lodash');
6+
const Q = require('q');
57
const Docker = require('dockerode');
68
const DockerEvents = require('docker-events');
79
const CFError = require('cf-errors');
@@ -12,15 +14,16 @@ const ContainerHandlingStatus = require('./enums').ContainerHandlingStatus;
1214
const ContainerLogger = require('./ContainerLogger');
1315
const { TaskLogger } = require('@codefresh-io/task-logger');
1416

15-
16-
const initialState = { status: 'init', lastLogsDate: Date.now() , failedHealthChecks: [] , restartCounter: 0, containers: {} };
17+
const initialState = { pid: process.pid, status: 'init', lastLogsDate: new Date() , failedHealthChecks: [] , restartCounter: 0, containers: {} };
1718
class Logger {
1819

1920
constructor({
2021
loggerId,
2122
taskLoggerConfig,
2223
findExistingContainers,
23-
logSizeLimit
24+
logSizeLimit,
25+
buildFinishedPromise,
26+
showProgress,
2427
}) {
2528
this.taskLoggerConfig = taskLoggerConfig;
2629
this.loggerId = loggerId;
@@ -29,6 +32,10 @@ class Logger {
2932
this.containerLoggers = [];
3033
this.logSize = 0;
3134
this.taskLogger = undefined;
35+
this.buildFinishedPromise = buildFinishedPromise || Q.resolve();
36+
this.finishedContainers = 0;
37+
this.finishedContainersEmitter = new EventEmitter();
38+
this.showProgress = showProgress;
3239

3340
let dockerSockPath;
3441
if (fs.existsSync('/var/run/codefresh/docker.sock')) {
@@ -43,6 +50,8 @@ class Logger {
4350
socketPath: dockerSockPath,
4451
});
4552
this._readState();
53+
this._handleBuildFinished();
54+
this._updateStateInterval = setInterval(this._updateStateFile.bind(this), 1000);
4655
}
4756

4857
/**
@@ -70,6 +79,7 @@ class Logger {
7079

7180
TaskLogger(this.taskLoggerConfig.task, this.taskLoggerConfig.opts)
7281
.then((taskLogger) => {
82+
this.taskLogger = taskLogger;
7383
taskLogger.on('error', (err) => {
7484
logger.error(err.stack);
7585
});
@@ -81,12 +91,16 @@ class Logger {
8191
}else {
8292
this.state.healthCheckStatus = status;
8393
}
94+
8495
this._writeNewState();
8596
});
86-
87-
this.taskLogger = taskLogger;
97+
taskLogger.on('flush', () => {
98+
this._updateMissingLogs();
99+
this._updateLastLoggingDate();
100+
});
101+
this.state.logsStatus = this.taskLogger.getStatus();
88102
logger.info(`taskLogger successfully created`);
89-
103+
90104
this._listenForNewContainers();
91105

92106
this.state.status = 'ready';
@@ -108,8 +122,9 @@ class Logger {
108122
_readState() {
109123
const filePath = `${__dirname}/state.json`;
110124
if (fs.existsSync(filePath)) {
111-
this.state = _.omit(JSON.parse(fs.readFileSync(filePath, 'utf8'), 'containers'));
125+
this.state = _.omit(JSON.parse(fs.readFileSync(filePath, 'utf8'), ['containers', 'pid']));
112126
this.state.containers = {};
127+
this.state.pid = process.pid;
113128
let restartCounter = _.get(this.state, 'restartCounter', 0);
114129
restartCounter++;
115130
this.state.restartCounter = restartCounter;
@@ -143,10 +158,9 @@ class Logger {
143158
} else if (!disableLog) {
144159
logger.info(`State: ${currentState} updated and written to file: ${filePath}`);
145160
}
146-
});
161+
});
147162
}
148163

149-
150164
logLimitExceeded() {
151165
// TODO in the future when we allow a workflow to use multuple dinds, this will not be correct
152166
// we need to get the total size of logs from all dinds
@@ -238,7 +252,7 @@ class Logger {
238252
});
239253
this.containerLoggers.push(containerLogger);
240254
containerLogger.on('message.logged', this._updateTotalLogSize.bind(this));
241-
containerLogger.on('message.logged', this._updateLastLoggingDate.bind(this));
255+
containerLogger.once('end', this._handleContainerStreamEnd.bind(this));
242256

243257
containerLogger.start()
244258
.done(() => {
@@ -254,14 +268,33 @@ class Logger {
254268
});
255269
}
256270

271+
_updateMissingLogs() {
272+
const resolvedCalls = _.get(this, 'state.logsStatus.resolvedCalls', 0);
273+
const writeCalls = _.get(this, 'state.logsStatus.writeCalls', 0);
274+
const rejectedCalls = _.get(this, 'state.logsStatus.rejectedCalls', 0);
275+
276+
_.set(this, 'state.logsStatus.missingLogs', writeCalls - resolvedCalls - rejectedCalls);
277+
}
278+
257279
_updateTotalLogSize() {
258280
this.logSize = this._getTotalLogSize();
259281
this.taskLogger.setLogSize(this.logSize);
260282
}
261283

262284
_updateLastLoggingDate() {
263-
this.state.lastLogsDate = Date.now();
264-
this._writeNewState(true);
285+
this.state.lastLogsDate = new Date();
286+
}
287+
288+
_updateStateFile() {
289+
if (this.state.status === 'done') {
290+
clearInterval(this._updateStateInterval);
291+
} else {
292+
this._writeNewState(true);
293+
294+
if (this.showProgress) {
295+
logger.debug(`logger progress update: ${JSON.stringify(this.state.logsStatus)}`);
296+
}
297+
}
265298
}
266299

267300
/**
@@ -304,6 +337,42 @@ class Logger {
304337
});
305338
}
306339

340+
_handleContainerStreamEnd() {
341+
this.finishedContainers++;
342+
this.finishedContainersEmitter.emit('end');
343+
}
344+
345+
// do not call before build is finished
346+
_awaitAllStreamsClosed() {
347+
const deferred = Q.defer();
348+
this._checkAllStreamsClosed(deferred);
349+
this.finishedContainersEmitter.on('end', this._checkAllStreamsClosed.bind(this, deferred));
350+
return deferred.promise;
351+
}
352+
353+
_checkAllStreamsClosed(deferred) {
354+
if (this.finishedContainers === this.containerLoggers.length) {
355+
deferred.resolve();
356+
}
357+
}
358+
359+
_handleBuildFinished() {
360+
this.buildFinishedPromise
361+
.then(() => {
362+
logger.info('=== build is finished ===');
363+
return this._awaitAllStreamsClosed();
364+
})
365+
.then(() => {
366+
logger.info('=== all streams have been closed ===');
367+
return this.taskLogger.awaitLogsFlushed();
368+
})
369+
.then(() => {
370+
logger.info('=== All logs flushed. Container logger finished. ===');
371+
this.state.logsStatus = this.taskLogger.getStatus();
372+
this.state.status = 'done';
373+
this._writeNewState();
374+
});
375+
}
307376
}
308377

309378
module.exports = Logger;

0 commit comments

Comments
 (0)