Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Foxcapades committed May 17, 2024
1 parent 4b9b287 commit 911dfb6
Showing 1 changed file with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ internal class UpdateMetaTriggerHandlerImpl(
kc.fetchMessages(config.kafkaRouterConfig.updateMetaTriggerMessageKey)
.forEach {
log.info("Received install-meta job for dataset {}/{} from source {}", it.userID, it.datasetID, it.eventSource)
wp.submit { updateMeta(dm, kr, it) }
wp.submit { tryUpdateMeta(dm, kr, it) }
}
}
}
Expand All @@ -81,9 +81,13 @@ internal class UpdateMetaTriggerHandlerImpl(
confirmShutdown()
}

private suspend fun updateMeta(dm: DatasetManager, kr: KafkaRouter, msg: EventMessage) {
private suspend fun tryUpdateMeta(dm: DatasetManager, kr: KafkaRouter, msg: EventMessage) {
try {
updateMeta(dm, msg.userID, msg.datasetID)
} catch (e: PluginException) {
e.log(log::error)
} catch (e: Throwable) {
PluginException.installMeta("N/A", "N/A", msg.userID, msg.datasetID, cause = e).log(log::error)
} finally {
if (msg.eventSource != EventSource.FullReconciler)
kr.sendReconciliationTrigger(msg.userID, msg.datasetID, msg.eventSource)
Expand Down Expand Up @@ -154,10 +158,27 @@ internal class UpdateMetaTriggerHandlerImpl(
val ph = PluginHandlers[datasetMeta.type.name, datasetMeta.type.version]!!

datasetMeta.projects
.forEach { projectID -> updateTargetMeta(ph, datasetMeta, metaTimestamp, datasetID, projectID, userID) }
.forEach { projectID -> tryUpdateTargetMeta(ph, datasetMeta, metaTimestamp, datasetID, projectID, userID) }

timer.observeDuration()
}

private suspend fun tryUpdateTargetMeta(
ph: PluginHandler,
meta: VDIDatasetMeta,
metaTimestamp: OffsetDateTime,
datasetID: DatasetID,
projectID: ProjectID,
userID: UserID,
) {
try {
updateTargetMeta(ph, meta, metaTimestamp, datasetID, projectID, userID)
} catch (e: PluginException) {
throw e
} catch (e: Throwable) {
throw PluginException.installMeta(ph.displayName, projectID, userID, datasetID, cause = e)
}
}

private suspend fun updateTargetMeta(
ph: PluginHandler,
Expand All @@ -172,8 +193,7 @@ internal class UpdateMetaTriggerHandlerImpl(
return
}

val appDb = appDB.accessor(projectID)
if (appDb == null) {
val appDb = appDB.accessor(projectID) or {
log.info("skipping dataset {}/{}, project {} update meta due to target being disabled", userID, datasetID, projectID)
return
}
Expand Down Expand Up @@ -247,7 +267,7 @@ internal class UpdateMetaTriggerHandlerImpl(
val result = try {
ph.client.postInstallMeta(datasetID, projectID, meta)
} catch (e: Throwable){
throw PluginRequestException("install-meta", ph.displayName, projectID, userID, datasetID, e)
throw PluginRequestException.installMeta(ph.displayName, projectID, userID, datasetID, cause = e)
}

Metrics.MetaUpdates.count.labels(meta.type.name, meta.type.version, result.responseCode.toString()).inc()
Expand Down

0 comments on commit 911dfb6

Please sign in to comment.