Skip to content

Commit

Permalink
fix: Fix deadlock when streaming out the shacl validation report (#3372)
Browse files Browse the repository at this point in the history
  • Loading branch information
seakayone authored Sep 30, 2024
1 parent e9f592e commit 88b278e
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.apache.pekko.stream.scaladsl.StreamConverters
import org.apache.pekko.util.ByteString
import zio.*

import java.io.ByteArrayInputStream
import java.io.FileInputStream
import java.io.OutputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
Expand All @@ -23,22 +23,23 @@ import org.knora.webapi.slice.shacl.domain.ValidationOptions
final case class ShaclApiService(validator: ShaclValidator) {

def validate(formData: ValidationFormData): Task[Source[ByteString, Any]] = {
val dataStream = ByteArrayInputStream(formData.`data.ttl`.getBytes)
val shaclStream = ByteArrayInputStream(formData.`shacl.ttl`.getBytes)
val options = ValidationOptions(
formData.validateShapes.getOrElse(ValidationOptions.default.validateShapes),
formData.reportDetails.getOrElse(ValidationOptions.default.reportDetails),
formData.addBlankNodes.getOrElse(ValidationOptions.default.addBlankNodes),
)
for {
report <- validator.validate(dataStream, shaclStream, options)
src <- ZIO.attemptBlockingIO {
val (out, src) = makeOutputStreamAndSource()
val (out, src) = makeOutputStreamAndSource()
ZIO.scoped {
for {
dataStream <- ZIO.fromAutoCloseable(ZIO.succeed(new FileInputStream(formData.`data.ttl`)))
shaclStream <- ZIO.fromAutoCloseable(ZIO.succeed(new FileInputStream(formData.`shacl.ttl`)))
report <- validator.validate(dataStream, shaclStream, options)
_ <- ZIO.attemptBlockingIO {
try { RDFDataMgr.write(out, report.getModel, RDFFormat.TURTLE) }
finally { out.close() }
src
}
} yield src
}.forkDaemon
} yield ()
}.as(src)
}

private def makeOutputStreamAndSource(): (OutputStream, Source[ByteString, _]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ import sttp.tapir.Schema.annotations.description
import sttp.tapir.generic.auto.*
import zio.ZLayer

import java.io.File

import dsp.errors.RequestRejectedException
import org.knora.webapi.slice.common.api.BaseEndpoints

case class ValidationFormData(
@description("The data to be validated.")
`data.ttl`: String,
`data.ttl`: File,
@description("The shapes for validation.")
`shacl.ttl`: String,
`shacl.ttl`: File,
@description(s"Should shapes also be validated.")
validateShapes: Option[Boolean],
@description("Add `sh:details` to the validation report.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.topbraid.shacl.validation.ValidationEngineConfiguration
import org.topbraid.shacl.validation.ValidationUtil
import zio.*

import java.io.ByteArrayInputStream
import java.io.InputStream

final case class ValidationOptions(validateShapes: Boolean, reportDetails: Boolean, addBlankNodes: Boolean)
Expand All @@ -24,6 +25,9 @@ object ValidationOptions {

final case class ShaclValidator() { self =>

def validate(data: String, shapes: String, opts: ValidationOptions): Task[Resource] =
validate(new ByteArrayInputStream(data.getBytes), new ByteArrayInputStream(shapes.getBytes), opts)

def validate(data: InputStream, shapes: InputStream, opts: ValidationOptions): Task[Resource] = for {
dataModel <- readModel(data)
shapesModel <- readModel(shapes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright © 2021 - 2024 Swiss National Data and Service Center for the Humanities and/or DaSCH Service Platform contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.knora.webapi.slice.shacl.api

import org.apache.pekko.actor.*
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import zio.*
import zio.Scope
import zio.nio.file.Files
import zio.test.*

import scala.concurrent.Await
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future

import org.knora.webapi.slice.shacl.domain.ShaclValidator

object ShaclApiServiceSpec extends ZIOSpecDefault {

private val shaclApiService = ZIO.serviceWithZIO[ShaclApiService]

val spec: Spec[TestEnvironment & Scope, Any] = suite("ShaclApiService")(
test("validate") {
for {
data <- Files.createTempFile("data.ttl", None, Seq.empty)
shacl <- Files.createTempFile("shacl.ttl", None, Seq.empty)
formData = ValidationFormData(data.toFile, shacl.toFile, None, None, None)
result <- shaclApiService(_.validate(formData))
} yield assertTrue(reportConforms(result))
},
).provide(ShaclApiService.layer, ShaclValidator.layer)

private def reportConforms(result: Source[ByteString, Any]): Boolean = {
implicit val system: ActorSystem = ActorSystem.create()
implicit val ec: ExecutionContextExecutor = system.dispatcher
implicit val mat: Materializer = Materializer(system)

val str: Future[String] = result
.runWith(Sink.fold(ByteString.empty)(_ ++ _)) // Accumulate ByteString
.map(_.utf8String)
val reportStr = Await.result(str, 5.seconds.asScala)
reportStr.contains("sh:conforms true")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,47 @@ import zio.test.Spec
import zio.test.ZIOSpecDefault
import zio.test.assertTrue

import java.io.ByteArrayInputStream

object ShaclValidatorSpec extends ZIOSpecDefault {

val shaclValidator = ZIO.serviceWithZIO[ShaclValidator]

private val shapes =
new ByteArrayInputStream("""
|@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
|@prefix schema: <http://schema.org/> .
|@prefix sh: <http://www.w3.org/ns/shacl#> .
|@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
|
|schema:PersonShape
| a sh:NodeShape ;
| sh:targetClass schema:Person ;
| sh:property [
| sh:path schema:givenName ;
| sh:datatype xsd:string ;
| sh:name "given name" ;
| ] .
|""".stripMargin.getBytes)
"""
|@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
|@prefix schema: <http://schema.org/> .
|@prefix sh: <http://www.w3.org/ns/shacl#> .
|@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
|
|schema:PersonShape
| a sh:NodeShape ;
| sh:targetClass schema:Person ;
| sh:property [
| sh:path schema:givenName ;
| sh:datatype xsd:string ;
| ] .
|""".stripMargin

private val invalidData =
new ByteArrayInputStream("""
|@prefix ex: <http://example.org/ns#> .
|@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
|@prefix schema: <http://schema.org/> .
|
|ex:Bob
| a schema:Person ;
| schema:givenName 0 .
|""".stripMargin.getBytes)
"""
|@prefix ex: <http://example.org/ns#> .
|@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
|@prefix schema: <http://schema.org/> .
|
|ex:Bob
| a schema:Person ;
| schema:givenName 0 .
|""".stripMargin

private val validData =
new ByteArrayInputStream("""
|@prefix ex: <http://example.org/ns#> .
|@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
|@prefix schema: <http://schema.org/> .
|
|ex:Bob
| a schema:Person ;
| schema:givenName "valid name" .
|""".stripMargin.getBytes)
"""
|@prefix ex: <http://example.org/ns#> .
|@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
|@prefix schema: <http://schema.org/> .
|
|ex:Bob
| a schema:Person ;
| schema:givenName "valid name" .
|""".stripMargin

def spec: Spec[Any, Throwable] =
suite("ShaclValidator")(
Expand Down

0 comments on commit 88b278e

Please sign in to comment.