From 772ebfb7cdb1b73a33768d8718990043adad4110 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 27 Sep 2024 09:47:24 +0200 Subject: [PATCH] Add schema validation job endpoints (#5155) * Add schema validation job endpoints --------- Co-authored-by: Simon Dumas --- delta/app/src/main/resources/app.conf | 1 + .../nexus/delta/routes/ElemRoutes.scala | 14 +- .../nexus/delta/routes/SchemaJobRoutes.scala | 96 +++++++++++ .../nexus/delta/wiring/SchemasModule.scala | 33 +++- .../delta/routes/SchemaJobRoutesSpec.scala | 160 ++++++++++++++++++ .../archive/routes/ArchiveRoutes.scala | 2 +- .../plugins/archive/ArchiveDownloadSpec.scala | 14 +- .../plugins/archive/ArchiveRoutesSpec.scala | 4 +- .../model/BlazegraphViewValue.scala | 2 +- .../routes/list-indexing-errors.json | 8 +- .../model/CompositeViewSource.scala | 2 +- .../routes/list-indexing-errors.json | 8 +- .../indexing/ElasticSearchSink.scala | 13 +- .../model/ElasticSearchViewValue.scala | 2 +- .../routes/list-indexing-errors.json | 8 +- .../indexing/ElasticSearchSinkSuite.scala | 2 +- .../delta/plugins/storage/files/Files.scala | 2 +- .../delta/sdk/directives/FileResponse.scala | 6 +- .../delta/sdk/permissions/Permissions.scala | 1 + .../job/SchemaValidationCoordinator.scala | 2 +- .../schemas/job/SchemaValidationStream.scala | 30 +++- .../sdk/directives/ResponseToJsonLdSpec.scala | 2 +- .../nexus/delta/sdk/utils/RouteHelpers.scala | 14 +- ...V1_11_M02_001__change_failed_elem_logs.ddl | 3 +- .../sourcing/model/FailedElemLogRow.scala | 6 +- .../projections/FailedElemLogStore.scala | 6 +- .../delta/sourcing/query/SelectFilter.scala | 14 +- .../delta/sourcing/query/StreamingQuery.scala | 1 + .../delta/sourcing/stream/FailureReason.scala | 12 +- .../stream/utils/StreamingUtils.scala | 5 + .../sourcing/query/StreamingQuerySuite.scala | 12 +- .../stream/utils/StreamingUtilsSuite.scala | 5 + .../nexus/tests/iam/types/AclListing.scala | 6 +- .../schemas/SchemaValidationJobSpec.scala | 115 +++++++++++++ 34 files changed, 535 insertions(+), 76 deletions(-) create mode 100644 delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutes.scala create mode 100644 delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala create mode 100644 tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/schemas/SchemaValidationJobSpec.scala diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf index 446dbd9343..9b50fcc310 100644 --- a/delta/app/src/main/resources/app.conf +++ b/delta/app/src/main/resources/app.conf @@ -135,6 +135,7 @@ app { "views/query", "views/write", "schemas/write", + "schemas/run", "files/write", "storages/write", "version/read", diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala index 38644ce6d5..86bdbc4700 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/ElemRoutes.scala @@ -56,20 +56,24 @@ class ElemRoutes( concat( (get & pathPrefix("continuous")) { operationName(s"$prefixSegment/$project/elems/continuous") { - emit(sseElemStream.continuous(project, SelectFilter(types, tag.getOrElse(Latest)), offset)) + emit( + sseElemStream.continuous(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset) + ) } }, (get & pathPrefix("currents")) { operationName(s"$prefixSegment/$project/elems/currents") { - emit(sseElemStream.currents(project, SelectFilter(types, tag.getOrElse(Latest)), offset)) + emit(sseElemStream.currents(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset)) } }, (get & pathPrefix("remaining")) { operationName(s"$prefixSegment/$project/elems/remaining") { emit( - sseElemStream.remaining(project, SelectFilter(types, tag.getOrElse(Latest)), offset).map { - r => r.getOrElse(RemainingElems(0L, Instant.EPOCH)) - } + sseElemStream + .remaining(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset) + .map { r => + r.getOrElse(RemainingElems(0L, Instant.EPOCH)) + } ) } }, diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutes.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutes.scala new file mode 100644 index 0000000000..4acfe9a316 --- /dev/null +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutes.scala @@ -0,0 +1,96 @@ +package ch.epfl.bluebrain.nexus.delta.routes + +import akka.http.scaladsl.model.{ContentTypes, StatusCodes} +import akka.http.scaladsl.server.Route +import akka.util.ByteString +import cats.effect.IO +import cats.implicits.catsSyntaxApplicativeError +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution +import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck +import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._ +import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, FileResponse} +import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities +import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext +import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator +import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} +import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils + +/** + * Routes to trigger and get results from a schema validation job + */ +class SchemaJobRoutes( + identities: Identities, + aclCheck: AclCheck, + fetchContext: FetchContext, + schemaValidationCoordinator: SchemaValidationCoordinator, + projections: Projections, + projectionsErrors: ProjectionErrors +)(implicit + baseUri: BaseUri, + cr: RemoteContextResolution, + ordering: JsonKeyOrdering +) extends AuthDirectives(identities, aclCheck) { + + private def projectionName(project: ProjectRef) = SchemaValidationCoordinator.projectionMetadata(project).name + + private def projectExists(project: ProjectRef) = fetchContext.onRead(project).void + + private def streamValidationErrors(project: ProjectRef): IO[FileResponse] = { + IO.delay { + StreamConverter( + projectionsErrors + .failedElemEntries(projectionName(project), Offset.start) + .map(_.failedElemData) + .through(StreamingUtils.ndjson) + .map(ByteString(_)) + ) + } + }.map { s => + FileResponse("validation.json", ContentTypes.`application/json`, None, s) + } + + def routes: Route = + baseUriPrefix(baseUri.prefix) { + pathPrefix("jobs") { + extractCaller { implicit caller => + pathPrefix("schemas") { + (pathPrefix("validation") & projectRef) { project => + authorizeFor(project, Permissions.schemas.run).apply { + concat( + (post & pathEndOrSingleSlash) { + emit( + StatusCodes.Accepted, + projectExists(project) >> schemaValidationCoordinator.run(project).start.void + ) + }, + (pathPrefix("statistics") & get & pathEndOrSingleSlash) { + emit( + projectExists(project) >> projections + .statistics( + project, + SelectFilter.latestOfEntity(Resources.entityType), + projectionName(project) + ) + ) + }, + (pathPrefix("errors") & get & pathEndOrSingleSlash) { + emit( + projectExists(project) >> streamValidationErrors(project).attemptNarrow[Nothing] + ) + } + ) + } + } + } + } + } + } +} diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala index 18d29ddd85..17f3ecec10 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering -import ch.epfl.bluebrain.nexus.delta.routes.SchemasRoutes +import ch.epfl.bluebrain.nexus.delta.routes.{SchemaJobRoutes, SchemasRoutes} import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction import ch.epfl.bluebrain.nexus.delta.sdk._ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck @@ -27,6 +27,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas._ import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.{SchemaValidationCoordinator, SchemaValidationStream} import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaEvent} import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} import izumi.distage.model.definition.{Id, ModuleDef} @@ -120,6 +121,32 @@ object SchemasModule extends ModuleDef { ) } + make[SchemaJobRoutes].from { + ( + identities: Identities, + aclCheck: AclCheck, + fetchContext: FetchContext, + schemaValidationCoordinator: SchemaValidationCoordinator, + projections: Projections, + projectionsErrors: ProjectionErrors, + baseUri: BaseUri, + cr: RemoteContextResolution @Id("aggregate"), + ordering: JsonKeyOrdering + ) => + new SchemaJobRoutes( + identities, + aclCheck, + fetchContext, + schemaValidationCoordinator, + projections, + projectionsErrors + )( + baseUri, + cr, + ordering + ) + } + many[SseEncoder[_]].add { base: BaseUri => SchemaEvent.sseEncoder(base) } many[ScopedEventMetricEncoder[_]].add { SchemaEvent.schemaEventMetricEncoder } @@ -144,6 +171,10 @@ object SchemasModule extends ModuleDef { PriorityRoute(pluginsMaxPriority + 8, route.routes, requiresStrictEntity = true) } + many[PriorityRoute].add { (route: SchemaJobRoutes) => + PriorityRoute(pluginsMaxPriority + 8, route.routes, requiresStrictEntity = true) + } + make[Schema.Shift].from { (schemas: Schemas, base: BaseUri) => Schema.shift(schemas)(base) } diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala new file mode 100644 index 0000000000..9ad3347816 --- /dev/null +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala @@ -0,0 +1,160 @@ +package ch.epfl.bluebrain.nexus.delta.routes + +import akka.http.scaladsl.model.MediaRanges.`*/*` +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.headers.{Accept, OAuth2BearerToken} +import akka.http.scaladsl.server.Route +import cats.effect.{IO, Ref} +import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv +import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck +import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Root +import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen +import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy +import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller +import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions +import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator +import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{FailureReason, ProjectionProgress} + +import java.time.Instant + +class SchemaJobRoutesSpec extends BaseRouteSpec { + + private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm))) + + private val asAlice = addCredentials(OAuth2BearerToken("alice")) + + private val project = ProjectGen.project("org", "project") + private val rev = 1 + private val resourceId = nxv + "myid" + + private val fetchContext = FetchContextDummy(List(project)) + + private val identities = IdentitiesDummy(caller) + + private val aclCheck = AclSimpleCheck((alice, Root, Set(Permissions.schemas.run))).accepted + + private lazy val projections = Projections(xas, queryConfig, clock) + private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock) + + private val progress = ProjectionProgress(Offset.at(15L), Instant.EPOCH, 9000L, 400L, 30L) + + private val runTrigger = Ref.unsafe[IO, Boolean](false) + + private val schemaValidationCoordinator = new SchemaValidationCoordinator { + override def run(project: ProjectRef): IO[Unit] = runTrigger.set(true).void + } + + private lazy val routes = Route.seal( + new SchemaJobRoutes( + identities, + aclCheck, + fetchContext, + schemaValidationCoordinator, + projections, + projectionErrors + ).routes + ) + + override def beforeAll(): Unit = { + super.beforeAll() + val projectionMetadata = SchemaValidationCoordinator.projectionMetadata(project.ref) + + val reason = FailureReason("ValidationFail", json"""{ "details": "..." }""") + val fail1 = FailedElem(EntityType("ACL"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev) + val fail2 = + FailedElem(EntityType("Schema"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev) + + ( + projections.save(projectionMetadata, progress) >> + projectionErrors.saveFailedElems(projectionMetadata, List(fail1, fail2)) + ).accepted + } + + "The schema validation job route" should { + "fail to start a validation job without permission" in { + Post("/v1/jobs/schemas/validation/org/project") ~> routes ~> check { + response.shouldBeForbidden + runTrigger.get.accepted shouldEqual false + } + } + + "fail to start a validation job for an unknown project" in { + Post("/v1/jobs/schemas/validation/xxx/xxx") ~> asAlice ~> routes ~> check { + response.shouldFail(StatusCodes.NotFound, "ProjectNotFound") + runTrigger.get.accepted shouldEqual false + } + } + + "start a validation job on the project with appropriate access" in { + Post("/v1/jobs/schemas/validation/org/project") ~> asAlice ~> routes ~> check { + response.status shouldEqual StatusCodes.Accepted + runTrigger.get.accepted shouldEqual true + } + } + + "fail to get statistics on a validation job without permission" in { + Get("/v1/jobs/schemas/validation/org/project/statistics") ~> routes ~> check { + response.shouldBeForbidden + } + } + + "fail to get statistics on a validation job for an unknown project" in { + Get("/v1/jobs/schemas/validation/xxx/xxx/statistics") ~> asAlice ~> routes ~> check { + response.shouldFail(StatusCodes.NotFound, "ProjectNotFound") + } + } + + "get statistics on a validation job with appropriate access" in { + val expectedResponse = + json""" + { + "@context": "https://bluebrain.github.io/nexus/contexts/statistics.json", + "delayInSeconds" : 0, + "discardedEvents": 400, + "evaluatedEvents": 8570, + "failedEvents": 30, + "lastEventDateTime": "${Instant.EPOCH}", + "lastProcessedEventDateTime": "${Instant.EPOCH}", + "processedEvents": 9000, + "remainingEvents": 0, + "totalEvents": 9000 + }""" + + Get("/v1/jobs/schemas/validation/org/project/statistics") ~> asAlice ~> routes ~> check { + response.status shouldEqual StatusCodes.OK + response.asJson shouldEqual expectedResponse + } + } + + "fail to download errors on a validation job without permission" in { + Get("/v1/jobs/schemas/validation/org/project/errors") ~> routes ~> check { + response.shouldBeForbidden + } + } + + "fail to download errors on a validation job for an unknown project" in { + Get("/v1/jobs/schemas/validation/xxx/xxx/errors") ~> asAlice ~> routes ~> check { + response.shouldFail(StatusCodes.NotFound, "ProjectNotFound") + } + } + + "download errors on a validation job with appropriate access" in { + val expectedResponse = + s"""{"id":"$resourceId","project":"${project.ref}","offset":{"value":42,"@type":"At"},"rev":1,"reason":{"type":"ValidationFail","value":{"details":"..."}}} + |{"id":"$resourceId","project":"${project.ref}","offset":{"value":42,"@type":"At"},"rev":1,"reason":{"type":"ValidationFail","value":{"details":"..."}}} + |""".stripMargin + + Get("/v1/jobs/schemas/validation/org/project/errors") ~> Accept(`*/*`) ~> asAlice ~> routes ~> check { + response.status shouldEqual StatusCodes.OK + response.asString shouldEqual expectedResponse + } + } + } +} diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala index d6368ed5c8..adfd924f33 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/routes/ArchiveRoutes.scala @@ -78,7 +78,7 @@ class ArchiveRoutes( private def emitArchiveFile(source: IO[AkkaSource]) = { val response = source.map { s => - FileResponse(s"archive.zip", Zip.contentType, 0L, s) + FileResponse(s"archive.zip", Zip.contentType, None, s) } emit(response.attemptNarrow[ArchiveRejection]) } diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala index a48e49a687..f5d4b09900 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala @@ -123,11 +123,21 @@ class ArchiveDownloadSpec val fetchFileContent: (Iri, ProjectRef) => IO[FileResponse] = { case (`id1`, `projectRef`) => IO.pure( - FileResponse(file1Name, ContentTypes.`text/plain(UTF-8)`, file1Size, Source.single(ByteString(file1Content))) + FileResponse( + file1Name, + ContentTypes.`text/plain(UTF-8)`, + Some(file1Size), + Source.single(ByteString(file1Content)) + ) ) case (`id2`, `projectRef`) => IO.pure( - FileResponse(file2Name, ContentTypes.`text/plain(UTF-8)`, file2Size, Source.single(ByteString(file2Content))) + FileResponse( + file2Name, + ContentTypes.`text/plain(UTF-8)`, + Some(file2Size), + Source.single(ByteString(file2Content)) + ) ) case (id, ref) => IO.raiseError(FileNotFound(id, ref)) diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala index b853bf0d58..8e59fa46cb 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveRoutesSpec.scala @@ -128,7 +128,9 @@ class ArchiveRoutesSpec extends BaseRouteSpec with StorageFixtures with ArchiveH case (_, _, `subjectNoFilePerms`) => IO.raiseError(AuthorizationFailed(AclAddress.Project(p), Permission.unsafe("disk/read"))) case (`fileId`, `projectRef`, _) => - IO.pure(FileResponse("file.txt", ContentTypes.`text/plain(UTF-8)`, 12L, Source.single(ByteString(fileContent)))) + IO.pure( + FileResponse("file.txt", ContentTypes.`text/plain(UTF-8)`, Some(12L), Source.single(ByteString(fileContent))) + ) case (id, ref, _) => IO.raiseError(FileNotFound(id, ref)) } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphViewValue.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphViewValue.scala index b200154dfb..44cd076ad8 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphViewValue.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphViewValue.scala @@ -101,7 +101,7 @@ object BlazegraphViewValue { * Creates a [[SelectFilter]] for this view */ def selectFilter: SelectFilter = - SelectFilter(resourceTypes, resourceTag.getOrElse(Latest)) + SelectFilter(None, resourceTypes, resourceTag.getOrElse(Latest)) } /** diff --git a/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json index f0921daf06..c4b3d5f714 100644 --- a/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json @@ -16,8 +16,8 @@ "_rev": 1, "reason": { "type": "UnexpectedError", - "message": "boom", - "details": { + "value": { + "message": "boom", "exception" : "java.lang.Exception" } } @@ -31,8 +31,8 @@ "_rev": 1, "reason": { "type": "UnexpectedError", - "message": "boom", - "details": { + "value": { + "message": "boom", "exception" : "java.lang.Exception" } } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeViewSource.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeViewSource.scala index 74f03aea79..62057619f7 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeViewSource.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeViewSource.scala @@ -58,7 +58,7 @@ sealed trait CompositeViewSource extends Product with Serializable { * the [[SelectFilter]] for the given view; used to filter the data that is indexed */ def selectFilter: SelectFilter = - SelectFilter(resourceTypes, resourceTag.getOrElse(Latest)) + SelectFilter(None, resourceTypes, resourceTag.getOrElse(Latest)) /** * @return diff --git a/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json index 3b3327233c..98eeb66b7c 100644 --- a/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json @@ -16,8 +16,8 @@ "_rev": 1, "reason": { "type": "UnexpectedError", - "message": "boom", - "details": { + "value": { + "message": "boom", "exception" : "java.lang.Exception" } } @@ -31,8 +31,8 @@ "_rev": 1, "reason": { "type": "UnexpectedError", - "message": "boom", - "details": { + "value": { + "message": "boom", "exception" : "java.lang.Exception" } } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala index dda5ea9760..4583bb1c81 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, FailureReason} import fs2.Chunk +import io.circe.syntax.KeyOps import io.circe.{Json, JsonObject} import shapeless.Typeable @@ -101,7 +102,7 @@ object ElasticSearchSink { items.get(documentId(element)) match { case None => element.failed(onMissingInResponse(element.id)) case Some(MixedOutcomes.Outcome.Success) => element.void - case Some(MixedOutcomes.Outcome.Error(json)) => element.failed(onIndexingFailure(element.id, json)) + case Some(MixedOutcomes.Outcome.Error(json)) => element.failed(onIndexingFailure(json)) } } } @@ -168,13 +169,9 @@ object ElasticSearchSink { private def onMissingInResponse(id: Iri) = FailureReason( "MissingInResponse", - s"$id was not found in Elasticsearch response", - JsonObject.empty + Json.obj("message" := s"$id was not found in Elasticsearch response") ) - private def onIndexingFailure(id: Iri, error: JsonObject) = FailureReason( - "IndexingFailure", - s"$id could not be indexed", - error - ) + private def onIndexingFailure(error: JsonObject) = + FailureReason("IndexingFailure", error) } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchViewValue.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchViewValue.scala index 1bb7231bea..a09e603748 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchViewValue.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchViewValue.scala @@ -110,7 +110,7 @@ object ElasticSearchViewValue { filterByTypeConfig.map(_.types).getOrElse(IriFilter.None) } .getOrElse(IriFilter.None) - SelectFilter(types, resourceTag.getOrElse(Latest)) + SelectFilter(None, types, resourceTag.getOrElse(Latest)) } /** diff --git a/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json index b58a981b79..50c2b44be2 100644 --- a/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json @@ -16,8 +16,8 @@ "_rev": 1, "reason": { "type": "UnexpectedError", - "message": "boom", - "details": { + "value": { + "message": "boom", "exception" : "java.lang.Exception" } } @@ -31,8 +31,8 @@ "_rev": 1, "reason": { "type": "UnexpectedError", - "message": "boom", - "details": { + "value": { + "message": "boom", "exception" : "java.lang.Exception" } } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala index edf0551cf0..c8ff0a4e18 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala @@ -111,7 +111,7 @@ class ElasticSearchSinkSuite extends NexusSuite with ElasticSearchClientSetup.Fi f.throwable match { case reason: FailureReason => assertEquals(reason.`type`, "IndexingFailure") - val detailKeys = reason.details.asObject.map(_.keys.toSet) + val detailKeys = reason.value.asObject.map(_.keys.toSet) assertEquals(detailKeys, Some(Set("type", "reason", "caused_by"))) case t => fail(s"An indexing failure was expected, got '$t'", t) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index f1dd21d212..85a5b99e17 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -465,7 +465,7 @@ final class Files( _ <- validateAuth(id.project, storage.value.storageValue.readPermission) s = fetchFile(storage.value, attributes, file.id) mediaType = attributes.mediaType.getOrElse(`application/octet-stream`) - } yield FileResponse(attributes.filename, mediaType, attributes.bytes, s.attemptNarrow[FileRejection]) + } yield FileResponse(attributes.filename, mediaType, Some(attributes.bytes), s.attemptNarrow[FileRejection]) }.span("fetchFileContent") private def fetchFile(storage: Storage, attr: FileAttributes, fileId: Iri): IO[AkkaSource] = diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala index 21e0d51e95..48037a9eea 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala @@ -33,12 +33,12 @@ object FileResponse { * @param bytes * the file size */ - final case class Metadata(filename: String, contentType: ContentType, bytes: Long) + final case class Metadata(filename: String, contentType: ContentType, bytes: Option[Long]) def apply[E: JsonLdEncoder: HttpResponseFields]( filename: String, contentType: ContentType, - bytes: Long, + bytes: Option[Long], io: IO[Either[E, AkkaSource]] ) = new FileResponse( @@ -50,6 +50,6 @@ object FileResponse { } ) - def apply(filename: String, contentType: ContentType, bytes: Long, source: AkkaSource): FileResponse = + def apply(filename: String, contentType: ContentType, bytes: Option[Long], source: AkkaSource): FileResponse = new FileResponse(Metadata(filename, contentType, bytes), IO.pure(Right(source))) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/Permissions.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/Permissions.scala index ea31d277de..2c49c151a2 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/Permissions.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/permissions/Permissions.scala @@ -201,6 +201,7 @@ object Permissions { object schemas { final val read: Permission = resources.read final val write: Permission = Permission.unsafe("schemas/write") + final val run: Permission = Permission.unsafe("schemas/run") } /** diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala index e0db77524f..4f5e5de6f9 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala @@ -18,7 +18,7 @@ object SchemaValidationCoordinator { private val logger = Logger[SchemaValidationCoordinator] - private[job] def projectionMetadata(project: ProjectRef): ProjectionMetadata = + def projectionMetadata(project: ProjectRef): ProjectionMetadata = ProjectionMetadata("schema", s"schema-$project-validate-resources", Some(project), None) def apply(supervisor: Supervisor, schemaValidationStream: SchemaValidationStream): SchemaValidationCoordinator = diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala index cfca1556cb..b619596ec4 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.schemas.job import cats.effect.IO import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas import ch.epfl.bluebrain.nexus.delta.sdk.resources.ValidateResource import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{ResourceRejection, ResourceState} @@ -10,6 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.FailureReason +import fs2.Stream /** * Streams the latest version of resources from a project and revalidate them with the latest version of the schema @@ -24,6 +26,8 @@ trait SchemaValidationStream { object SchemaValidationStream { + private val logger = Logger[SchemaValidationStream] + def apply( resourceStream: (ProjectRef, Offset) => SuccessElemStream[ResourceState], fetchSchema: FetchSchema, @@ -35,17 +39,25 @@ object SchemaValidationStream { jsonld <- IO.fromEither(resource.toAssembly) schema <- fetchSchema(Latest(resource.schema.iri), resource.schemaProject) _ <- validateResource(jsonld, schema).adaptErr { case r: ResourceRejection => - FailureReason("ValidateSchema", r.reason, r) + FailureReason("ValidateSchema", r) } } yield (Some(())) - override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = - resourceStream(project, offset).evalMap { - _.evalMapFilter { - case r if r.deprecated => IO.none - case r if r.schema.iri == schemas.resources => IO.none - case r => validateSingle(r) - } - } + private def log(message: String) = Stream.eval(logger.info(message)) + + override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = { + for { + _ <- log(s"Starting validation of resources for project '$project'") + stream <- resourceStream(project, offset).evalMap { + _.evalMapFilter { + case r if r.deprecated => IO.none + case r if r.schema.iri == schemas.resources => IO.none + case r => validateSingle(r) + } + } + _ <- log(s"Validation of resources for project '$project' has been completed.") + } yield stream + + } } } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLdSpec.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLdSpec.scala index 2b886f546f..8421941d83 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLdSpec.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLdSpec.scala @@ -54,7 +54,7 @@ class ResponseToJsonLdSpec extends RouteHelpers with JsonSyntax with RouteConcat FileResponse( "file.name", contentType, - 1024, + Some(1024L), contents ) ) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/utils/RouteHelpers.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/utils/RouteHelpers.scala index 0c76f097cd..c1c827851d 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/utils/RouteHelpers.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/utils/RouteHelpers.scala @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.utils import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart import akka.http.scaladsl.model.MediaTypes.`application/json` -import akka.http.scaladsl.model.{HttpEntity, HttpResponse, RequestEntity, StatusCodes} +import akka.http.scaladsl.model.{HttpEntity, HttpResponse, RequestEntity, StatusCode, StatusCodes} import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} import akka.stream.Materializer import akka.stream.scaladsl.Source @@ -85,9 +85,15 @@ final class HttpResponseOps(private val http: HttpResponse) extends Consumer { case Right(value) => value } - def shouldBeForbidden(implicit position: Position, materializer: Materializer): Assertion = { - http.status shouldEqual StatusCodes.Forbidden - asJsonObject(materializer)("@type") shouldEqual Some("AuthorizationFailed".asJson) + def shouldBeForbidden(implicit position: Position, materializer: Materializer): Assertion = + shouldFail(StatusCodes.Forbidden, "AuthorizationFailed") + + def shouldFail(statusCode: StatusCode, errorType: String)(implicit + position: Position, + materializer: Materializer + ): Assertion = { + http.status shouldEqual statusCode + asJsonObject(materializer)("@type") shouldEqual Some(errorType.asJson) } } diff --git a/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl index ef0e6bd6e0..e33bf6257b 100644 --- a/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl +++ b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl @@ -1,6 +1,7 @@ -- Stacktrace and message are now deprecated so they can be nullable +ALTER TABLE failed_elem_logs ALTER COLUMN message DROP NOT NULL; ALTER TABLE failed_elem_logs ALTER COLUMN stack_trace DROP NOT NULL; -- Adding a new column -ALTER TABLE failed_elem_logs ADD COLUMN IF NOT EXISTS details JSONB; +ALTER TABLE failed_elem_logs ADD COLUMN IF NOT EXISTS reason JSONB; diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala index f5e7146f1d..70d382e01c 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala @@ -39,7 +39,7 @@ object FailedElemLogRow { Option[ProjectRef], Int, String, - String, + Option[String], Option[String], Instant, Option[Json] @@ -86,9 +86,9 @@ object FailedElemLogRow { ) => val reason = details .map { d => - FailureReason(errorType, message, d) + FailureReason(errorType, d) } - .getOrElse(FailureReason(errorType, message, stackTrace)) + .getOrElse(FailureReason(errorType, message.getOrElse(""), stackTrace)) FailedElemLogRow( ordering, diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala index 9b21cfe809..496c2929d0 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala @@ -161,8 +161,7 @@ object FailedElemLogStore { | elem_project, | rev, | error_type, - | message, - | details, + | reason, | instant | ) | VALUES ( @@ -176,8 +175,7 @@ object FailedElemLogStore { | ${failure.project}, | ${failure.rev}, | ${failureReason.`type`}, - | ${failureReason.message}, - | ${failureReason.details}, + | ${failureReason.value}, | $instant | )""".stripMargin.update.run.void } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/SelectFilter.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/SelectFilter.scala index f9ce428832..6ac9a81645 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/SelectFilter.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/SelectFilter.scala @@ -1,6 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.query -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{IriFilter, Tag} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, IriFilter, Tag} /** * Contains the information that can be used for filtering when streaming states @@ -9,14 +9,20 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{IriFilter, Tag} * @param tag * tag that a view is indexing */ -case class SelectFilter(types: IriFilter, tag: Tag) +case class SelectFilter(entityType: Option[EntityType], types: IriFilter, tag: Tag) object SelectFilter { /** All types with specified tag */ - val tag: Tag => SelectFilter = SelectFilter(IriFilter.None, _) + val tag: Tag => SelectFilter = SelectFilter(None, IriFilter.None, _) /** All types with latest tag */ - val latest: SelectFilter = SelectFilter(IriFilter.None, Tag.Latest) + val latest: SelectFilter = SelectFilter(None, IriFilter.None, Tag.Latest) + + /** + * All of given entity with latest tag + */ + val latestOfEntity: EntityType => SelectFilter = + entityType => SelectFilter(Some(entityType), IriFilter.None, Tag.Latest) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala index c66c482a69..a4e608d397 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala @@ -271,6 +271,7 @@ object StreamingQuery { val typeFragment = selectFilter.types.asRestrictedTo.map(restriction => fr"value -> 'types' ??| ${typesSqlArray(restriction)}") Fragments.whereAndOpt( + selectFilter.entityType.map { entityType => fr"type = $entityType" }, Scope(projectRef).asFragment, offset.asFragment, selectFilter.tag.asFragment, diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala index 995ae76804..a9e02c1347 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala @@ -9,7 +9,7 @@ import io.circe.{Codec, Encoder, Json} import scala.util.control.NoStackTrace -case class FailureReason(`type`: String, message: String, details: Json) extends Exception with NoStackTrace +case class FailureReason(`type`: String, value: Json) extends Exception with NoStackTrace object FailureReason { @@ -23,18 +23,14 @@ object FailureReason { def apply(errorType: String, message: String, stackTrace: Option[String]): FailureReason = FailureReason( "UnexpectedError", - message, Json.obj( + "message" := message, "exception" := errorType, "stacktrace" := stackTrace ) ) - def apply[A: Encoder](tpe: String, message: String, value: A): FailureReason = - FailureReason( - tpe, - message, - value.asJson - ) + def apply[A: Encoder](tpe: String, value: A): FailureReason = + FailureReason(tpe, value.asJson) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala index 967b1686c0..399ce1b3a6 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtils.scala @@ -4,6 +4,8 @@ import cats.effect.{IO, Resource} import cats.effect.std.Hotswap import fs2.io.file.{FileHandle, Files, Flag, Flags, Path, WriteCursor} import fs2.{text, Pipe, Pull, Stream} +import io.circe.Encoder +import io.circe.syntax.EncoderOps object StreamingUtils { @@ -13,6 +15,9 @@ object StreamingUtils { private val newLine = Stream.emit(lineSeparator) + def ndjson[A: Encoder]: Pipe[IO, A, String] = + _.map(_.asJson.noSpaces).intersperse(lineSeparator).append(newLine) + def readLines(path: Path) = Files[IO].readUtf8Lines(path).filter(_.nonEmpty) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala index 5f5ace1b37..69b1e2594f 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala @@ -146,7 +146,7 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { test("Running a stream on latest states on project 1 from the beginning, filtering for types") { val allowedViewTypes = IriFilter.fromSet(Set(nxv + "Fix", nxv + "Feature")) - val (iri, void) = stream(project1, Offset.start, SelectFilter(allowedViewTypes, Tag.Latest)) + val (iri, void) = stream(project1, Offset.start, SelectFilter(None, allowedViewTypes, Tag.Latest)) val expected = List( SuccessElem(PullRequest.entityType, id3, Some(project1), Instant.EPOCH, Offset.at(7L), id3, rev), @@ -237,6 +237,16 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { ) } + test("Get the remaining elems for project 1 for the PullRequest type on latest from the beginning") { + StreamingQuery + .remaining(project1, SelectFilter.latestOfEntity(PullRequest.entityType), Offset.start, xas) + .assertEquals( + Some( + RemainingElems(4L, Instant.EPOCH) + ) + ) + } + test("Get the remaining elems for project 1 on latest from offset 6") { StreamingQuery .remaining(project1, SelectFilter.latest, Offset.at(6L), xas) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala index d069b79ef1..f145b024a6 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/utils/StreamingUtilsSuite.scala @@ -31,4 +31,9 @@ class StreamingUtilsSuite extends NexusSuite with TempDirectory.Fixture { } yield () } + test("Create a ndjson stream") { + val values = Stream.emits(List(json"""{"a" : 1}""", json"""{"b" : 2}""", json"""{"c" : 3}""")) + values.through(StreamingUtils.ndjson).assert("""{"a":1}""", "\n", """{"b":2}""", "\n", """{"c":3}""", "\n") + } + } diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/iam/types/AclListing.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/iam/types/AclListing.scala index eb482686dd..c1304d95fb 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/iam/types/AclListing.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/iam/types/AclListing.scala @@ -124,8 +124,10 @@ object Permission { object Schemas { val name = "schemas" val Write: Permission = Permission(name, "write") + val Run: Permission = Permission(name, "run") - val list: List[Permission] = Write :: Nil + val admin: List[Permission] = Write :: Nil + val list: List[Permission] = Write :: Run :: Nil } object Views { @@ -198,7 +200,7 @@ object Permission { Projects.list ++ Resolvers.list ++ Resources.list ++ - Schemas.list ++ + Schemas.admin ++ Views.list ++ Storages.list ++ Quotas.list).toSet diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/schemas/SchemaValidationJobSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/schemas/SchemaValidationJobSpec.scala new file mode 100644 index 0000000000..b577f8ae7f --- /dev/null +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/schemas/SchemaValidationJobSpec.scala @@ -0,0 +1,115 @@ +package ch.epfl.bluebrain.nexus.tests.schemas + +import akka.http.scaladsl.model.StatusCodes +import ch.epfl.bluebrain.nexus.tests.Identity.Anonymous +import ch.epfl.bluebrain.nexus.tests.Identity.resources.Rick +import ch.epfl.bluebrain.nexus.tests.Optics.filterNestedKeys +import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.Schemas +import ch.epfl.bluebrain.nexus.tests.resources.SimpleResource +import ch.epfl.bluebrain.nexus.tests.{BaseIntegrationSpec, SchemaPayload} +import io.circe.Json + +final class SchemaValidationJobSpec extends BaseIntegrationSpec { + + private val orgId = genId() + private val projId = genId() + private val project = s"$orgId/$projId" + + private val unconstrainedSchema = "_" + + private val testSchema = "test-schema" + + private val initiallyValidResource = genId() + + private def createResource(id: String, tpe: Option[String], schema: String) = { + for { + payload <- tpe match { + case None => SimpleResource.sourcePayload(id, 42) + case Some(tpe) => SimpleResource.sourcePayloadWithType(tpe, 42) + } + _ <- deltaClient.post[Json](s"/resources/$project/$schema/", payload, Rick) { + expectCreated + } + } yield () + } + + override def beforeAll(): Unit = { + super.beforeAll() + val setup = for { + _ <- createOrg(Rick, orgId) + _ <- createProjects(Rick, orgId, projId) + // Adding the permission to run the schema validation job + _ <- aclDsl.addPermission("/", Rick, Schemas.Run) + // Create schema + schemaPayload <- SchemaPayload.loadSimpleNoId() + _ <- deltaClient.put[Json](s"/schemas/$project/$testSchema", schemaPayload, Rick) { expectCreated } + // Creating a validated resource which will not resist the update of the schema because of its type + _ <- createResource(initiallyValidResource, None, testSchema) + // Creating unconstrained resource + unconstrainedResource = genId() + _ <- createResource(unconstrainedResource, None, unconstrainedSchema) + // Creating deprecated resource + deprecatedResource = genId() + _ <- createResource(deprecatedResource, None, testSchema) + _ <- deltaClient.delete(s"/resources/$project/_/$deprecatedResource?rev=1", Rick) { expectOk } + // Updating the schema + anotherType = "schema:AnotherType" + updatedSchemaPayload <- SchemaPayload.loadSimpleNoId(anotherType) + _ <- deltaClient.put[Json](s"/schemas/$project/$testSchema?rev=1", updatedSchemaPayload, Rick) { expectOk } + // Creating a validated resource with the new type which should succeed the job + anotherValidatedResource = genId() + _ <- createResource(anotherValidatedResource, Some(anotherType), testSchema) + } yield () + setup.accepted + } + + "Running the validation job" should { + + "not be allowed for an unauthorized user" in { + deltaClient.post[Json](s"/jobs/schemas/validation/$project/", Json.Null, Anonymous) { + expectForbidden + } + } + + "be run for an authorized user" in { + deltaClient.post[Json](s"/jobs/schemas/validation/$project/", Json.Null, Rick) { (_, response) => + response.status shouldEqual StatusCodes.Accepted + } + } + + "return the expected stats" in eventually { + val expected = + json""" + { + "@context": "https://bluebrain.github.io/nexus/contexts/statistics.json", + "delayInSeconds": 0, + "discardedEvents": 2, + "evaluatedEvents": 1, + "failedEvents": 1, + "processedEvents": 4, + "remainingEvents": 0, + "totalEvents": 4 + }""" + deltaClient.get[Json](s"/jobs/schemas/validation/$project/statistics", Rick) { (json, response) => + response.status shouldEqual StatusCodes.OK + filterNestedKeys("lastEventDateTime", "lastProcessedEventDateTime")(json) shouldEqual expected + } + } + + "return the schema validation job errors" in { + deltaClient.get[Json](s"/jobs/schemas/validation/$project/errors", Rick) { (json, response) => + response.status shouldEqual StatusCodes.OK + val expected = + json""" + { + "id" : "http://delta:8080/v1/resources/$project/_/$initiallyValidResource", + "project" : "$project", + "rev" : 1 + }""" + filterNestedKeys("offset", "reason")(json) shouldEqual expected + } + } + + } + +}