From d241d12b7b2efd529e4677602c9d31ca1ba0e495 Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 30 Sep 2024 11:56:50 +0200 Subject: [PATCH] Fix logs for the schema validation job (#5157) * Fix logs for the schema validation job --------- Co-authored-by: Simon Dumas --- .../job/SchemaValidationCoordinator.scala | 2 +- .../schemas/job/SchemaValidationStream.scala | 29 +++++++++---------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala index 4f5e5de6f9..ae278d52b5 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala @@ -19,7 +19,7 @@ object SchemaValidationCoordinator { private val logger = Logger[SchemaValidationCoordinator] def projectionMetadata(project: ProjectRef): ProjectionMetadata = - ProjectionMetadata("schema", s"schema-$project-validate-resources", Some(project), None) + ProjectionMetadata("schema", s"schema-validate-resources-$project", Some(project), None) def apply(supervisor: Supervisor, schemaValidationStream: SchemaValidationStream): SchemaValidationCoordinator = new SchemaValidationCoordinator { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala index b619596ec4..6f39e8fcc7 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala @@ -43,21 +43,18 @@ object SchemaValidationStream { } } yield (Some(())) - private def log(message: String) = Stream.eval(logger.info(message)) - - override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = { - for { - _ <- log(s"Starting validation of resources for project '$project'") - stream <- resourceStream(project, offset).evalMap { - _.evalMapFilter { - case r if r.deprecated => IO.none - case r if r.schema.iri == schemas.resources => IO.none - case r => validateSingle(r) - } - } - _ <- log(s"Validation of resources for project '$project' has been completed.") - } yield stream - - } + override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = + Stream.eval(logger.info(s"Starting validation of resources for project '$project'")) >> + resourceStream(project, offset) + .evalMap { + _.evalMapFilter { + case r if r.deprecated => IO.none + case r if r.schema.iri == schemas.resources => IO.none + case r => validateSingle(r) + } + } + .onFinalize { + logger.info(s"Validation of resources for project '$project' has been completed.") + } } }