Skip to content

Commit

Permalink
Merge branch 'bugfix/BB-624-connection-retry' into tmp/octopus/w/7.70…
Browse files Browse the repository at this point in the history
…/bugfix/BB-624-connection-retry
  • Loading branch information
bert-e committed Nov 6, 2024
2 parents 57fc235 + af67f46 commit 35fb11e
Show file tree
Hide file tree
Showing 20 changed files with 169 additions and 61 deletions.
20 changes: 20 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
node_modules
localData/*
localMetadata/*
.git
.github
.tox
coverage
.DS_Store

.dockerignore
docs/
images/
res/
_config.yml
.eslintrc
.gitignore
circle.yml
DESIGN.md
README.md
Using.md
2 changes: 1 addition & 1 deletion bin/queuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const queuePopulator = new QueuePopulator(
zkConfig, kafkaConfig, qpConfig, httpsConfig, mConfig, rConfig, extConfigs);

async.waterfall([
done => queuePopulator.open(done),
done => startProbeServer(qpConfig.probeServer, (err, probeServer) => {
if (err) {
log.error('error starting probe server', {
Expand All @@ -73,7 +74,6 @@ async.waterfall([
}
done();
}),
done => queuePopulator.open(done),
done => {
const taskState = {
batchInProgress: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class KafkaNotificationDestination extends NotificationDestination {
this._log.info('error setting up kafka notif destination',
{ error: err.message });
done(err);
} else {
done();
}
done();
});
}

Expand Down
3 changes: 2 additions & 1 deletion extensions/notification/destination/KafkaProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ class KafkaProducer extends EventEmitter {
'request.timeout.ms': ACK_TIMEOUT,
});
this._ready = false;
this._producer.connect({}, error => {
this._producer.connect({ timeout: 60000 }, error => {
if (error) {
this._log.info('error connecting to broker', {
error,
method: 'KafkaProducer.constructor',
});
this.emit('error', error);
}
});
this._producer.on('ready', () => {
Expand Down
37 changes: 29 additions & 8 deletions extensions/notification/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,20 @@ class QueueProcessor extends EventEmitter {
* @param {boolean} [options.disableConsumer] - true to disable
* startup of consumer (for testing: one has to call
* processQueueEntry() explicitly)
* @param {function} done callback
* @return {undefined}
*/
start(options) {
start(options, done) {
async.series([
next => this._setupZookeeper(next),
next => this._setupNotificationConfigManager(next),
next => this._setupDestination(this.destinationConfig.type, next),
next => this._destination.init(() => {
// if connection to destination fails, process will stop & restart
next => this._destination.init(next),
next => {
if (options && options.disableConsumer) {
this.emit('ready');
return undefined;
return next();
}
const { groupId, concurrency, logConsumerMetricsIntervalS }
= this.notifConfig.queueProcessor;
Expand All @@ -142,22 +145,31 @@ class QueueProcessor extends EventEmitter {
queueProcessor: this.processKafkaEntry.bind(this),
logConsumerMetricsIntervalS,
});
this._consumer.on('error', () => { });
this._consumer.on('error', err => {
this.logger.error('error starting notification consumer',
{ method: 'QueueProcessor.start', error: err.message });
// crash if got error at startup
if (!this.isReady()) {
return next(err);
}
return undefined;
});
this._consumer.on('ready', () => {
this._consumer.subscribe();
this.logger.info('queue processor is ready to consume ' +
'notification entries');
this.emit('ready');
return next();
});
return next();
}),
return undefined;
},
], err => {
if (err) {
this.logger.info('error starting notification queue processor',
{ error: err.message });
return undefined;
return done(err);
}
return undefined;
return done();
});
}

Expand Down Expand Up @@ -263,6 +275,15 @@ class QueueProcessor extends EventEmitter {
return done();
}
}

/**
* Checks if queue processor is ready to consume
*
* @returns {boolean} is queue processor ready
*/
isReady() {
return this._consumer && this._consumer.isReady();
}
}

