diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala index 4f5e5de6f9..ae278d52b5 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala @@ -19,7 +19,7 @@ object SchemaValidationCoordinator { private val logger = Logger[SchemaValidationCoordinator] def projectionMetadata(project: ProjectRef): ProjectionMetadata = - ProjectionMetadata("schema", s"schema-$project-validate-resources", Some(project), None) + ProjectionMetadata("schema", s"schema-validate-resources-$project", Some(project), None) def apply(supervisor: Supervisor, schemaValidationStream: SchemaValidationStream): SchemaValidationCoordinator = new SchemaValidationCoordinator { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala index b619596ec4..6f39e8fcc7 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala @@ -43,21 +43,18 @@ object SchemaValidationStream { } } yield (Some(())) - private def log(message: String) = Stream.eval(logger.info(message)) - - override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = { - for { - _ <- log(s"Starting validation of resources for project '$project'") - stream <- resourceStream(project, offset).evalMap { - _.evalMapFilter { - case r if r.deprecated => IO.none - case r if r.schema.iri == schemas.resources => IO.none - case r => validateSingle(r) - } - } - _ <- log(s"Validation of resources for project '$project' has been completed.") - } yield stream - - } + override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = + Stream.eval(logger.info(s"Starting validation of resources for project '$project'")) >> + resourceStream(project, offset) + .evalMap { + _.evalMapFilter { + case r if r.deprecated => IO.none + case r if r.schema.iri == schemas.resources => IO.none + case r => validateSingle(r) + } + } + .onFinalize { + logger.info(s"Validation of resources for project '$project' has been completed.") + } } }