Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
-add logging to handle queuing failure
-remove redundant JSON.stringify when queuing message (there is no impact to message decoding)
-move queueHandler and defUpgrader to local scope in setup
-minor refactoring to improve code reuse and readability
-add more tests
  • Loading branch information
qtomlinson committed Dec 11, 2024
1 parent 96938b4 commit c3cdd04
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 67 deletions.
18 changes: 13 additions & 5 deletions providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ class DefinitionQueueUpgrader extends DefinitionVersionChecker {

async _queueUpgrade(definition) {
if (!this._upgrade) throw new Error('Upgrade queue is not set')
const message = this._constructMessage(definition)
await this._upgrade.queue(JSON.stringify(message))
this.logger.debug('Queued for definition upgrade ', {
coordinates: EntityCoordinates.fromObject(definition.coordinates).toString()
})
try {
const message = this._constructMessage(definition)
await this._upgrade.queue(message)
this.logger.debug('Queued for definition upgrade ', {
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
} catch (error) {
//continue if queuing fails and requeue at the next request.
this.logger.error(`Error queuing for definition upgrade ${error.message}`, {
error,
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
}
}

_constructMessage(definition) {
Expand Down
6 changes: 5 additions & 1 deletion providers/upgrade/defVersionCheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DefinitionVersionChecker {
if (!this._currentSchema) throw new Error('Current schema version is not set')
const defSchemaVersion = get(definition, '_meta.schemaVersion')
this.logger.debug(`Definition version: %s, Current schema version: %s `, defSchemaVersion, this._currentSchema, {
coordinates: definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString()
coordinates: DefinitionVersionChecker.getCoordinates(definition)
})
if (defSchemaVersion && gte(defSchemaVersion, this._currentSchema)) return definition
}
Expand All @@ -36,6 +36,10 @@ class DefinitionVersionChecker {
setupProcessing() {
//do nothing for set up processing
}

static getCoordinates(definition) {
return definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString()
}
}

const factory = options => new DefinitionVersionChecker(options)
Expand Down
7 changes: 2 additions & 5 deletions providers/upgrade/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,9 @@ class DefinitionUpgrader {
}
}

let queueHandler
let defUpgrader

function setup(_queue, _definitionService, _logger, once = false, _defVersionChecker = factory({ logger: _logger })) {
defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker)
queueHandler = new QueueHandler(_queue, _logger, defUpgrader)
const defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker)
const queueHandler = new QueueHandler(_queue, _logger, defUpgrader)
return queueHandler.work(once)
}

Expand Down
163 changes: 107 additions & 56 deletions test/providers/upgrade/defUpgradeQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,80 +7,131 @@ chai.use(chaiAsPromised)
const { expect } = require('chai')
const sinon = require('sinon')
const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQueue')
const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig')

describe('DefinitionQueueUpgrader', () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
let queue, upgrader

beforeEach(async () => {
const logger = { debug: sinon.stub() }
queue = {
queue: sinon.stub().resolves(),
initialize: sinon.stub().resolves()
}
const queueFactory = sinon.stub().returns(queue)
upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory })
})
const logger = { debug: sinon.stub(), error: sinon.stub() }

it('returns an instance of DefinitionQueueUpgrader', () => {
expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader)
})
describe('Unit tests', () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
let queue, upgrader

it('sets and gets current schema version', () => {
upgrader.currentSchema = '1.0.0'
expect(upgrader.currentSchema).to.equal('1.0.0')
})
beforeEach(async () => {
queue = {
queue: sinon.stub().resolves(),
initialize: sinon.stub().resolves()
}
const queueFactory = sinon.stub().returns(queue)
upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory })
})

it('initializes', async () => {
await upgrader.initialize()
expect(queue.initialize.calledOnce).to.be.true
})
it('returns an instance of DefinitionQueueUpgrader', () => {
expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader)
})

