Skip to content

Commit

Permalink
Use SELECT ... FOR UPDATE correctly in a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
sausage-todd committed Feb 26, 2024
1 parent 67547bf commit 1c99cf0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 32 deletions.
62 changes: 31 additions & 31 deletions services/apps/integration_data_worker/src/jobs/processOldData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,45 +14,45 @@ export const processOldDataJob = async (
): Promise<void> => {
const store = new DbStore(log, dbConn)
const repo = new IntegrationDataRepository(store, log)
const service = new IntegrationDataService(
redis,
streamWorkerEmitter,
dataSinkWorkerEmitter,
store,
log,
)

const loadNextBatch = async (): Promise<string[]> => {
return await repo.transactionally(async (txRepo) => {
const dataIds = await txRepo.getOldDataToProcess(5)
await txRepo.touchUpdatedAt(dataIds)
return dataIds
})
}

// load 5 oldest apiData and try process them
let dataToProcess = await loadNextBatch()

let successCount = 0
let errorCount = 0

while (dataToProcess.length > 0) {
for (const dataId of dataToProcess) {
try {
const result = await service.processData(dataId)
if (result) {
successCount++
} else {
while (true) {
const processedSomething = await repo.transactionally(async (txRepo) => {
const dataIds = await txRepo.getOldDataToProcess(5)
await txRepo.touchUpdatedAt(dataIds)

const txService = new IntegrationDataService(
redis,
streamWorkerEmitter,
dataSinkWorkerEmitter,
store,
log,
txRepo,
)

for (const dataId of dataIds) {
try {
const result = await txService.processData(dataId)
if (result) {
successCount++
} else {
errorCount++
}
} catch (err) {
log.error(err, 'Failed to process data!')
errorCount++
}
} catch (err) {
log.error(err, 'Failed to process data!')
errorCount++
}
}

log.info(`Processed ${successCount} old data successfully and ${errorCount} with errors.`)
log.info(`Processed ${successCount} old data successfully and ${errorCount} with errors.`)

dataToProcess = await loadNextBatch()
return dataIds.length > 0
})

if (!processedSomething) {
break
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ export default class IntegrationDataService extends LoggerBase {
private readonly dataSinkWorkerEmitter: DataSinkWorkerEmitter,
store: DbStore,
parentLog: Logger,
repo?: IntegrationDataRepository,
) {
super(parentLog)

this.repo = new IntegrationDataRepository(store, this.log)
if (repo) {
this.repo = repo
} else {
this.repo = new IntegrationDataRepository(store, this.log)
}
}

private async triggerRunError(
Expand Down

0 comments on commit 1c99cf0

Please sign in to comment.