Skip to content

Commit

Permalink
Merge pull request #1208 from jembi/TB-173-resolve-forever-processing…
Browse files Browse the repository at this point in the history
…-transactions

TB-173 Handle transactions stuck in processing state
  • Loading branch information
arran-standish authored Sep 15, 2023
2 parents 4ab86ea + cade24e commit 563d83c
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 16 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "openhim-core",
"description": "The OpenHIM core application that provides logging and routing of http requests",
"version": "8.1.1",
"version": "8.1.2",
"main": "./lib/server.js",
"bin": {
"openhim-core": "./bin/openhim-core.js"
Expand Down
8 changes: 1 addition & 7 deletions src/middleware/messageStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ import * as metrics from '../metrics'
import * as transactions from '../model/transactions'
import * as utils from '../utils'

export const transactionStatus = {
PROCESSING: 'Processing',
SUCCESSFUL: 'Successful',
COMPLETED: 'Completed',
COMPLETED_W_ERR: 'Completed with error(s)',
FAILED: 'Failed'
}
const { transactionStatus } = transactions

function copyMapWithEscapedReservedCharacters(map) {
const escapedMap = {}
Expand Down
39 changes: 32 additions & 7 deletions src/model/transactions.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ import {Schema} from 'mongoose'

import {connectionAPI, connectionDefault} from '../config'

export const transactionStatus = {
PROCESSING: 'Processing',
SUCCESSFUL: 'Successful',
COMPLETED: 'Completed',
COMPLETED_W_ERR: 'Completed with error(s)',
FAILED: 'Failed'
}

// Request Schema definition
const RequestDef = {
host: String,
Expand Down Expand Up @@ -94,13 +102,7 @@ const TransactionSchema = new Schema({
status: {
type: String,
required: true,
enum: [
'Processing',
'Failed',
'Completed',
'Successful',
'Completed with error(s)'
]
enum: Object.values(transactionStatus)
}
})

Expand All @@ -118,3 +120,26 @@ export const TransactionModel = connectionDefault.model(
'Transaction',
TransactionSchema
)

/**
* Resolve a transaction stuck in the processing state
*
* If OpenHIM crashes with an inflight transaction, that transaction's status will stay in processing
* So we run this function at start up and set all those transactions to failed
*
*/
export const resolveStuckProcessingState = async () => {
TransactionModelAPI.find({ status: transactionStatus.PROCESSING })
.cursor()
.on('data', async (transaction) => {
try {
if (transaction.$isEmpty('response') && transaction.$isEmpty('error'))
TransactionModelAPI.findByIdAndUpdate(transaction.id, {
status: transactionStatus.FAILED,
error: { message: 'OpenHIM crashed while still waiting for a response' },
}).exec()
} catch (err) {
console.error(`Error updating transaction stuck in processing: ${err}`)
}
})
}
3 changes: 3 additions & 0 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import * as upgradeDB from './upgradeDB'
import {KeystoreModel} from './model/keystore'
import {UserModel, createUser, updateTokenUser} from './model/users'
import {appRoot, config, connectionAgenda} from './config'
import { resolveStuckProcessingState } from './model/transactions'

mongoose.Promise = Promise

Expand Down Expand Up @@ -886,6 +887,8 @@ if (cluster.isMaster && !module.parent) {

return Promise.all(promises)
.then(() => {
resolveStuckProcessingState()

let audit = atna.construct.appActivityAudit(
true,
himSourceID,
Expand Down
76 changes: 75 additions & 1 deletion test/unit/transactionsTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* eslint-env mocha */

import * as transactions from '../../src/api/transactions'
import {TaskModel, TransactionModel} from '../../src/model'
import {TaskModel, TransactionModel, resolveStuckProcessingState} from '../../src/model'
import {ObjectId} from 'mongodb'

describe('calculateTransactionBodiesByteLength()', () => {
Expand Down Expand Up @@ -89,3 +89,77 @@ describe('*createRerunTasks', () => {
tasks.length.should.be.exactly(2)
})
})

describe('TransactionModel tests', () => {
describe('.resolveStuckProcessingState()', () => {
const midFlightTransaction = Object.freeze({
status: 'Processing',
request: {
timestamp: new Date().toISOString()
},
updatedBy: {
id: new ObjectId(),
name: 'Test'
}
})

const validProcessingTransaction = Object.freeze({
status: 'Processing',
request: {
timestamp: new Date().toISOString()
},
response: {
status: 200,
timestamp: new Date().toISOString()
},
updatedBy: {
id: new ObjectId(),
name: 'Test'
}
})

const errorProcessingTransaction = Object.freeze({
status: 'Processing',
request: {
timestamp: new Date().toISOString()
},
error: {
message: 'something bad happened',
stack: 'stack trace'
},
updatedBy: {
id: new ObjectId(),
name: 'Test'
}
})

beforeEach(async () => {
await TransactionModel.deleteMany({})
})

afterEach(async () => {
await TransactionModel.deleteMany({})
})

it('should update a processing transaction to failed if no response or error set', async () => {
await TransactionModel(midFlightTransaction).save()

resolveStuckProcessingState()
await new Promise((resolve) => setTimeout(() => { resolve() }, 500))

const transactions = await TransactionModel.find({ status: 'Processing' });
transactions.length.should.be.exactly(0);
})

it('should not update a transaction processing state if response or error set', async () => {
await TransactionModel(validProcessingTransaction).save()
await TransactionModel(errorProcessingTransaction).save()

resolveStuckProcessingState()
await new Promise((resolve) => setTimeout(() => { resolve() }, 500))

const transactions = await TransactionModel.find({ status: 'Processing' });
transactions.length.should.be.exactly(2);
})
})
})

0 comments on commit 563d83c

Please sign in to comment.