diff --git a/src/main.ts b/src/main.ts index 1972e60..5dfb2cd 100644 --- a/src/main.ts +++ b/src/main.ts @@ -148,6 +148,11 @@ yargs(process.argv.slice(2)) if (receiveMessageResponse.Messages?.length) { const message = receiveMessageResponse.Messages[0]; const body = Task.parse(JSON.parse(message.Body!)); + const taskCompletionStatus = await checkTaskCompletionStatus(ctx, job.id, body.id); + if (taskCompletionStatus === "COMPLETED") { + console.log(`Task ${body.id} already completed, skipping.`); + continue; + } const command = [...args.worker, body.id]; let visibilityTimeoutHandle = setInterval(async () => { await updateMessageVisibilityTimeout( @@ -297,6 +302,23 @@ async function getPreviouslyRunTaskStatuses( return response; } +async function checkTaskCompletionStatus( + { dynamodbClient }: Context, + jobId: string, + taskId: string +): Promise { + const response = await dynamodbClient.send( + new dynamodb.GetItemCommand({ + TableName: env.PARALLELIZER_DYNAMODB_TABLE, + Key: { + TaskListId: { S: jobId }, + TaskId: { S: taskId }, + }, + }) + ); + return response.Item?.Status?.S || "PENDING"; +} + async function updateTaskStatusInDynamoDB( { dynamodbClient }: Context, jobId: string,