Skip to content

Commit

Permalink
Merge pull request #223 from VEuPathDB/install-data-locking
Browse files Browse the repository at this point in the history
Install data locking
  • Loading branch information
Foxcapades authored Feb 14, 2024
2 parents 5493971 + 62999e2 commit 5822998
Showing 1 changed file with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import vdi.component.modules.VDIServiceModuleBase
import java.nio.file.Path
import java.sql.SQLException
import java.time.OffsetDateTime
import java.util.concurrent.ConcurrentHashMap
import kotlin.io.path.inputStream
import kotlin.io.path.outputStream

Expand All @@ -40,6 +41,8 @@ internal class InstallDataTriggerHandlerImpl(private val config: InstallTriggerH
{
private val log = LoggerFactory.getLogger(javaClass)

private val datasetsInProgress = ConcurrentHashMap.newKeySet<DatasetID>(32)

override suspend fun run() {
val kc = requireKafkaConsumer(config.installDataTriggerTopic, config.kafkaConsumerConfig)
val dm = DatasetManager(requireS3Bucket(requireS3Client(config.s3Config), config.s3Bucket))
Expand All @@ -53,7 +56,7 @@ internal class InstallDataTriggerHandlerImpl(private val config: InstallTriggerH
kc.fetchMessages(config.installDataTriggerMessageKey, InstallTrigger::class)
.forEach { (userID, datasetID) ->
log.info("received install job for dataset $datasetID, user $userID")
wp.submit { executeJob(userID, datasetID, dm) }
wp.submit { tryExecute(userID, datasetID, dm) }
}
}

Expand All @@ -66,6 +69,18 @@ internal class InstallDataTriggerHandlerImpl(private val config: InstallTriggerH
confirmShutdown()
}

private fun tryExecute(userID: UserID, datasetID: DatasetID, dm: DatasetManager) {
if (datasetsInProgress.add(datasetID)) {
try {
executeJob(userID, datasetID, dm)
} finally {
datasetsInProgress.remove(datasetID)
}
} else {
log.info("data installation already in progress for dataset {}/{}, skipping install job", userID, datasetID)
}
}

/**
* Execute Job 1
*
Expand Down Expand Up @@ -448,4 +463,4 @@ internal class InstallDataTriggerHandlerImpl(private val config: InstallTriggerH

throw Exception(res.message)
}
}
}

0 comments on commit 5822998

Please sign in to comment.