Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema validation job endpoints #5155

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ app {
"views/query",
"views/write",
"schemas/write",
"schemas/run",
"files/write",
"storages/write",
"version/read",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,24 @@ class ElemRoutes(
concat(
(get & pathPrefix("continuous")) {
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
emit(
sseElemStream.continuous(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset)
)
}
},
(get & pathPrefix("currents")) {
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
emit(sseElemStream.currents(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset))
}
},
(get & pathPrefix("remaining")) {
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, SelectFilter(types, tag.getOrElse(Latest)), offset).map {
r => r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
sseElemStream
.remaining(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset)
.map { r =>
r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.{ContentTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import akka.util.ByteString
import cats.effect.IO
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, FileResponse}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils

/**
  * Routes allowing to trigger a schema validation job on a project and to inspect its progress and failures
  */
class SchemaJobRoutes(
    identities: Identities,
    aclCheck: AclCheck,
    fetchContext: FetchContext,
    schemaValidationCoordinator: SchemaValidationCoordinator,
    projections: Projections,
    projectionsErrors: ProjectionErrors
)(implicit
    baseUri: BaseUri,
    cr: RemoteContextResolution,
    ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck) {

  // Name of the validation projection associated to the given project
  private def validationProjection(project: ProjectRef) =
    SchemaValidationCoordinator.projectionMetadata(project).name

  // Fails when the project is unknown, discarding the fetched context otherwise
  private def checkProject(project: ProjectRef) = fetchContext.onRead(project).void

  // Streams the validation failures recorded for the project as a newline-delimited
  // json payload wrapped in a file response
  private def validationErrorsFile(project: ProjectRef): IO[FileResponse] = {
    val byteStream = IO.delay {
      StreamConverter(
        projectionsErrors
          .failedElemEntries(validationProjection(project), Offset.start)
          .map(_.failedElemData)
          .through(StreamingUtils.ndjson)
          .map(ByteString(_))
      )
    }
    byteStream.map(FileResponse("validation.json", ContentTypes.`application/json`, None, _))
  }

  // POST /v1/jobs/schemas/validation/{org}/{project} : fires the validation job and returns 202
  private def triggerValidation(project: ProjectRef): Route =
    (post & pathEndOrSingleSlash) {
      emit(
        StatusCodes.Accepted,
        checkProject(project) >> schemaValidationCoordinator.run(project).start.void
      )
    }

  // GET .../statistics : progress of the validation projection restricted to resources
  private def validationStatistics(project: ProjectRef): Route =
    (pathPrefix("statistics") & get & pathEndOrSingleSlash) {
      emit(
        checkProject(project) >> projections
          .statistics(
            project,
            SelectFilter.latestOfEntity(Resources.entityType),
            validationProjection(project)
          )
      )
    }

  // GET .../errors : downloads the accumulated validation failures
  private def validationErrors(project: ProjectRef): Route =
    (pathPrefix("errors") & get & pathEndOrSingleSlash) {
      emit(
        checkProject(project) >> validationErrorsFile(project).attemptNarrow[Nothing]
      )
    }

  def routes: Route =
    baseUriPrefix(baseUri.prefix) {
      pathPrefix("jobs") {
        extractCaller { implicit caller =>
          pathPrefix("schemas") {
            (pathPrefix("validation") & projectRef) { project =>
              // All job endpoints require the `schemas/run` permission on the project
              authorizeFor(project, Permissions.schemas.run).apply {
                concat(
                  triggerValidation(project),
                  validationStatistics(project),
                  validationErrors(project)
                )
              }
            }
          }
        }
      }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.SchemasRoutes
import ch.epfl.bluebrain.nexus.delta.routes.{SchemaJobRoutes, SchemasRoutes}
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
Expand All @@ -27,6 +27,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas._
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.{SchemaValidationCoordinator, SchemaValidationStream}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaEvent}
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import izumi.distage.model.definition.{Id, ModuleDef}
Expand Down Expand Up @@ -120,6 +121,32 @@ object SchemasModule extends ModuleDef {
)
}

// Wires the schema validation job routes from the object graph.
// The RemoteContextResolution tagged with @Id("aggregate") is resolved here so that
// responses are rendered with the aggregated context resolution.
make[SchemaJobRoutes].from {
  (
      identities: Identities,
      aclCheck: AclCheck,
      fetchContext: FetchContext,
      schemaValidationCoordinator: SchemaValidationCoordinator,
      projections: Projections,
      projectionsErrors: ProjectionErrors,
      baseUri: BaseUri,
      cr: RemoteContextResolution @Id("aggregate"),
      ordering: JsonKeyOrdering
  ) =>
    new SchemaJobRoutes(
      identities,
      aclCheck,
      fetchContext,
      schemaValidationCoordinator,
      projections,
      projectionsErrors
    )(
      baseUri,
      cr,
      ordering
    )
}

many[SseEncoder[_]].add { base: BaseUri => SchemaEvent.sseEncoder(base) }

many[ScopedEventMetricEncoder[_]].add { SchemaEvent.schemaEventMetricEncoder }
Expand All @@ -144,6 +171,10 @@ object SchemasModule extends ModuleDef {
PriorityRoute(pluginsMaxPriority + 8, route.routes, requiresStrictEntity = true)
}

// Registers the schema job routes in the route set, with strict-entity handling
// and the same priority offset used for the other schema routes in this module
many[PriorityRoute].add { (route: SchemaJobRoutes) =>
  PriorityRoute(pluginsMaxPriority + 8, route.routes, requiresStrictEntity = true)
}

make[Schema.Shift].from { (schemas: Schemas, base: BaseUri) =>
Schema.shift(schemas)(base)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.MediaRanges.`*/*`
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.{Accept, OAuth2BearerToken}
import akka.http.scaladsl.server.Route
import cats.effect.{IO, Ref}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Root
import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{FailureReason, ProjectionProgress}

import java.time.Instant

/**
  * Tests for the schema validation job endpoints: triggering a validation run, reading its
  * statistics and downloading its errors. NOTE(review): the tests are order-dependent — the
  * `runTrigger` assertions rely on earlier tests not having triggered the job yet.
  */
class SchemaJobRoutesSpec extends BaseRouteSpec {

  private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm)))

  private val asAlice = addCredentials(OAuth2BearerToken("alice"))

  private val project    = ProjectGen.project("org", "project")
  private val rev        = 1
  private val resourceId = nxv + "myid"

  // Only `project` is known; any other project ref yields a not-found rejection
  private val fetchContext = FetchContextDummy(List(project))

  private val identities = IdentitiesDummy(caller)

  // Alice holds `schemas/run` at the root, so she is authorized on every project
  private val aclCheck = AclSimpleCheck((alice, Root, Set(Permissions.schemas.run))).accepted

  private lazy val projections      = Projections(xas, queryConfig, clock)
  private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock)

  // Progress seeded in beforeAll; drives the expected statistics payload below
  private val progress = ProjectionProgress(Offset.at(15L), Instant.EPOCH, 9000L, 400L, 30L)

  // Flipped to true when the coordinator is asked to run, letting tests observe the trigger
  private val runTrigger = Ref.unsafe[IO, Boolean](false)

  private val schemaValidationCoordinator = new SchemaValidationCoordinator {
    override def run(project: ProjectRef): IO[Unit] = runTrigger.set(true).void
  }

  private lazy val routes = Route.seal(
    new SchemaJobRoutes(
      identities,
      aclCheck,
      fetchContext,
      schemaValidationCoordinator,
      projections,
      projectionErrors
    ).routes
  )

  override def beforeAll(): Unit = {
    super.beforeAll()
    val projectionMetadata = SchemaValidationCoordinator.projectionMetadata(project.ref)

    // Two failed elems sharing the same reason/offset; expected twice in the errors download
    val reason = FailureReason("ValidationFail", json"""{ "details": "..." }""")
    val fail1  = FailedElem(EntityType("ACL"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev)
    val fail2  =
      FailedElem(EntityType("Schema"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev)

    (
      projections.save(projectionMetadata, progress) >>
        projectionErrors.saveFailedElems(projectionMetadata, List(fail1, fail2))
    ).accepted
  }

  "The schema validation job route" should {
    "fail to start a validation job without permission" in {
      Post("/v1/jobs/schemas/validation/org/project") ~> routes ~> check {
        response.shouldBeForbidden
        // the coordinator must not have been triggered
        runTrigger.get.accepted shouldEqual false
      }
    }

    "fail to start a validation job for an unknown project" in {
      Post("/v1/jobs/schemas/validation/xxx/xxx") ~> asAlice ~> routes ~> check {
        response.shouldFail(StatusCodes.NotFound, "ProjectNotFound")
        runTrigger.get.accepted shouldEqual false
      }
    }

    "start a validation job on the project with appropriate access" in {
      Post("/v1/jobs/schemas/validation/org/project") ~> asAlice ~> routes ~> check {
        response.status shouldEqual StatusCodes.Accepted
        runTrigger.get.accepted shouldEqual true
      }
    }

    "fail to get statistics on a validation job without permission" in {
      Get("/v1/jobs/schemas/validation/org/project/statistics") ~> routes ~> check {
        response.shouldBeForbidden
      }
    }

    "fail to get statistics on a validation job for an unknown project" in {
      Get("/v1/jobs/schemas/validation/xxx/xxx/statistics") ~> asAlice ~> routes ~> check {
        response.shouldFail(StatusCodes.NotFound, "ProjectNotFound")
      }
    }

    "get statistics on a validation job with appropriate access" in {
      // Values derive from `progress`: processed 9000, discarded 400, failed 30,
      // evaluated = processed - discarded - failed = 8570
      val expectedResponse =
        json"""
        {
          "@context": "https://bluebrain.github.io/nexus/contexts/statistics.json",
          "delayInSeconds" : 0,
          "discardedEvents": 400,
          "evaluatedEvents": 8570,
          "failedEvents": 30,
          "lastEventDateTime": "${Instant.EPOCH}",
          "lastProcessedEventDateTime": "${Instant.EPOCH}",
          "processedEvents": 9000,
          "remainingEvents": 0,
          "totalEvents": 9000
        }"""

      Get("/v1/jobs/schemas/validation/org/project/statistics") ~> asAlice ~> routes ~> check {
        response.status shouldEqual StatusCodes.OK
        response.asJson shouldEqual expectedResponse
      }
    }

    "fail to download errors on a validation job without permission" in {
      Get("/v1/jobs/schemas/validation/org/project/errors") ~> routes ~> check {
        response.shouldBeForbidden
      }
    }

    "fail to download errors on a validation job for an unknown project" in {
      Get("/v1/jobs/schemas/validation/xxx/xxx/errors") ~> asAlice ~> routes ~> check {
        response.shouldFail(StatusCodes.NotFound, "ProjectNotFound")
      }
    }

    "download errors on a validation job with appropriate access" in {
      // One ndjson line per failed elem saved in beforeAll
      val expectedResponse =
        s"""{"id":"$resourceId","project":"${project.ref}","offset":{"value":42,"@type":"At"},"rev":1,"reason":{"type":"ValidationFail","value":{"details":"..."}}}
           |{"id":"$resourceId","project":"${project.ref}","offset":{"value":42,"@type":"At"},"rev":1,"reason":{"type":"ValidationFail","value":{"details":"..."}}}
           |""".stripMargin

      Get("/v1/jobs/schemas/validation/org/project/errors") ~> Accept(`*/*`) ~> asAlice ~> routes ~> check {
        response.status shouldEqual StatusCodes.OK
        response.asString shouldEqual expectedResponse
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ArchiveRoutes(

private def emitArchiveFile(source: IO[AkkaSource]) = {
val response = source.map { s =>
FileResponse(s"archive.zip", Zip.contentType, 0L, s)
FileResponse(s"archive.zip", Zip.contentType, None, s)
}
emit(response.attemptNarrow[ArchiveRejection])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,21 @@ class ArchiveDownloadSpec
val fetchFileContent: (Iri, ProjectRef) => IO[FileResponse] = {
case (`id1`, `projectRef`) =>
IO.pure(
FileResponse(file1Name, ContentTypes.`text/plain(UTF-8)`, file1Size, Source.single(ByteString(file1Content)))
FileResponse(
file1Name,
ContentTypes.`text/plain(UTF-8)`,
Some(file1Size),
Source.single(ByteString(file1Content))
)
)
case (`id2`, `projectRef`) =>
IO.pure(
FileResponse(file2Name, ContentTypes.`text/plain(UTF-8)`, file2Size, Source.single(ByteString(file2Content)))
FileResponse(
file2Name,
ContentTypes.`text/plain(UTF-8)`,
Some(file2Size),
Source.single(ByteString(file2Content))
)
)
case (id, ref) =>
IO.raiseError(FileNotFound(id, ref))
Expand Down
Loading