Skip to content

Commit

Permalink
Merge branch 'main' into fatality
Browse files Browse the repository at this point in the history
  • Loading branch information
Foxcapades committed Apr 18, 2024
2 parents ed5a308 + 81d73f4 commit bf27d95
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 16 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ dependencies {
implementation("org.veupathdb.lib:jaxrs-container-core")

implementation("org.slf4j:slf4j-api")
implementation("org.slf4j:jul-to-slf4j")
implementation("org.apache.logging.log4j:log4j-api")
implementation("org.apache.logging.log4j:log4j-core")
implementation("org.apache.logging.log4j:log4j-slf4j-impl")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package vdi.component.async

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

Expand All @@ -19,6 +20,8 @@ class AtomicULong(initialValue: ULong = 0uL) {
}
}

suspend fun get() = mtx.withLock { atm }

suspend operator fun dec(): AtomicULong {
mtx.withLock { atm-- }
return this
Expand All @@ -31,4 +34,10 @@ class AtomicULong(initialValue: ULong = 0uL) {
suspend operator fun minusAssign(value: ULong) {
mtx.withLock { atm -= value }
}

override fun toString() = runBlocking { mtx.withLock { atm.toString() } }

override fun hashCode() = runBlocking { mtx.withLock { atm.hashCode() } }

override fun equals(other: Any?) = other is AtomicULong && runBlocking { mtx.withLock { atm } == other.get() }
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package vdi.component.async

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.kotlin.CoroutineThreadContext
import org.apache.logging.log4j.kotlin.ThreadContextData
import org.apache.logging.log4j.kotlin.logger
Expand All @@ -27,7 +24,7 @@ class WorkerPool(
fun start() {
log.info("starting worker pool $name with queue size $jobQueueSize and worker count $workerCount")

runBlocking {
runBlocking(Dispatchers.IO) {
repeat(workerCount) { i ->
val j = i + 1
launch (CoroutineThreadContext(contextData = ThreadContextData(map = mapOf("workerID" to "$name-$j"), Stack()))) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package vdi.component.reinstaller

import kotlinx.coroutines.sync.Mutex
import org.slf4j.LoggerFactory
import org.veupathdb.lib.s3.s34k.S3Api
import org.veupathdb.vdi.lib.common.compression.Zip
Expand All @@ -24,7 +25,6 @@ import vdi.component.s3.DatasetManager
import vdi.component.s3.paths.S3Paths
import java.io.InputStream
import java.nio.file.Path
import java.util.concurrent.locks.ReentrantLock
import kotlin.io.path.deleteIfExists
import kotlin.io.path.inputStream
import kotlin.io.path.outputStream
Expand All @@ -33,7 +33,7 @@ object DatasetReinstaller {

private val log = LoggerFactory.getLogger(javaClass)

private val lock = ReentrantLock()
private val lock = Mutex()

private val config = DatasetReinstallerConfig()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal class DatasetReinstallerImpl(private val config: DatasetReinstallerConf
private var lastRun = 0.milliseconds

override suspend fun runJob() {
val now = now()
val now = 0.milliseconds

if (lastRun + config.runInterval < now) {
log.info("attempting to start automatic dataset reinstaller run")
Expand Down
45 changes: 45 additions & 0 deletions daemons/rest-service/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ title: VEuPathDB Dataset Installer
version: 1.0.0
mediaType: application/json

traits:
adminEnabled:
headers:
Admin-Token?: string
adminRequired:
headers:
Admin-Token: string

uses:
lib: schema/library.raml

Expand Down Expand Up @@ -434,6 +442,43 @@ uses:
message: Dataset store is unreachable
requestId: b296c3d9-4032-41b1-906e-c97ccfc447e3

/run-reinstaller:
displayName: Execute Dataset Reinstaller
post: |
Runs dataset reinstaller process which will scan all target application
databases for datasets in the `ready-for-reinstall` status and attempts to
uninstall and then reinstall them.
This endpoint executes the dataset reinstaller process synchronously,
meaning calls to this endpoint through Apache will timeout.
headers:
Admin-Token: string
responses:
204:
description: |
Success.
Dataset reinstall process completed. This status does not relate in
any way to the success or failure of any or all attempted reinstalls.
401:
description: Unauthorized.
body:
application/json:
type: lib.UnauthorizedError
example:
status: unauthorized
message: Users must be logged in to access this resource.
500:
description: Internal Server Error.
body:
application/json:
type: lib.ServerError
example:
status: server-error
message: Dataset store is unreachable
requestId: b296c3d9-4032-41b1-906e-c97ccfc447e3


/install-cleanup:
displayName: Install Cleanup
post:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.apache.logging.log4j.kotlin.logger
import org.veupathdb.vdi.lib.common.DatasetManifestFilename
import org.veupathdb.vdi.lib.common.DatasetMetaFilename
Expand All @@ -19,6 +21,7 @@ import org.veupathdb.vdi.lib.common.util.isNull
import org.veupathdb.vdi.lib.common.util.or
import org.veupathdb.vdi.lib.json.JSON
import vdi.component.async.WorkerPool
import vdi.component.db.cache.CacheDB
import vdi.component.db.cache.CacheDBTransaction
import vdi.component.db.cache.model.DatasetImpl
import vdi.component.db.cache.model.DatasetImportStatus
Expand All @@ -33,8 +36,6 @@ import vdi.lane.imports.config.ImportTriggerHandlerConfig
import vdi.lane.imports.model.WarningsFile
import java.nio.file.Path
import java.time.OffsetDateTime
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.io.path.*

internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandlerConfig, abortCB: (String?) -> Nothing)
Expand All @@ -43,11 +44,11 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler
{
private val log = logger()

private val lock = ReentrantLock()
private val lock = Mutex()

private val activeIDs = HashSet<DatasetID>(24)

private val cacheDB = vdi.component.db.cache.CacheDB()
private val cacheDB = CacheDB()

override suspend fun run() {
log.trace("run()")
Expand Down Expand Up @@ -159,7 +160,7 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler
}

cacheDB.withTransaction {
log.info("attempting to insert import control record (if one does not exist)")
log.info("attempting to insert import control record (if one does not exist) for dataset $datasetID")
it.upsertImportControl(datasetID, DatasetImportStatus.InProgress)
}

Expand All @@ -180,7 +181,7 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler
ImportResponseType.UnhandledError -> handleImport500Result(userID, datasetID, result as ImportUnhandledErrorResponse)
}
} catch (e: Throwable) {
log.debug("import request to handler server failed with exception:", e)
log.error("import request to handler server for dataset $datasetID failed with exception:", e)
cacheDB.withTransaction { tran ->
tran.updateImportControl(datasetID, DatasetImportStatus.Failed)
tran.tryInsertImportMessages(datasetID, "Process error: ${e.message}")
Expand Down Expand Up @@ -310,7 +311,7 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler
))
}

private fun vdi.component.db.cache.CacheDB.initializeDataset(datasetID: DatasetID, meta: VDIDatasetMeta) {
private fun CacheDB.initializeDataset(datasetID: DatasetID, meta: VDIDatasetMeta) {
log.trace("CacheDB.initializeDataset(datasetID: $datasetID, meta: $meta)")
openTransaction().use {
try {
Expand Down
1 change: 1 addition & 0 deletions platform/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {

// Logging
api("org.slf4j:slf4j-api:1.7.36")
api("org.slf4j:jul-to-slf4j:1.7.36")
api("org.apache.logging.log4j:log4j-api-kotlin:1.4.0")
api("org.apache.logging.log4j:log4j-api:2.23.1")
api("org.apache.logging.log4j:log4j-core:2.23.1")
Expand Down
9 changes: 9 additions & 0 deletions src/main/kotlin/vdi/bootstrap/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import vdi.component.modules.VDIModule
import org.slf4j.bridge.SLF4JBridgeHandler
import org.veupathdb.service.vdi.RestService
import vdi.daemon.events.routing.EventRouter
import vdi.daemon.pruner.PrunerModule
import vdi.daemon.reconciler.Reconciler
Expand All @@ -16,6 +18,7 @@ import vdi.lane.install.InstallDataTriggerHandler
import vdi.lane.meta.UpdateMetaTriggerHandler
import vdi.lane.reconciliation.ReconciliationEventHandler
import vdi.lane.sharing.ShareTriggerHandler
import kotlin.concurrent.thread
import kotlin.system.exitProcess

object Main {
Expand All @@ -24,6 +27,10 @@ object Main {

@JvmStatic
fun main(args: Array<String>) {
// JUL -> SLF4J
SLF4JBridgeHandler.removeHandlersForRootLogger()
SLF4JBridgeHandler.install()

log.info("initializing modules")
val modules = listOf(
DatasetReinstaller(::fatality),
Expand All @@ -41,6 +48,8 @@ object Main {

Runtime.getRuntime().addShutdownHook(Thread { shutdownModules(modules) })

thread { RestService.main(args) }

log.info("starting modules")
runBlocking(Dispatchers.Unconfined) { modules.forEach { launch { it.start() } } }
}
Expand Down
2 changes: 1 addition & 1 deletion startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ waitFor cache-db cache-db 5432
waitFor rabbit "$GLOBAL_RABBIT_HOST" "${GLOBAL_RABBIT_PORT:-5672}"
waitFor minio "$S3_HOST" "$S3_PORT"

exec java -jar -XX:+CrashOnOutOfMemoryError $JVM_MEM_ARGS $JVM_ARGS /service.jar
exec java -jar -XX:+CrashOnOutOfMemoryError $JVM_MEM_ARGS $JVM_ARGS -Djdk.httpclient.HttpClient.log= /service.jar

0 comments on commit bf27d95

Please sign in to comment.