it('connects to queue after setupProcessing', async () => {
await upgrader.initialize()
const definitionService = { currentSchema: '1.0.0' }
const logger = { debug: sinon.stub() }
queue.dequeueMultiple = sinon.stub().resolves([])
upgrader.setupProcessing(definitionService, logger, true)
expect(queue.dequeueMultiple.calledOnce).to.be.true
})
it('sets and gets current schema version', () => {
upgrader.currentSchema = '1.0.0'
expect(upgrader.currentSchema).to.equal('1.0.0')
})

context('validate', () => {
it('fails if current schema version is not set', async () => {
await expect(upgrader.validate(definition)).to.be.rejectedWith(Error)
it('initializes', async () => {
await upgrader.initialize()
expect(queue.initialize.calledOnce).to.be.true
})

it('fails if it is not initialized', async () => {
upgrader.currentSchema = '1.0.0'
const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
await expect(upgrader.validate(stale)).to.be.rejectedWith(Error)
it('connects to queue after setupProcessing', async () => {
await upgrader.initialize()
const definitionService = { currentSchema: '1.0.0' }
queue.dequeueMultiple = sinon.stub().resolves([])
upgrader.setupProcessing(definitionService, logger, true)
expect(queue.dequeueMultiple.calledOnce).to.be.true
})

context('validate', () => {
it('fails if current schema version is not set', async () => {
await expect(upgrader.validate(definition)).to.be.rejectedWith(Error)
})

it('fails if it is not initialized', async () => {
upgrader.currentSchema = '1.0.0'
const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
await expect(upgrader.validate(stale)).to.be.rejectedWith(Error)
})
})

context('validate after set up', () => {
beforeEach(async () => {
await upgrader.initialize()
upgrader.currentSchema = '1.0.0'
})

it('does not queue null definition', async () => {
const result = await upgrader.validate(null)
expect(result).to.be.not.ok
expect(queue.queue.called).to.be.false
})

it('does not queue an up-to-date definition', async () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
const result = await upgrader.validate(definition)
expect(result).to.deep.equal(definition)
expect(queue.queue.called).to.be.false
})

it('queues and returns a stale definition', async () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
const result = await upgrader.validate(definition)
expect(result).to.deep.equal(definition)
expect(queue.queue.calledOnce).to.be.true
})

it('logs erorr when queueing throws', async () => {
const staleDef = {
coordinates: {
type: 'npm',
provider: 'npmjs',
name: 'lodash',
revision: '4.17.11'
},
_meta: { schemaVersion: '0.0.1' }
}
queue.queue.rejects(new Error('test'))
const result = await upgrader.validate(staleDef)
expect(result).to.deep.equal(staleDef)
expect(logger.error.calledOnce).to.be.true
const { coordinates } = logger.error.args[0][1]
expect(coordinates).to.eq('npm/npmjs/-/lodash/4.17.11')
})
})
})

context('validate after set up', () => {
describe('Integration tests', () => {
let queue, upgrader

beforeEach(async () => {
queue = MemoryQueue()
upgrader = new DefinitionQueueUpgrader({ logger, queue: sinon.stub().returns(queue) })
await upgrader.initialize()
upgrader.currentSchema = '1.0.0'
})

it('does not queue null definition', async () => {
const result = await upgrader.validate(null)
expect(result).to.be.not.ok
expect(queue.queue.called).to.be.false
})

it('does not queue an up-to-date definition', async () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
const result = await upgrader.validate(definition)
expect(result).to.deep.equal(definition)
expect(queue.queue.called).to.be.false
})
it('queues the correct message that can be decoded correctly', async () => {
const staleDef = {
coordinates: {
type: 'npm',
provider: 'npmjs',
name: 'lodash',
revision: '4.17.11'
},
_meta: { schemaVersion: '0.0.1' }
}
const result = await upgrader.validate(staleDef)
expect(result).to.deep.equal(staleDef)
expect(queue.data.length).to.equal(1)

it('queues and returns a stale definition', async () => {
const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
const result = await upgrader.validate(definition)
expect(result).to.deep.equal(definition)
expect(queue.queue.calledOnce).to.be.true
const message = await queue.dequeue()
const coordinates = message.data.coordinates
expect(coordinates).to.deep.equal(staleDef.coordinates)
})
})
})

0 comments on commit c3cdd04

Please sign in to comment.