diff --git a/service/lane/update-meta/src/main/kotlin/vdi/lane/meta/UpdateMetaTriggerHandlerImpl.kt b/service/lane/update-meta/src/main/kotlin/vdi/lane/meta/UpdateMetaTriggerHandlerImpl.kt index bd5754c9..3f294d49 100644 --- a/service/lane/update-meta/src/main/kotlin/vdi/lane/meta/UpdateMetaTriggerHandlerImpl.kt +++ b/service/lane/update-meta/src/main/kotlin/vdi/lane/meta/UpdateMetaTriggerHandlerImpl.kt @@ -69,7 +69,7 @@ internal class UpdateMetaTriggerHandlerImpl( kc.fetchMessages(config.kafkaRouterConfig.updateMetaTriggerMessageKey) .forEach { log.info("Received install-meta job for dataset {}/{} from source {}", it.userID, it.datasetID, it.eventSource) - wp.submit { updateMeta(dm, kr, it) } + wp.submit { tryUpdateMeta(dm, kr, it) } } } } @@ -81,9 +81,13 @@ internal class UpdateMetaTriggerHandlerImpl( confirmShutdown() } - private suspend fun updateMeta(dm: DatasetManager, kr: KafkaRouter, msg: EventMessage) { + private suspend fun tryUpdateMeta(dm: DatasetManager, kr: KafkaRouter, msg: EventMessage) { try { updateMeta(dm, msg.userID, msg.datasetID) + } catch (e: PluginException) { + e.log(log::error) + } catch (e: Throwable) { + PluginException.installMeta("N/A", "N/A", msg.userID, msg.datasetID, cause = e).log(log::error) } finally { if (msg.eventSource != EventSource.FullReconciler) kr.sendReconciliationTrigger(msg.userID, msg.datasetID, msg.eventSource) @@ -154,10 +158,27 @@ internal class UpdateMetaTriggerHandlerImpl( val ph = PluginHandlers[datasetMeta.type.name, datasetMeta.type.version]!! datasetMeta.projects - .forEach { projectID -> updateTargetMeta(ph, datasetMeta, metaTimestamp, datasetID, projectID, userID) } + .forEach { projectID -> tryUpdateTargetMeta(ph, datasetMeta, metaTimestamp, datasetID, projectID, userID) } timer.observeDuration() } + + private suspend fun tryUpdateTargetMeta( + ph: PluginHandler, + meta: VDIDatasetMeta, + metaTimestamp: OffsetDateTime, + datasetID: DatasetID, + projectID: ProjectID, + userID: UserID, + ) { + try { + updateTargetMeta(ph, meta, metaTimestamp, datasetID, projectID, userID) + } catch (e: PluginException) { + throw e + } catch (e: Throwable) { + throw PluginException.installMeta(ph.displayName, projectID, userID, datasetID, cause = e) + } + } private suspend fun updateTargetMeta( ph: PluginHandler, @@ -172,8 +193,7 @@ internal class UpdateMetaTriggerHandlerImpl( return } - val appDb = appDB.accessor(projectID) - if (appDb == null) { + val appDb = appDB.accessor(projectID) or { log.info("skipping dataset {}/{}, project {} update meta due to target being disabled", userID, datasetID, projectID) return } @@ -247,7 +267,7 @@ internal class UpdateMetaTriggerHandlerImpl( val result = try { ph.client.postInstallMeta(datasetID, projectID, meta) } catch (e: Throwable){ - throw PluginRequestException("install-meta", ph.displayName, projectID, userID, datasetID, e) + throw PluginRequestException.installMeta(ph.displayName, projectID, userID, datasetID, cause = e) } Metrics.MetaUpdates.count.labels(meta.type.name, meta.type.version, result.responseCode.toString()).inc()