Skip to content

Commit

Permalink
add callback to enable coroutines to shutdown the service
Browse files Browse the repository at this point in the history
  • Loading branch information
Foxcapades committed Apr 18, 2024
1 parent 2b9b7bd commit ed5a308
Show file tree
Hide file tree
Showing 25 changed files with 151 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import org.slf4j.LoggerFactory
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

abstract class AbstractJobExecutor(name: String, private val wakeInterval: Duration = 2.seconds)
abstract class AbstractJobExecutor(
name: String,
abortCB: (String?) -> Nothing,
private val wakeInterval: Duration = 2.seconds
)
: VDIModule
, AbstractVDIModule(name)
, AbstractVDIModule(name, abortCB)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand All @@ -28,7 +32,12 @@ abstract class AbstractJobExecutor(name: String, private val wakeInterval: Durat
if (isShutDown())
break

runJob()
try {
runJob()
} catch (e: Throwable) {
log.error("executor $name run failed with exception", e)
abortCB(e.message)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import vdi.component.s3.DatasetManager
*
* @author Elizabeth Paige Harper - https://github.com/foxcapades
*/
abstract class AbstractVDIModule(override val name: String) : VDIModule {
abstract class AbstractVDIModule(override val name: String, protected val abortCB: (String?) -> Nothing) : VDIModule {
private val log = LoggerFactory.getLogger(javaClass)

protected val shutdownTrigger = ShutdownSignal()
Expand All @@ -37,7 +37,12 @@ abstract class AbstractVDIModule(override val name: String) : VDIModule {
log.info("starting module {}", name)

started = true
run()
try {
run()
} catch (e: Throwable) {
log.error("module $name execution failed", e)
abortCB(e.message)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import vdi.component.modules.AbstractJobExecutor
import kotlin.time.Duration.Companion.milliseconds
import vdi.component.reinstaller.DatasetReinstaller as Reinstaller

internal class DatasetReinstallerImpl(private val config: DatasetReinstallerConfig)
internal class DatasetReinstallerImpl(private val config: DatasetReinstallerConfig, abortCB: (String?) -> Nothing)
: DatasetReinstaller
, AbstractJobExecutor("dataset-reinstaller", config.wakeInterval)
, AbstractJobExecutor("dataset-reinstaller", abortCB, config.wakeInterval)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.daemon.reinstaller

fun DatasetReinstaller(config: DatasetReinstallerConfig = DatasetReinstallerConfig()): DatasetReinstaller =
DatasetReinstallerImpl(config)
fun DatasetReinstaller(
abortCB: (String?) -> Nothing,
config: DatasetReinstallerConfig = DatasetReinstallerConfig(),
): DatasetReinstaller =
DatasetReinstallerImpl(config, abortCB)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vdi.daemon.events.routing

import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.delay
import org.slf4j.LoggerFactory
import org.veupathdb.vdi.lib.common.field.DatasetID
import org.veupathdb.vdi.lib.common.field.UserID
Expand All @@ -15,8 +16,15 @@ import vdi.component.s3.paths.VDDatasetShareFilePath
import vdi.component.s3.paths.toVDPathOrNull
import vdi.daemon.events.routing.model.MinIOEvent
import vdi.daemon.events.routing.model.MinIOEventAction
import kotlin.time.Duration.Companion.milliseconds

internal class EventRouterImpl(private val config: EventRouterConfig) : EventRouter, AbstractVDIModule("event-router") {
private const val MaxPollingRetries = 5
private const val PollingRetryDelayMS = 500

internal class EventRouterImpl(private val config: EventRouterConfig, abortCB: (String?) -> Nothing)
: EventRouter
, AbstractVDIModule("event-router", abortCB)
{

private val log = LoggerFactory.getLogger(javaClass)

Expand Down Expand Up @@ -149,12 +157,17 @@ internal class EventRouterImpl(private val config: EventRouterConfig) : EventRou
}
}

private suspend fun RabbitMQEventIterator<*>.safeHasNext() =
private suspend fun RabbitMQEventIterator<*>.safeHasNext(currentTry: Int = 1): Boolean =
try {
hasNext()
} catch (e: Throwable) {
triggerShutdown()
log.error("failed to poll RabbitMQ for next message", e)
throw e
if (currentTry <= MaxPollingRetries) {
log.error("failed to poll RabbitMQ for next message, trying again in ${PollingRetryDelayMS}ms (attempt $currentTry/$MaxPollingRetries)", e)
delay(PollingRetryDelayMS.milliseconds)
safeHasNext(currentTry+1)
} else {
log.error("failed to poll RabbitMQ for next message $MaxPollingRetries times", e)
abortCB(e.message)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ package vdi.daemon.events.routing
*
* @return a new [EventRouter] instance.
*/
fun EventRouter(config: EventRouterConfig = EventRouterConfig()): EventRouter = EventRouterImpl(config)
fun EventRouter(abortCB: (String?) -> Nothing, config: EventRouterConfig = EventRouterConfig()): EventRouter =
EventRouterImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import vdi.component.pruner.Pruner
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

internal class PrunerModuleImpl(private val config: PrunerModuleConfig) : PrunerModule,
AbstractJobExecutor("pruner", config.wakeupInterval) {
internal class PrunerModuleImpl(private val config: PrunerModuleConfig, abortCB: (String?) -> Nothing) : PrunerModule,
AbstractJobExecutor("pruner", abortCB, config.wakeupInterval) {

private val log = LoggerFactory.getLogger(javaClass)

Expand Down
4 changes: 2 additions & 2 deletions daemons/pruner/src/main/kotlin/vdi/daemon/pruner/index.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ package vdi.daemon.pruner
*
* @return A new [PrunerModule] instance.
*/
fun PrunerModule(config: PrunerModuleConfig = PrunerModuleConfig()): PrunerModule =
PrunerModuleImpl(config)
fun PrunerModule(abortCB: (String?) -> Nothing, config: PrunerModuleConfig = PrunerModuleConfig()): PrunerModule =
PrunerModuleImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import vdi.component.modules.AbstractJobExecutor
import vdi.component.s3.DatasetManager
import kotlin.time.Duration.Companion.milliseconds

internal class ReconcilerImpl(private val config: ReconcilerConfig) : Reconciler, AbstractJobExecutor("reconciler") {
internal class ReconcilerImpl(private val config: ReconcilerConfig, abortCB: (String?) -> Nothing)
: Reconciler
, AbstractJobExecutor("reconciler", abortCB)
{
private var datasetManager: DatasetManager

private var kafkaRouter: KafkaRouter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
package vdi.daemon.reconciler

fun Reconciler(config: ReconcilerConfig = ReconcilerConfig()): Reconciler = ReconcilerImpl(config)
fun Reconciler(abortCB: (String?) -> Nothing, config: ReconcilerConfig = ReconcilerConfig()): Reconciler =
ReconcilerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import kotlinx.coroutines.launch
import org.slf4j.LoggerFactory
import vdi.component.modules.AbstractVDIModule

internal class HardDeleteTriggerHandlerImpl(private val config: HardDeleteTriggerHandlerConfig)
internal class HardDeleteTriggerHandlerImpl(private val config: HardDeleteTriggerHandlerConfig, abortCB: (String?) -> Nothing)
: HardDeleteTriggerHandler
, AbstractVDIModule("hard-delete-trigger-handler")
, AbstractVDIModule("hard-delete-trigger-handler", abortCB)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.lane.delete.hard

fun HardDeleteTriggerHandler(config: HardDeleteTriggerHandlerConfig = HardDeleteTriggerHandlerConfig()): HardDeleteTriggerHandler =
HardDeleteTriggerHandlerImpl(config)
fun HardDeleteTriggerHandler(
abortCB: (String?) -> Nothing,
config: HardDeleteTriggerHandlerConfig = HardDeleteTriggerHandlerConfig()
): HardDeleteTriggerHandler =
HardDeleteTriggerHandlerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.io.path.*

internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandlerConfig)
internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandlerConfig, abortCB: (String?) -> Nothing)
: ImportTriggerHandler
, AbstractVDIModule("import-trigger-handler")
, AbstractVDIModule("import-trigger-handler", abortCB)
{
private val log = logger()

Expand All @@ -49,8 +49,6 @@ internal class ImportTriggerHandlerImpl(private val config: ImportTriggerHandler

private val cacheDB = vdi.component.db.cache.CacheDB()

override val name = "import lane"

override suspend fun run() {
log.trace("run()")

Expand Down
7 changes: 5 additions & 2 deletions lanes/import/src/main/kotlin/vdi/lane/imports/index.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ package vdi.lane.imports

import vdi.lane.imports.config.ImportTriggerHandlerConfig

fun ImportTriggerHandler(config: ImportTriggerHandlerConfig = ImportTriggerHandlerConfig()): ImportTriggerHandler =
ImportTriggerHandlerImpl(config)
fun ImportTriggerHandler(
abortCB: (String?) -> Nothing,
config: ImportTriggerHandlerConfig = ImportTriggerHandlerConfig()
): ImportTriggerHandler =
ImportTriggerHandlerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ import kotlin.io.path.deleteIfExists
import kotlin.io.path.inputStream
import kotlin.io.path.outputStream

internal class InstallDataTriggerHandlerImpl(private val config: InstallTriggerHandlerConfig)
internal class InstallDataTriggerHandlerImpl(
private val config: InstallTriggerHandlerConfig,
abortCB: (String?) -> Nothing
)
: InstallDataTriggerHandler
, AbstractVDIModule("install-data-trigger-handler")
, AbstractVDIModule("install-data-trigger-handler", abortCB)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand Down
7 changes: 5 additions & 2 deletions lanes/install/src/main/kotlin/vdi/lane/install/index.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.lane.install

fun InstallDataTriggerHandler(config: InstallTriggerHandlerConfig = InstallTriggerHandlerConfig()): InstallDataTriggerHandler =
InstallDataTriggerHandlerImpl(config)
fun InstallDataTriggerHandler(
abortCB: (String?) -> Nothing,
config: InstallTriggerHandlerConfig = InstallTriggerHandlerConfig()
): InstallDataTriggerHandler =
InstallDataTriggerHandlerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import vdi.component.metrics.Metrics
import vdi.component.modules.AbstractVDIModule
import java.util.concurrent.ConcurrentHashMap

internal class ReconciliationEventHandlerImpl(private val config: ReconciliationEventHandlerConfig)
internal class ReconciliationEventHandlerImpl(
private val config: ReconciliationEventHandlerConfig,
abortCB: (String?) -> Nothing
)
: ReconciliationEventHandler
, AbstractVDIModule("reconciliation-event-handler")
, AbstractVDIModule("reconciliation-event-handler", abortCB)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.lane.reconciliation

fun ReconciliationEventHandler(config: ReconciliationEventHandlerConfig = ReconciliationEventHandlerConfig()): ReconciliationEventHandler =
ReconciliationEventHandlerImpl(config)
fun ReconciliationEventHandler(
abortCB: (String?) -> Nothing,
config: ReconciliationEventHandlerConfig = ReconciliationEventHandlerConfig()
): ReconciliationEventHandler =
ReconciliationEventHandlerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ private data class ShareInfo(
* This trigger handler processes trigger events for dataset shares being
* created or removed.
*/
internal class ShareTriggerHandlerImpl(private val config: ShareTriggerHandlerConfig)
: ShareTriggerHandler
, AbstractVDIModule("share-trigger-handler") {
internal class ShareTriggerHandlerImpl(private val config: ShareTriggerHandlerConfig, abortCB: (String?) -> Nothing)
: ShareTriggerHandler
, AbstractVDIModule("share-trigger-handler", abortCB)
{

private val log = LoggerFactory.getLogger(javaClass)

Expand Down
7 changes: 5 additions & 2 deletions lanes/sharing/src/main/kotlin/vdi/lane/sharing/index.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.lane.sharing

fun ShareTriggerHandler(config: ShareTriggerHandlerConfig = ShareTriggerHandlerConfig()): ShareTriggerHandler =
ShareTriggerHandlerImpl(config)
fun ShareTriggerHandler(
abortCB: (String?) -> Nothing,
config: ShareTriggerHandlerConfig = ShareTriggerHandlerConfig()
): ShareTriggerHandler =
ShareTriggerHandlerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import vdi.component.plugin.client.response.uni.UninstallResponseType
import vdi.component.plugin.client.response.uni.UninstallUnexpectedErrorResponse
import vdi.component.plugin.mapping.PluginHandlers

internal class SoftDeleteTriggerHandlerImpl(private val config: SoftDeleteTriggerHandlerConfig)
internal class SoftDeleteTriggerHandlerImpl(
private val config: SoftDeleteTriggerHandlerConfig,
abortCB: (String?) -> Nothing
)
: SoftDeleteTriggerHandler
, AbstractVDIModule("soft-delete-trigger-handler")
, AbstractVDIModule("soft-delete-trigger-handler", abortCB)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.lane.delete.soft

fun SoftDeleteTriggerHandler(config: SoftDeleteTriggerHandlerConfig = SoftDeleteTriggerHandlerConfig()): SoftDeleteTriggerHandler =
SoftDeleteTriggerHandlerImpl(config)
fun SoftDeleteTriggerHandler(
abortCB: (String?) -> Nothing,
config: SoftDeleteTriggerHandlerConfig = SoftDeleteTriggerHandlerConfig()
): SoftDeleteTriggerHandler =
SoftDeleteTriggerHandlerImpl(config, abortCB)
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ import vdi.component.s3.DatasetManager
import java.sql.SQLException
import java.time.OffsetDateTime

internal class UpdateMetaTriggerHandlerImpl(private val config: UpdateMetaTriggerHandlerConfig)
internal class UpdateMetaTriggerHandlerImpl(
private val config: UpdateMetaTriggerHandlerConfig,
abortCB: (String?) -> Nothing,
)
: UpdateMetaTriggerHandler
, AbstractVDIModule("update-meta-trigger-handler")
, AbstractVDIModule("update-meta-trigger-handler", abortCB)
{
private val log = LoggerFactory.getLogger(javaClass)

Expand Down
7 changes: 5 additions & 2 deletions lanes/update-meta/src/main/kotlin/vdi/lane/meta/index.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package vdi.lane.meta

fun UpdateMetaTriggerHandler(config: UpdateMetaTriggerHandlerConfig = UpdateMetaTriggerHandlerConfig()): UpdateMetaTriggerHandler =
UpdateMetaTriggerHandlerImpl(config)
fun UpdateMetaTriggerHandler(
abortCB: (String?) -> Nothing,
config: UpdateMetaTriggerHandlerConfig = UpdateMetaTriggerHandlerConfig()
): UpdateMetaTriggerHandler =
UpdateMetaTriggerHandlerImpl(config, abortCB)
Loading

0 comments on commit ed5a308

Please sign in to comment.