diff --git a/config/default.json b/config/default.json index 13258adbd..52cfe0cf1 100644 --- a/config/default.json +++ b/config/default.json @@ -39,7 +39,7 @@ "maxPayloadSizeMB": 50, "truncateSize": 15000, "truncateAppend": "\n[truncated ...]", - "authenticationTypes": ["token"] + "authenticationTypes": ["basic", "token"] }, "rerun": { "httpPort": 7786, @@ -47,7 +47,9 @@ "processor": { "enabled": true, "pollPeriodMillis": 2000 - } + }, + "taskTransactionsLength": 50, + "activeConcurrentTasks": 3 }, "tcpAdapter": { "httpReceiver": { diff --git a/package-lock.json b/package-lock.json index 46d0e3711..5cfcaecd4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "openhim-core", - "version": "7.0.2", + "version": "7.1.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 33741815d..cf6a3d82c 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "openhim-core", "description": "The OpenHIM core application that provides logging and routing of http requests", - "version": "7.0.2", + "version": "7.1.0", "main": "./lib/server.js", "bin": { "openhim-core": "./bin/openhim-core.js" diff --git a/src/alerts.js b/src/alerts.js index 0ba36bf8c..c617c2bb9 100644 --- a/src/alerts.js +++ b/src/alerts.js @@ -257,7 +257,7 @@ const findTransactionsMaxRetried = (channel, alert, dateFrom, callback) => status: 500, autoRetryAttempt: channel.autoRetryMaxAttempts }, - {transactionID: 'transactionID'} + {transactionID: 1} ) // .hint({created: 1}) .exec((err, transactions) => { diff --git a/src/api/transactions.js b/src/api/transactions.js index 311e5e244..26af5b626 100644 --- a/src/api/transactions.js +++ b/src/api/transactions.js @@ -9,9 +9,11 @@ import * as events from '../middleware/events' import * as utils from '../utils' import {ChannelModelAPI} from '../model/channels' import {TransactionModelAPI} from '../model/transactions' +import {TaskModelAPI} from '../model/tasks' import {config} from '../config' const apiConf = config.get('api') +const taskTransactionsLength = config.get('rerun').taskTransactionsLength function hasError(updates) { if (updates.error != null) { @@ -147,7 +149,7 @@ export async function getTransactions(ctx) { const filterSkip = filterPage * filterLimit // get filters object - const filters = + let filters = filtersObject.filters != null ? JSON.parse(filtersObject.filters) : {} // Test if the user is authorised @@ -175,148 +177,262 @@ export async function getTransactions(ctx) { filters.channelID = {$in: getChannelIDsArray(allChannels)} } - if ( - getActiveRoles('txViewFullAcl', ctx.authenticated.groups, allChannels) - .size > 0 - ) { - filterRepresentation = 'full' - } else if ( - getActiveRoles('txViewAcl', ctx.authenticated.groups, allChannels) - .size > 0 - ) { - filterRepresentation = 'simpledetails' - } else { - filterRepresentation = '' + if (filterRepresentation != 'bulkrerun') { + if ( + getActiveRoles('txViewFullAcl', ctx.authenticated.groups, allChannels) + .size > 0 + ) { + filterRepresentation = 'full' + } else if ( + getActiveRoles('txViewAcl', ctx.authenticated.groups, allChannels) + .size > 0 + ) { + filterRepresentation = 'simpledetails' + } else { + filterRepresentation = '' + } } } // get projection object const projectionFiltersObject = getProjectionObject(filterRepresentation) - // parse date to get it into the correct format for querying - if (filters['request.timestamp']) { - filters['request.timestamp'] = JSON.parse(filters['request.timestamp']) + filters = parseTransactionFilters(filters) + + // execute the query + let transactions = await TransactionModelAPI.find(filters, projectionFiltersObject) + .skip(filterSkip) + .limit(parseInt(filterLimit, 10)) + .sort({'request.timestamp': -1}) + .exec() + + if (filterRepresentation === 'bulkrerun') { + const count = await TransactionModelAPI.count(filters).exec() + ctx.body = { + count + } + } else { + ctx.body = transactions } - /* Transaction Filters */ - // build RegExp for transaction request path filter - if (filters['request.path']) { - filters['request.path'] = new RegExp(filters['request.path'], 'i') + if (filterRepresentation === 'fulltruncate') { + Array.from(ctx.body).map(trx => truncateTransactionDetails(trx)) } + } catch (e) { + utils.logAndSetResponse( + ctx, + 500, + `Could not retrieve transactions via the API: ${e}`, + 'error' + ) + } +} - // build RegExp for transaction request querystring filter - if (filters['request.querystring']) { - filters['request.querystring'] = new RegExp( - filters['request.querystring'], - 'i' +export async function rerunTransactions(ctx) { + try { + const filtersObject = ctx.request.body + const {batchSize, pauseQueue} = filtersObject + + let filters = filtersObject.filters + + // Test if the user is authorised + if (!authorisation.inGroup('admin', ctx.authenticated)) { + // if not an admin, restrict by transactions that this user can view + const fullViewChannels = await authorisation.getUserViewableChannels( + ctx.authenticated, + 'txViewFullAcl' ) - } + const partViewChannels = await authorisation.getUserViewableChannels( + ctx.authenticated + ) + const allChannels = fullViewChannels.concat(partViewChannels) - // response status pattern match checking - if ( - filters['response.status'] && - utils.statusCodePatternMatch(filters['response.status']) - ) { - filters['response.status'] = { - $gte: filters['response.status'][0] * 100, - $lt: filters['response.status'][0] * 100 + 100 + if (filters.channelID) { + if (!getChannelIDsArray(allChannels).includes(filters.channelID)) { + return utils.logAndSetResponse( + ctx, + 403, + `Forbidden: Unauthorized channel ${filters.channelID}`, + 'info' + ) + } + } else { + filters.channelID = {$in: getChannelIDsArray(allChannels)} } } - // check if properties exist - if (filters.properties) { - // we need to source the property key and re-construct filter - const key = Object.keys(filters.properties)[0] - filters[`properties.${key}`] = filters.properties[key] + filters = parseTransactionFilters(filters) - // if property has no value then check if property exists instead - if (filters.properties[key] === null) { - filters[`properties.${key}`] = {$exists: true} - } + const count = await TransactionModelAPI.count(filters).exec() + const pages = Math.floor(count/taskTransactionsLength) - // delete the old properties filter as its not needed - delete filters.properties - } + createRerunTasks(filters, batchSize, ctx.authenticated.email, 0, pages, pauseQueue, taskTransactionsLength) - // parse childIDs query to get it into the correct format for querying - if (filters['childIDs']) { - filters['childIDs'] = JSON.parse(filters['childIDs']) + ctx.body = { + success: true, + message: "Tasks created for bulk rerun of transactions" } + } catch (e) { + utils.logAndSetResponse( + ctx, + 500, + `Could not create rerun tasks via the API: ${e}`, + 'error' + ) + } +} - /* Route Filters */ - // build RegExp for route request path filter - if (filters['routes.request.path']) { - filters['routes.request.path'] = new RegExp( - filters['routes.request.path'], - 'i' - ) - } +let transactionsToRerun, taskObject, task - // build RegExp for transaction request querystring filter - if (filters['routes.request.querystring']) { - filters['routes.request.querystring'] = new RegExp( - filters['routes.request.querystring'], - 'i' - ) - } +async function createRerunTasks(filters, batchSize, email, page, pages, pauseQueue, filterLimit) { + transactionsToRerun = await TransactionModelAPI.find(filters, {_id: 1}) + .skip(filterLimit*page) + .limit(parseInt(filterLimit, 10)) + .sort({'request.timestamp': -1}) + .exec() - // route response status pattern match checking - if ( - filters['routes.response.status'] && - utils.statusCodePatternMatch(filters['routes.response.status']) - ) { - filters['routes.response.status'] = { - $gte: filters['routes.response.status'][0] * 100, - $lt: filters['routes.response.status'][0] * 100 + 100 - } - } + if (!transactionsToRerun.length) return - /* orchestration Filters */ - // build RegExp for orchestration request path filter - if (filters['orchestrations.request.path']) { - filters['orchestrations.request.path'] = new RegExp( - filters['orchestrations.request.path'], - 'i' - ) - } + transactionsToRerun = transactionsToRerun.map(trans => { + return { + tid: trans._id + }}) - // build RegExp for transaction request querystring filter - if (filters['orchestrations.request.querystring']) { - filters['orchestrations.request.querystring'] = new RegExp( - filters['orchestrations.request.querystring'], - 'i' - ) + taskObject = {} + taskObject.remainingTransactions = transactionsToRerun.length + taskObject.user = email + taskObject.transactions = transactionsToRerun + taskObject.totalTransactions = transactionsToRerun.length + taskObject.batchSize = batchSize + + if (pauseQueue) { + taskObject.status = 'Paused' + } + + task = await new TaskModelAPI(taskObject).save() + + logger.info(`Rerun task with id ${task._id} created!`) + + if (page < pages) { + return createRerunTasks(filters, batchSize, email, ++page, pages, pauseQueue, filterLimit) + } else { + transactionsToRerun = null + task = null + taskObject = null + return + } +} + +function parseTransactionFilters (filters) { + filters = Object.assign({}, filters) + + // parse date to get it into the correct format for querying + if (filters['request.timestamp']) { + filters['request.timestamp'] = JSON.parse(filters['request.timestamp']) + } + + /* Transaction Filters */ + // build RegExp for transaction request path filter + if (filters['request.path']) { + filters['request.path'] = new RegExp(filters['request.path'], 'i') + } + + // build RegExp for transaction request querystring filter + if (filters['request.querystring']) { + filters['request.querystring'] = new RegExp( + filters['request.querystring'], + 'i' + ) + } + + // response status pattern match checking + if ( + filters['response.status'] && + utils.statusCodePatternMatch(filters['response.status']) + ) { + filters['response.status'] = { + $gte: filters['response.status'][0] * 100, + $lt: filters['response.status'][0] * 100 + 100 } + } - // orchestration response status pattern match checking - if ( - filters['orchestrations.response.status'] && - utils.statusCodePatternMatch(filters['orchestrations.response.status']) - ) { - filters['orchestrations.response.status'] = { - $gte: filters['orchestrations.response.status'][0] * 100, - $lt: filters['orchestrations.response.status'][0] * 100 + 100 - } + // check if properties exist + if (filters.properties) { + // we need to source the property key and re-construct filter + const key = Object.keys(filters.properties)[0] + filters[`properties.${key}`] = filters.properties[key] + + // if property has no value then check if property exists instead + if (filters.properties[key] === null) { + filters[`properties.${key}`] = {$exists: true} } - // execute the query - ctx.body = await TransactionModelAPI.find(filters, projectionFiltersObject) - .skip(filterSkip) - .limit(parseInt(filterLimit, 10)) - .sort({'request.timestamp': -1}) - .exec() + // delete the old properties filter as its not needed + delete filters.properties + } - if (filterRepresentation === 'fulltruncate') { - Array.from(ctx.body).map(trx => truncateTransactionDetails(trx)) + // parse childIDs query to get it into the correct format for querying + if (filters['childIDs']) { + filters['childIDs'] = JSON.parse(filters['childIDs']) + } + + /* Route Filters */ + // build RegExp for route request path filter + if (filters['routes.request.path']) { + filters['routes.request.path'] = new RegExp( + filters['routes.request.path'], + 'i' + ) + } + + // build RegExp for transaction request querystring filter + if (filters['routes.request.querystring']) { + filters['routes.request.querystring'] = new RegExp( + filters['routes.request.querystring'], + 'i' + ) + } + + // route response status pattern match checking + if ( + filters['routes.response.status'] && + utils.statusCodePatternMatch(filters['routes.response.status']) + ) { + filters['routes.response.status'] = { + $gte: filters['routes.response.status'][0] * 100, + $lt: filters['routes.response.status'][0] * 100 + 100 } - } catch (e) { - utils.logAndSetResponse( - ctx, - 500, - `Could not retrieve transactions via the API: ${e}`, - 'error' + } + + /* orchestration Filters */ + // build RegExp for orchestration request path filter + if (filters['orchestrations.request.path']) { + filters['orchestrations.request.path'] = new RegExp( + filters['orchestrations.request.path'], + 'i' + ) + } + + // build RegExp for transaction request querystring filter + if (filters['orchestrations.request.querystring']) { + filters['orchestrations.request.querystring'] = new RegExp( + filters['orchestrations.request.querystring'], + 'i' ) } + + // orchestration response status pattern match checking + if ( + filters['orchestrations.response.status'] && + utils.statusCodePatternMatch(filters['orchestrations.response.status']) + ) { + filters['orchestrations.response.status'] = { + $gte: filters['orchestrations.response.status'][0] * 100, + $lt: filters['orchestrations.response.status'][0] * 100 + 100 + } + } + + return filters } function recursivelySearchObject(ctx, obj, ws, repeat) { @@ -678,4 +794,6 @@ export async function removeTransaction(ctx, transactionId) { if (process.env.NODE_ENV === 'test') { exports.calculateTransactionBodiesByteLength = calculateTransactionBodiesByteLength + exports.createRerunTasks = + createRerunTasks } diff --git a/src/koaApi.js b/src/koaApi.js index c505dafa1..e197771a8 100644 --- a/src/koaApi.js +++ b/src/koaApi.js @@ -82,6 +82,7 @@ export function setupApp(done) { app.use(route.get('/transactions', transactions.getTransactions)) app.use(route.post('/transactions', transactions.addTransaction)) + app.use(route.post('/bulkrerun', transactions.rerunTransactions)) app.use( route.get('/transactions/:transactionId', transactions.getTransactionById) ) diff --git a/src/tasks.js b/src/tasks.js index c06f5b3e2..2c5df6d97 100644 --- a/src/tasks.js +++ b/src/tasks.js @@ -19,6 +19,10 @@ let activeTasks = 0 export async function findAndProcessAQueuedTask() { let task try { + if (activeTasks > config.rerun.activeConcurrentTasks) { + return + } + task = await TaskModel.findOneAndUpdate( {status: 'Queued'}, {status: 'Processing'}, @@ -118,8 +122,22 @@ async function processNextTaskRound(task) { const transactions = Array.from( task.transactions.slice(nextI, nextI + task.batchSize) ) + const modifiedTask = await TaskModel.findById(task._id) + + if (modifiedTask.status === 'Paused') { + logger.info( + `Processing of task ${task._id} paused. Remaining transactions = ${task.remainingTransactions}` + ) + TaskModel.findByIdAndUpdate({_id: task._id}, { + remainingTransactions: task.remainingTransactions + }) + + return + } const promises = transactions.map(transaction => { + task.remainingTransactions-- + return new Promise(resolve => { rerunTransaction(transaction.tid, task._id, (err, response) => { if (err) { @@ -139,8 +157,6 @@ async function processNextTaskRound(task) { } else { transaction.tstatus = 'Completed' } - - task.remainingTransactions-- return resolve() }) @@ -149,15 +165,25 @@ async function processNextTaskRound(task) { }) await Promise.all(promises) - try { - await task.save() - } catch (err) { - logger.error( - `Failed to save current task while processing round: taskID=${task._id}, err=${err}`, - err - ) + logger.info( + `Round completed for rerun task #${task._id} - ${task.remainingTransactions} transactions remainings `, task.batchSize + ) + + if (task.remainingTransactions) { + await processNextTaskRound(task) + } else { + task.status = 'Completed' + task.completedDate = new Date() + logger.info(`Round completed for rerun task #${task._id} - Task completed`) + + await task.save().catch(err => { + logger.error( + `Failed to save current task while processing round: taskID=${task._id}, err=${err}`, + err + ) + }) + return } - return finalizeTaskRound(task) } function rerunTransaction(transactionID, taskID, callback) { @@ -412,4 +438,5 @@ if (process.env.NODE_ENV === 'test') { exports.rerunHttpRequestSend = rerunHttpRequestSend exports.rerunTcpRequestSend = rerunTcpRequestSend exports.findAndProcessAQueuedTask = findAndProcessAQueuedTask + exports.processNextTaskRound = processNextTaskRound } diff --git a/test/integration/transactionsAPITests.js b/test/integration/transactionsAPITests.js index b3e9e80d4..b8b2acd74 100644 --- a/test/integration/transactionsAPITests.js +++ b/test/integration/transactionsAPITests.js @@ -7,6 +7,7 @@ import request from 'supertest' import should from 'should' import {ObjectId} from 'mongodb' import {promisify} from 'util' +import sinon from 'sinon' import * as constants from '../constants' import * as server from '../../src/server' @@ -14,8 +15,9 @@ import * as testUtils from '../utils' import {AutoRetryModelAPI} from '../../src/model/autoRetry' import {ChannelModel} from '../../src/model/channels' import {EventModelAPI} from '../../src/model/events' -import {TransactionModel} from '../../src/model/transactions' +import {TransactionModel, TransactionModelAPI} from '../../src/model/transactions' import {config} from '../../src/config' +import {TaskModel} from '../../src/model' const ORIGINAL_API_CONFIG = config.api const ORIGINAL_APPLICATION_CONFIG = config.application @@ -1048,6 +1050,59 @@ describe('API Integration Tests', () => { ` { + const transactionData = { + _id: '111111111111111111111111', + status: 'Processing', + clientID: '999999999999999999999999', + channelID: channel._id, + request: requestDoc, + response: responseDoc, + routes: [ + { + name: 'dummy-route', + request: requestDoc, + response: responseDoc + } + ], + orchestrations: [ + { + name: 'dummy-orchestration', + request: requestDoc, + response: responseDoc + } + ], + properties: { + prop1: 'prop1-value1', + prop2: 'prop-value1' + } + } + await new TransactionModel(transactionData).save() + + const res = await request(constants.BASE_URL) + .get('/transactions?filterRepresentation=bulkrerun') + .set('auth-username', testUtils.nonRootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .expect(200) + + res.body.count.should.equal(1) + }) + + it('should fail to fetch transactions (intenal server error)', async () => { + const stub = sinon.stub(TransactionModelAPI, 'find') + stub.callsFake((() => Promise.reject())) + await request(constants.BASE_URL) + .get('/transactions') + .set('auth-username', testUtils.nonRootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .expect(500) + stub.restore() + }) }) describe('*getTransactionById (transactionId)', () => { @@ -1134,6 +1189,119 @@ describe('API Integration Tests', () => { }) }) + describe('*rerunTransactions', () => { + it('should call rerunTransactions', async () => { + await new TransactionModel( + Object.assign({}, transactionData, { + clientID: '555555555555555555555556' + }) + ).save() + const res = await request(constants.BASE_URL) + .post(`/bulkrerun`) + .set('auth-username', testUtils.rootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .send({ + batchSize: 1, + filters: {} + }) + .expect(200) + res.body.success.should.equal(true) + }) + + it('should create rerun task', async () => { + const tx = await new TransactionModel( + Object.assign({}, transactionData, { + clientID: '555555555555555555555556' + }) + ).save() + + await TaskModel.deleteMany({}) + const res = await request(constants.BASE_URL) + .post(`/bulkrerun`) + .set('auth-username', testUtils.rootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .send({ + batchSize: 1, + filters: {} + }) + .expect(200) + + await new Promise(resolve => { + setTimeout(() => resolve(), 1000) + }) + res.body.success.should.equal(true) + const task = await TaskModel.findOne() + task.transactions[0].tid.should.equal(tx._id.toString()) + }) + + it('should do bulk rerun for non-admin user', async () => { + await request(constants.BASE_URL) + .post('/bulkrerun') + .set('auth-username', testUtils.nonRootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .send({ + batchSize: 1, + filters: {} + }) + .expect(200) + }) + + it('should do bulk rerun on specific channel', async () => { + const res = await request(constants.BASE_URL) + .post('/bulkrerun') + .set('auth-username', testUtils.nonRootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .send({ + batchSize: 1, + filters: { + channelID: channel._id + } + }) + .expect(200) + }) + + it('should fail to do bulk rerun on rescrticted channel', async () => { + const res = await request(constants.BASE_URL) + .post('/bulkrerun') + .set('auth-username', testUtils.nonRootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .send({ + batchSize: 1, + filters: { + channelID: channel2._id + } + }) + .expect(403) + }) + + it('should fail to do bulk rerun (intenal server error)', async () => { + const stub = sinon.stub(TransactionModelAPI, 'count') + stub.callsFake((() => Promise.reject())) + const res = await request(constants.BASE_URL) + .post('/bulkrerun') + .set('auth-username', testUtils.nonRootUser.email) + .set('auth-ts', authDetails.authTS) + .set('auth-salt', authDetails.authSalt) + .set('auth-token', authDetails.authToken) + .send({ + batchSize: 1, + filters: {} + }) + .expect(500) + stub.restore() + }) + }) + describe('*findTransactionByClientId (clientId)', () => { it('should call findTransactionByClientId', async () => { const tx = await new TransactionModel( diff --git a/test/unit/tasksTest.js b/test/unit/tasksTest.js index 1453824a1..d9da82b26 100644 --- a/test/unit/tasksTest.js +++ b/test/unit/tasksTest.js @@ -309,62 +309,62 @@ describe('Rerun Task Tests', () => { }) }) - describe('findAndProcessAQueuedTask', async () => { - const DEFAULT_CHANNEL = Object.freeze({ - name: 'testChannel', - urlPattern: '.+', - type: 'http', - routes: [ - { - name: 'asdf', - host: 'localhost', - path: '/test1', - port: '12345' - } - ], - updatedBy: { - id: new ObjectId(), - name: 'Test' + const DEFAULT_CHANNEL = Object.freeze({ + name: 'testChannel', + urlPattern: '.+', + type: 'http', + routes: [ + { + name: 'asdf', + host: 'localhost', + path: '/test1', + port: '12345' } - }) + ], + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) - const DEFAULT_TASK = Object.freeze({ - status: 'Queued', - user: 'user' - }) + const DEFAULT_TASK = Object.freeze({ + status: 'Queued', + user: 'user' + }) - const DEFAULT_TRANSACTION = Object.freeze({ - status: 'Processing', - request: { - timestamp: new Date() - } - }) + const DEFAULT_TRANSACTION = Object.freeze({ + status: 'Processing', + request: { + timestamp: new Date() + } + }) - function createTask(transactions = [], taskOverrides = {}) { - const taskDoc = Object.assign( - {}, - DEFAULT_TASK, - { - remainingTransactions: transactions.length, - totalTransactions: transactions.length, - transactions: transactions.map(t => ({ - tid: t._id, - tstatus: 'Queued' - })) - }, - taskOverrides - ) + function createTask(transactions = [], taskOverrides = {}) { + const taskDoc = Object.assign( + {}, + DEFAULT_TASK, + { + remainingTransactions: transactions.length, + totalTransactions: transactions.length, + transactions: transactions.map(t => ({ + tid: t._id, + tstatus: 'Queued' + })) + }, + taskOverrides + ) - return new TaskModel(taskDoc).save() - } + return new TaskModel(taskDoc).save() + } - const clearTasksFn = () => - Promise.all([ - TaskModel.deleteMany({}), - TransactionModel.deleteMany({}), - ChannelModel.deleteMany({}) - ]) + const clearTasksFn = () => + Promise.all([ + TaskModel.deleteMany({}), + TransactionModel.deleteMany({}), + ChannelModel.deleteMany({}) + ]) + describe('findAndProcessAQueuedTask', async () => { before(async () => { await clearTasksFn() }) @@ -429,14 +429,11 @@ describe('Rerun Task Tests', () => { ) await tasks.findAndProcessAQueuedTask() - spy.callCount.should.eql(2) + spy.callCount.should.eql(3) const updatedTask = await TaskModel.findById(originalTask._id) - updatedTask.status.should.eql('Queued') - updatedTask.remainingTransactions.should.be.equal(1) - updatedTask.transactions[0].tstatus.should.be.equal('Completed') - updatedTask.transactions[1].tstatus.should.be.equal('Completed') - updatedTask.transactions[2].tstatus.should.be.equal('Queued') + updatedTask.status.should.eql('Completed') + updatedTask.remainingTransactions.should.be.equal(0) }) it(`will process the transactions till they are completed`, async () => { @@ -457,14 +454,11 @@ describe('Rerun Task Tests', () => { ) await tasks.findAndProcessAQueuedTask() - spy.callCount.should.eql(2) + spy.callCount.should.eql(3) let updatedTask = await TaskModel.findById(originalTask._id) - updatedTask.status.should.eql('Queued') - updatedTask.remainingTransactions.should.be.equal(1) - updatedTask.transactions[0].tstatus.should.be.equal('Completed') - updatedTask.transactions[1].tstatus.should.be.equal('Completed') - updatedTask.transactions[2].tstatus.should.be.equal('Queued') + updatedTask.status.should.eql('Completed') + updatedTask.remainingTransactions.should.be.equal(0) await tasks.findAndProcessAQueuedTask() spy.callCount.should.eql(3) @@ -501,4 +495,27 @@ describe('Rerun Task Tests', () => { // TODO : Have to add the failed transaction test }) + + describe('*processNextTaskRound', () => { + before(async () => { + await clearTasksFn() + }) + + afterEach(async () => { + await clearTasksFn() + }) + + it('will not process task that is paused', async () => { + const channel = await new ChannelModel(DEFAULT_CHANNEL).save() + const originalTrans = await new TransactionModel( + Object.assign({channelID: channel._id}, DEFAULT_TRANSACTION) + ).save() + const originalTask = await createTask([originalTrans], { + status: 'Paused' + }) + await tasks.processNextTaskRound(originalTask) + const updatedTask = await TaskModel.findById(originalTask._id) + updatedTask.status.should.eql('Paused') + }) + }) }) diff --git a/test/unit/transactionsTest.js b/test/unit/transactionsTest.js index 61727870e..da3322998 100644 --- a/test/unit/transactionsTest.js +++ b/test/unit/transactionsTest.js @@ -3,6 +3,8 @@ /* eslint-env mocha */ import * as transactions from '../../src/api/transactions' +import {TaskModel, TransactionModel} from '../../src/model' +import {ObjectId} from 'mongodb' describe('calculateTransactionBodiesByteLength()', () => { it('should calculate the bodies length of a transaction', async () => { @@ -47,3 +49,43 @@ describe('calculateTransactionBodiesByteLength()', () => { lengthObj.length.should.be.exactly(0) }) }) + +describe('*createRerunTasks', () => { + const transaction = Object.freeze({ + status: 'Failed', + request: { + timestamp: new Date().toISOString() + }, + updatedBy: { + id: new ObjectId(), + name: 'Test' + } + }) + const userEmail = 'example@gmail.com' + + beforeEach(async () => { + await TransactionModel.deleteMany({}) + await TaskModel.deleteMany({}) + }) + + afterEach(async () => { + await TransactionModel.deleteMany({}) + await TaskModel.deleteMany({}) + }) + + it('should create rerun task', async () => { + await TransactionModel(transaction).save() + await transactions.createRerunTasks({}, 1, userEmail, 0, 0, 'Paused', 1) + const tasks = await TaskModel.find() + tasks.length.should.be.exactly(1) + }) + + it('should create multiple rerun tasks', async () => { + await TransactionModel(transaction).save() + await TransactionModel(transaction).save() + + await transactions.createRerunTasks({}, 1, userEmail, 0, 1, '', 1) + const tasks = await TaskModel.find() + tasks.length.should.be.exactly(2) + }) +})