module.exports = QueueProcessor;
6 changes: 5 additions & 1 deletion extensions/notification/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ try {
'could not be found in destinations defined');
const queueProcessor = new QueueProcessor(
zkConfig, kafkaConfig, notifConfig, destinationConfig, destination);
queueProcessor.start();
queueProcessor.start(undefined, err => {
if (err) {
process.exit(1);
}
});
} catch (err) {
log.error('error starting notification queue processor task', {
method: 'notification.task.queueProcessor.start',
Expand Down
12 changes: 11 additions & 1 deletion extensions/replication/failedCRR/FailedCRRConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FailedCRRConsumer {
* @return {undefined}
*/
start(cb) {
let consumerReady = false;
const consumer = new BackbeatConsumer({
kafka: {
hosts: this._kafkaConfig.hosts,
Expand All @@ -46,8 +47,17 @@ class FailedCRRConsumer {
queueProcessor: this.processKafkaEntry.bind(this),
fetchMaxBytes: CONSUMER_FETCH_MAX_BYTES,
});
consumer.on('error', () => {});
consumer.on('error', err => {
if (!consumerReady) {
this.logger.fatal('could not setup a backbeat consumer', {
method: 'FailedCRRConsumer.start',
error: err,
});
process.exit(1);
}
});
consumer.on('ready', () => {
consumerReady = true;
consumer.subscribe();
this.logger.info('retry consumer is ready to consume entries');
});
Expand Down
2 changes: 1 addition & 1 deletion extensions/replication/failedCRR/FailedCRRProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FailedCRRProducer {
kafka: { hosts: this._kafkaConfig.hosts },
topic: this._topic,
});
this._producer.once('error', () => {});
this._producer.once('error', cb);
this._producer.once('ready', () => {
this._producer.removeAllListeners('error');
this._producer.on('error', err =>
Expand Down
26 changes: 18 additions & 8 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ class QueueProcessor extends EventEmitter {
*/
start(options) {
this._setupProducer(err => {
let consumerReady = false;
if (err) {
this.logger.info('error setting up kafka producer',
{ error: err.message });
Expand All @@ -424,8 +425,15 @@ class QueueProcessor extends EventEmitter {
logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
circuitBreaker: this.circuitBreakerConfig,
});
this._consumer.on('error', () => { });
this._consumer.on('error', () => {
if (!consumerReady) {
this.logger.fatal('queue processor failed to start a ' +
'backbeat consumer');
process.exit(1);
}
});
this._consumer.on('ready', () => {
consumerReady = true;
this._consumer.subscribe();
this.logger.info('queue processor is ready to consume ' +
`replication entries from ${this.topic}`);
Expand Down Expand Up @@ -571,13 +579,15 @@ class QueueProcessor extends EventEmitter {

// consumer stats lag is on a different update cycle so we need to
// update the metrics when requested
const lagStats = this._consumer.consumerStats.lag;
Object.keys(lagStats).forEach(partition => {
metricsHandler.lag({
partition,
serviceName: this.serviceName,
}, lagStats[partition]);
});
if (this._consumer) {
const lagStats = this._consumer.consumerStats.lag;
Object.keys(lagStats).forEach(partition => {
metricsHandler.lag({
partition,
serviceName: this.serviceName,
}, lagStats[partition]);
});
}
const metrics = await promClient.register.metrics();

res.writeHead(200, {
Expand Down
30 changes: 15 additions & 15 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ function getProbeConfig(queueProcessorConfig, site) {
}

async.waterfall([
done => {
metricsProducer.setupProducer(err => {
if (err) {
log.error('error starting metrics producer for queue processor', {
error: err,
method: 'MetricsProducer::setupProducer',
});
}
done(err);
});
},
done => {
queueProcessor.on('ready', done);
queueProcessor.start();
},
done => startProbeServer(
getProbeConfig(repConfig.queueProcessor, site),
(err, probeServer) => {
Expand All @@ -80,21 +95,6 @@ async.waterfall([
done();
}
),
done => {
metricsProducer.setupProducer(err => {
if (err) {
log.error('error starting metrics producer for queue processor', {
error: err,
method: 'MetricsProducer::setupProducer',
});
}
done(err);
});
},
done => {
queueProcessor.on('ready', done);
queueProcessor.start();
},
], err => {
if (err) {
log.error('error during queue processor initialization', {
Expand Down
2 changes: 1 addition & 1 deletion extensions/replication/replay/ReplayProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ReplayProducer {
kafka: { hosts: this._kafkaConfig.hosts },
topic: this._topic,
});
this._producer.once('error', () => {});
this._producer.once('error', cb);
this._producer.once('ready', () => {
this._producer.removeAllListeners('error');
this._producer.on('error', err =>
Expand Down
30 changes: 15 additions & 15 deletions extensions/replication/replayProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ function getProbeConfig(replayProcessorConfig, site, topicName) {
}

async.waterfall([
done => {
metricsProducer.setupProducer(err => {
if (err) {
log.error('error starting metrics producer for queue processor', {
error: err,
method: 'MetricsProducer::setupProducer',
});
}
done(err);
});
},
done => {
queueProcessor.on('ready', done);
queueProcessor.start();
},
done => startProbeServer(
getProbeConfig(repConfig.replayProcessor, site, topic),
(err, probeServer) => {
Expand All @@ -87,21 +102,6 @@ async.waterfall([
done();
}
),
done => {
metricsProducer.setupProducer(err => {
if (err) {
log.error('error starting metrics producer for queue processor', {
error: err,
method: 'MetricsProducer::setupProducer',
});
}
done(err);
});
},
done => {
queueProcessor.on('ready', done);
queueProcessor.start();
},
], err => {
if (err) {
log.error('error during queue processor initialization', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ class ReplicationStatusProcessor {
* @return {undefined}
*/
start(options, cb) {
let consumerReady = false;
this._FailedCRRProducer = new FailedCRRProducer(this.kafkaConfig);
this._replayTopicNames.forEach(t => {
this._ReplayProducers[t] = new ReplayProducer(this.kafkaConfig, t);
Expand All @@ -332,8 +333,14 @@ class ReplicationStatusProcessor {
bootstrap: options && options.bootstrap,
logConsumerMetricsIntervalS: this.repConfig.replicationStatusProcessor.logConsumerMetricsIntervalS,
});
this._consumer.on('error', () => { });
this._consumer.on('error', () => {
if (!consumerReady) {
this.logger.fatal('error starting a backbeat consumer');
process.exit(1);
}
});
this._consumer.on('ready', () => {
consumerReady = true;
this.logger.info('replication status processor is ready to ' +
'consume replication status entries');
this._consumer.subscribe();
Expand Down
2 changes: 1 addition & 1 deletion extensions/replication/replicationStatusProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ werelogs.configure({
});

async.waterfall([
done => replicationStatusProcessor.start(undefined, done),
done => startProbeServer(
repConfig.replicationStatusProcessor.probeServer,
(err, probeServer) => {
Expand All @@ -49,7 +50,6 @@ async.waterfall([
done();
}
),
done => replicationStatusProcessor.start(undefined, done),
], err => {
if (err) {
log.error('error during queue processor initialization', {
Expand Down
Loading

0 comments on commit 35fb11e

Please sign in to comment.