diff --git a/delta/plugins/composite-views/src/main/resources/composite-views.conf b/delta/plugins/composite-views/src/main/resources/composite-views.conf
index 1a7e40a794..c691e14bd3 100644
--- a/delta/plugins/composite-views/src/main/resources/composite-views.conf
+++ b/delta/plugins/composite-views/src/main/resources/composite-views.conf
@@ -45,4 +45,7 @@ plugins.composite-views {
# set to false to disable composite view indexing
indexing-enabled = ${app.defaults.indexing.enable}
+
+ # type of composite sink to use for composite view indexing
+ sink-config = single
}
\ No newline at end of file
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala
index d4229a9acb..2175b4b02b 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala
@@ -1,7 +1,23 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews
import cats.implicits._
-import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.QueryGraph
+import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
+import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, GraphResourceToNTriples}
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{BatchQueryGraph, SingleQueryGraph}
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
+import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
+import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
+import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument}
+import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
+import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
+import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
+import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
+import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
@@ -12,6 +28,12 @@ import shapeless.Typeable
import scala.concurrent.duration.FiniteDuration
+/**
+ * A composite sink handles querying the common blazegraph namespace, transforming the result into a format that can be
+ * pushed into a target namespace or index, and finally sinks it into the target.
+ */
+trait CompositeSink extends Sink
+
/**
* A sink that queries N-Triples in Blazegraph, transforms them, and pushes the result to the provided sink
* @param queryGraph
@@ -19,21 +41,21 @@ import scala.concurrent.duration.FiniteDuration
* @param transform
* function to transform a graph into the format needed by the sink
* @param sink
- * function that allows
+ * function that defines how to sink a chunk of Elem[SinkFormat]
* @param chunkSize
- * the maximum number of elements to be pushed in ES at once
+ * the maximum number of elements to be pushed into the sink
* @param maxWindow
- * the maximum number of elements to be pushed at once
+ * the maximum time to wait for the chunk to gather [[chunkSize]] elements
* @tparam SinkFormat
* the type of data accepted by the sink
*/
-final class CompositeSink[SinkFormat](
- queryGraph: QueryGraph,
+final class Single[SinkFormat](
+ queryGraph: SingleQueryGraph,
transform: GraphResource => Task[Option[SinkFormat]],
sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
override val chunkSize: Int,
override val maxWindow: FiniteDuration
-) extends Sink {
+) extends CompositeSink {
override type In = GraphResource
override def inType: Typeable[GraphResource] = Typeable[GraphResource]
@@ -54,3 +76,166 @@ final class CompositeSink[SinkFormat](
.flatMap(sink)
}
+
+/**
+ * A sink that queries N-Triples in Blazegraph for multiple resources, transforms it for each resource, and pushes the
+ * result to the provided sink
+ * @param queryGraph
+ * how to query the blazegraph
+ * @param transform
+ * function to transform a graph into the format needed by the sink
+ * @param sink
+ * function that defines how to sink a chunk of Elem[SinkFormat]
+ * @param chunkSize
+ * the maximum number of elements to be pushed into the sink
+ * @param maxWindow
+ * the maximum time to wait for the chunk to gather [[chunkSize]] elements
+ * @tparam SinkFormat
+ * the type of data accepted by the sink
+ */
+final class Batch[SinkFormat](
+ queryGraph: BatchQueryGraph,
+ transform: GraphResource => Task[Option[SinkFormat]],
+ sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
+ override val chunkSize: Int,
+ override val maxWindow: FiniteDuration
+)(implicit rcr: RemoteContextResolution)
+ extends CompositeSink {
+
+ override type In = GraphResource
+
+ override def inType: Typeable[GraphResource] = Typeable[GraphResource]
+
+ /** Performs the sparql query only using [[SuccessElem]]s from the chunk */
+ private def query(elements: Chunk[Elem[GraphResource]]): Task[Option[Graph]] =
+ elements.mapFilter(elem => elem.map(_.id).toOption) match {
+ case ids if ids.nonEmpty => queryGraph(ids)
+ case _ => Task.none
+ }
+
+ /** Replaces the graph of a provided [[GraphResource]] by extracting its new graph from the provided (full) graph. */
+ private def replaceGraph(gr: GraphResource, fullGraph: Graph) = {
+ implicit val api: JsonLdApi = JsonLdJavaApi.lenient
+ fullGraph
+ .replaceRootNode(iri"${gr.id}/alias")
+ .toCompactedJsonLd(ContextValue.empty)
+ .flatMap(_.toGraph)
+ .map(g => gr.copy(graph = g.replaceRootNode(gr.id)))
+ }
+
+ override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] =
+ for {
+ graph <- query(elements)
+ transformed <- graph match {
+ case Some(fullGraph) =>
+ elements.traverse { elem =>
+ elem.evalMapFilter { gr =>
+ replaceGraph(gr, fullGraph).flatMap(transform)
+ }
+ }
+ case None =>
+ Task.pure(elements.map(_.drop))
+ }
+ sank <- sink(transformed)
+ } yield sank
+}
+
+object CompositeSink {
+
+ /**
+ * @param blazeClient
+ * client used to connect to blazegraph
+ * @param namespace
+ * name of the target blazegraph namespace
+ * @param common
+ * name of the common blazegraph namespace
+ * @param cfg
+ * configuration of the composite views
+ * @return
+ * a function that given a sparql view returns a composite sink that has the view as target
+ */
+ def blazeSink(
+ blazeClient: BlazegraphClient,
+ namespace: String,
+ common: String,
+ cfg: CompositeViewsConfig
+ )(implicit baseUri: BaseUri, rcr: RemoteContextResolution): SparqlProjection => CompositeSink = { target =>
+ compositeSink(
+ blazeClient,
+ common,
+ target.query,
+ GraphResourceToNTriples.graphToNTriples,
+ BlazegraphSink(blazeClient, cfg.blazegraphBatch, namespace).apply,
+ cfg.blazegraphBatch,
+ cfg.sinkConfig
+ )
+ }
+
+ /**
+ * @param blazeClient
+ * blazegraph client used to query the common space
+ * @param esClient
+ * client used to push to elasticsearch
+ * @param index
+ * name of the target elasticsearch index
+ * @param common
+ * name of the common blazegraph namespace
+ * @param cfg
+ * configuration of the composite views
+ * @return
+ * a function that given a elasticsearch view returns a composite sink that has the view as target
+ */
+ def elasticSink(
+ blazeClient: BlazegraphClient,
+ esClient: ElasticSearchClient,
+ index: IndexLabel,
+ common: String,
+ cfg: CompositeViewsConfig
+ )(implicit rcr: RemoteContextResolution): ElasticSearchProjection => CompositeSink = { target =>
+ val esSink =
+ ElasticSearchSink.states(
+ esClient,
+ cfg.elasticsearchBatch.maxElements,
+ cfg.elasticsearchBatch.maxInterval,
+ index,
+ Refresh.False
+ )
+ compositeSink(
+ blazeClient,
+ common,
+ target.query,
+ new GraphResourceToDocument(target.context, target.includeContext).graphToDocument,
+ esSink.apply,
+ cfg.elasticsearchBatch,
+ cfg.sinkConfig
+ )
+ }
+
+ private def compositeSink[SinkFormat](
+ blazeClient: BlazegraphClient,
+ common: String,
+ query: SparqlConstructQuery,
+ transform: GraphResource => Task[Option[SinkFormat]],
+ sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
+ batchConfig: BatchConfig,
+ sinkConfig: SinkConfig
+ )(implicit rcr: RemoteContextResolution): CompositeSink = sinkConfig match {
+ case SinkConfig.Single =>
+ new Single(
+ new SingleQueryGraph(blazeClient, common, query),
+ transform,
+ sink,
+ batchConfig.maxElements,
+ batchConfig.maxInterval
+ )
+ case SinkConfig.Batch =>
+ new Batch(
+ new BatchQueryGraph(blazeClient, common, query),
+ transform,
+ sink,
+ batchConfig.maxElements,
+ batchConfig.maxInterval
+ )
+ }
+
+}
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala
index 1a1d76aaf4..6dfcf2f718 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala
@@ -164,7 +164,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate")
) =>
- CompositeSpaces.Builder(cfg.prefix, esClient, cfg.elasticsearchBatch, blazeClient, cfg.blazegraphBatch)(
+ CompositeSpaces.Builder(cfg.prefix, esClient, blazeClient, cfg)(
baseUri,
cr
)
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala
index 5310fc9918..5e89ea192a 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala
@@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config
import akka.http.scaladsl.model.Uri
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig.Credentials
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SourcesConfig}
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientConfig
import ch.epfl.bluebrain.nexus.delta.sdk.instances._
@@ -9,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, EventLogConfig}
import com.typesafe.config.Config
import monix.bio.UIO
+import pureconfig.error.CannotConvert
import pureconfig.generic.auto._
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}
@@ -42,6 +44,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
* the interval at which a view will look for requested restarts
* @param indexingEnabled
* if false, disables composite view indexing
+ * @param sinkConfig
+ * type of sink used for composite indexing
*/
final case class CompositeViewsConfig(
sources: SourcesConfig,
@@ -55,7 +59,8 @@ final case class CompositeViewsConfig(
blazegraphBatch: BatchConfig,
elasticsearchBatch: BatchConfig,
restartCheckInterval: FiniteDuration,
- indexingEnabled: Boolean
+ indexingEnabled: Boolean,
+ sinkConfig: SinkConfig
)
object CompositeViewsConfig {
@@ -106,6 +111,26 @@ object CompositeViewsConfig {
maxTimeWindow: FiniteDuration
)
+ object SinkConfig {
+
+ /** Represents the choice of composite sink */
+ sealed trait SinkConfig
+
+ /** A sink that only supports querying one resource at once from blazegraph */
+ case object Single extends SinkConfig
+
+ /** A sink that supports querying multiple resources at once from blazegraph */
+ case object Batch extends SinkConfig
+
+ implicit val sinkConfigReaderString: ConfigReader[SinkConfig] =
+ ConfigReader.fromString {
+ case "batch" => Right(Batch)
+ case "single" => Right(Single)
+ case value =>
+ Left(CannotConvert(value, SinkConfig.getClass.getSimpleName, s"$value is not one of: [single, batch]"))
+ }
+ }
+
/**
* Converts a [[Config]] into an [[CompositeViewsConfig]]
*/
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala
new file mode 100644
index 0000000000..30f253af29
--- /dev/null
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala
@@ -0,0 +1,47 @@
+package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing
+
+import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
+import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.idTemplating
+import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
+import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples}
+import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
+import fs2.Chunk
+import monix.bio.Task
+
+import java.util.regex.Pattern.quote
+
+/**
+ * Provides a way to query for the multiple incoming resources (from a chunk). This assumes that the query contains the
+ * template: `VALUE ?id { {resource_id} }`. The result is a single Graph for all given resources.
+ * @param client
+ * the blazegraph client used to query
+ * @param namespace
+ * the namespace to query
+ * @param query
+ * the sparql query to perform
+ */
+final class BatchQueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) {
+
+ private val logger: Logger = Logger[BatchQueryGraph]
+
+ private def newGraph(ntriples: NTriples): Task[Option[Graph]] =
+ if (ntriples.isEmpty) Task.none
+ else Task.fromEither(Graph(ntriples)).map(Some(_))
+
+ def apply(ids: Chunk[Iri]): Task[Option[Graph]] =
+ for {
+ ntriples <- client.query(Set(namespace), replaceIds(query, ids), SparqlNTriples)
+ graphResult <- newGraph(ntriples.value)
+ _ <- Task.when(graphResult.isEmpty)(
+ logger.debug(s"Querying blazegraph did not return any triples, '$ids' will be dropped.")
+ )
+ } yield graphResult
+
+ private def replaceIds(query: SparqlConstructQuery, iris: Chunk[Iri]): SparqlConstructQuery = {
+ val replacement = iris.foldLeft("") { (acc, iri) => acc + " " + s"<$iri>" }
+ SparqlConstructQuery.unsafe(query.value.replaceAll(quote(idTemplating), replacement))
+ }
+
+}
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala
index b4399ea2be..d073a86f4d 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala
@@ -2,18 +2,16 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
-import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, GraphResourceToNTriples}
+import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphSink
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeSink
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
-import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
-import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
-import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import com.typesafe.scalalogging.Logger
import monix.bio.Task
@@ -54,41 +52,21 @@ object CompositeSpaces {
def apply(
prefix: String,
esClient: ElasticSearchClient,
- esBatchConfig: BatchConfig,
blazeClient: BlazegraphClient,
- blazeBatchConfig: BatchConfig
+ cfg: CompositeViewsConfig
)(implicit base: BaseUri, rcr: RemoteContextResolution): CompositeSpaces.Builder = (view: ActiveViewDef) => {
// Operations and sinks for the common space
val common = commonNamespace(view.uuid, view.rev, prefix)
- val commonSink = BlazegraphSink(blazeClient, blazeBatchConfig, common)
+ val commonSink = BlazegraphSink(blazeClient, cfg.blazegraphBatch, common)
val createCommon = blazeClient.createNamespace(common)
val deleteCommon = blazeClient.deleteNamespace(common)
- // Create the Blazegraph sink
- def createBlazeSink(namespace: String): SparqlProjection => Sink = { target =>
- val blazeSink = BlazegraphSink(blazeClient, blazeBatchConfig, namespace)
- new CompositeSink(
- QueryGraph(blazeClient, common, target.query),
- GraphResourceToNTriples.graphToNTriples,
- blazeSink.apply,
- blazeBatchConfig.maxElements,
- blazeBatchConfig.maxInterval
- )
- }
-
- // Create the Elasticsearch index
- def createEsSink(index: IndexLabel): ElasticSearchProjection => Sink = { target =>
- val esSink =
- ElasticSearchSink.states(esClient, esBatchConfig.maxElements, esBatchConfig.maxInterval, index, Refresh.False)
- new CompositeSink(
- QueryGraph(blazeClient, common, target.query),
- new GraphResourceToDocument(target.context, target.includeContext).graphToDocument,
- esSink.apply,
- esBatchConfig.maxElements,
- esBatchConfig.maxInterval
- )
- }
+ // Create sinks
+ def createBlazeSink(namespace: String): SparqlProjection => Sink =
+ CompositeSink.blazeSink(blazeClient, namespace, common, cfg)
+ def createEsSink(index: IndexLabel): ElasticSearchProjection => Sink =
+ CompositeSink.elasticSink(blazeClient, esClient, index, common, cfg)
// Compute the init and destroy operations as well as the sink for the different projections of the composite views
val start: (Task[Unit], Task[Unit], Map[Iri, Sink]) = (createCommon.void, deleteCommon.void, Map.empty[Iri, Sink])
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/QueryGraph.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala
similarity index 85%
rename from delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/QueryGraph.scala
rename to delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala
index 6ccc375134..65a4b3fda3 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/QueryGraph.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala
@@ -13,17 +13,18 @@ import monix.bio.Task
import java.util.regex.Pattern.quote
/**
- * Pipe that performs the provided query for the incoming resource and replaces the graph with the result of query
+ * Provides a way to query for the incoming resource and replaces the graph with the result of query
+ *
* @param client
- * the blazegraph client
+ * the blazegraph client used to query
* @param namespace
* the namespace to query
* @param query
* the query to perform on each resource
*/
-final case class QueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) {
+final class SingleQueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) {
- private val logger: Logger = Logger[QueryGraph]
+ private val logger: Logger = Logger[SingleQueryGraph]
private def newGraph(ntriples: NTriples, id: Iri): Task[Option[Graph]] =
if (ntriples.isEmpty) {
diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala
index e5f872f7cf..1efa8ab7f8 100644
--- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala
+++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala
@@ -5,7 +5,7 @@ import cats.data.NonEmptySet
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
-import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SourcesConfig}
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SinkConfig, SourcesConfig}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.Interval
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjectionFields.{ElasticSearchProjectionFields, SparqlProjectionFields}
@@ -178,7 +178,8 @@ trait CompositeViewsFixture extends ConfigFixtures with EitherValuable {
batchConfig,
batchConfig,
3.seconds,
- false
+ false,
+ SinkConfig.Batch
)
}
diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala
index 721e14bc7f..a6fc5c3a75 100644
--- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala
+++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala
@@ -2,16 +2,20 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Query
+import cats.Semigroup
import cats.data.{NonEmptyList, NonEmptySet}
import cats.effect.Resource
import cats.effect.concurrent.Ref
-import cats.kernel.Semigroup
import cats.syntax.all._
+import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples
-import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeIndexingSuite.{batchConfig, Album, Band, Music}
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeIndexingSuite._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.Queries.{batchQuery, singleQuery}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.{CrossProjectSource, ProjectSource, RemoteProjectSource}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{permissions, CompositeView, CompositeViewSource, CompositeViewValue}
@@ -19,20 +23,20 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.Composit
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeRestartStore
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch.Run.{Main, Rebuild}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.{CompositeBranch, CompositeGraphStream, CompositeProgress}
-import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.{CompositeViews, Fixtures}
+import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.{CompositeViews, CompositeViewsFixture, Fixtures}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel, QueryBuilder}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{nxv, rdf, rdfs, schemas}
import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
-import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
+import ch.epfl.bluebrain.nexus.delta.rdf.syntax.{iriStringContextSyntax, jsonOpsSyntax}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
-import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{Sort, SortList}
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, QueryConfig}
@@ -40,15 +44,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DiscardMetadata, FilterByType, FilterDeprecated}
+import ch.epfl.bluebrain.nexus.testkit.TestHelpers
import ch.epfl.bluebrain.nexus.testkit.bio.ResourceFixture.TaskFixture
-import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, JsonAssertions, PatienceConfig, ResourceFixture, TextAssertions}
-import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie
-import ch.epfl.bluebrain.nexus.testkit.{IOFixedClock, TestHelpers}
+import ch.epfl.bluebrain.nexus.testkit.bio._
import fs2.Stream
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
@@ -56,27 +60,68 @@ import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._
import io.circe.{Encoder, Json}
import monix.bio.{Task, UIO}
-import monix.execution.Scheduler
import munit.AnyFixture
import java.time.Instant
import java.util.UUID
-import scala.concurrent.duration._
+import scala.concurrent.duration.DurationInt
+
+class SingleCompositeIndexingSuite extends CompositeIndexingSuite(SinkConfig.Single, singleQuery)
+class BatchCompositeIndexingSuite extends CompositeIndexingSuite(SinkConfig.Batch, batchQuery)
+
+trait CompositeIndexingFixture extends BioSuite {
+
+ implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))
+ implicit private val rcr: RemoteContextResolution = RemoteContextResolution.never
+
+ private val queryConfig = QueryConfig(10, RefreshStrategy.Delay(10.millis))
+ val batchConfig = BatchConfig(2, 50.millis)
+ private val compositeConfig =
+ CompositeViewsFixture.config.copy(
+ blazegraphBatch = batchConfig,
+ elasticsearchBatch = batchConfig
+ )
+
+ type Result = (ElasticSearchClient, BlazegraphClient, CompositeProjections, CompositeSpaces.Builder)
+
+ private def resource(sinkConfig: SinkConfig): Resource[Task, Result] = {
+ (Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()).parMapN {
+ case (xas, esClient, bgClient) =>
+ val compositeRestartStore = new CompositeRestartStore(xas)
+ val projections =
+ CompositeProjections(compositeRestartStore, xas, queryConfig, batchConfig, 3.seconds)
+ val spacesBuilder =
+ CompositeSpaces.Builder("delta", esClient, bgClient, compositeConfig.copy(sinkConfig = sinkConfig))(
+ baseUri,
+ rcr
+ )
+ (esClient, bgClient, projections, spacesBuilder)
+ }
+ }
+
+ def suiteLocalFixture(name: String, sinkConfig: SinkConfig): TaskFixture[Result] =
+ ResourceFixture.suiteLocal(name, resource(sinkConfig))
+
+ def compositeIndexing(sinkConfig: SinkConfig): ResourceFixture.TaskFixture[Result] =
+ suiteLocalFixture("compositeIndexing", sinkConfig)
+
+}
-class CompositeIndexingSuite
- extends BioSuite
- with CompositeIndexingSuite.Fixture
+abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConstructQuery)
+ extends CompositeIndexingFixture
with TestHelpers
with Fixtures
with JsonAssertions
with TextAssertions {
- override def munitFixtures: Seq[AnyFixture[_]] = List(compositeIndexing)
+ private val fixture = compositeIndexing(sinkConfig)
+
+ override def munitFixtures: Seq[AnyFixture[_]] = List(fixture)
implicit private val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, 100.millis)
private val prefix = "delta"
- private lazy val (esClient, bgClient, projections, spacesBuilder) = compositeIndexing()
+ private lazy val (esClient, bgClient, projections, spacesBuilder) = fixture()
// Data to index
private val museId = iri"http://music.com/muse"
@@ -95,27 +140,6 @@ class CompositeIndexingSuite
private val theGatewayId = iri"http://music.com/the_getaway"
private val theGateway = Album(theGatewayId, "The Getaway", redHotId)
- private val query = SparqlConstructQuery.unsafe(
- """
- |prefix music:
- |CONSTRUCT {
- | {resource_id} music:name ?bandName ;
- | music:genre ?bandGenre ;
- | music:start ?bandStartYear ;
- | music:album ?albumId .
- | ?albumId music:title ?albumTitle .
- |} WHERE {
- | {resource_id} music:name ?bandName ;
- | music:start ?bandStartYear;
- | music:genre ?bandGenre .
- | OPTIONAL {
- | {resource_id} ^music:by ?albumId .
- | ?albumId music:title ?albumTitle .
- | }
- |}
- |""".stripMargin
- )
-
private val project1 = ProjectRef.unsafe("org", "proj")
private val project2 = ProjectRef.unsafe("org", "proj2")
private val project3 = ProjectRef.unsafe("org", "proj3")
@@ -180,7 +204,8 @@ class CompositeIndexingSuite
private val mainCompleted = Ref.unsafe[Task, Map[ProjectRef, Int]](Map.empty)
private val rebuildCompleted = Ref.unsafe[Task, Map[ProjectRef, Int]](Map.empty)
- private def resetCompleted = mainCompleted.set(Map.empty) >> rebuildCompleted.set(Map.empty)
+ private def resetCompleted = mainCompleted.set(Map.empty) >> rebuildCompleted.set(Map.empty)
+
private def increment(map: Ref[Task, Map[ProjectRef, Int]], project: ProjectRef) =
map.update(_.updatedWith(project)(_.map(_ + 1).orElse(Some(1))))
@@ -529,41 +554,17 @@ class CompositeIndexingSuite
} yield ()
}
- private val checkQuery = SparqlConstructQuery.unsafe("CONSTRUCT {?s ?p ?o} WHERE {?s ?p ?o} ORDER BY ?s")
+ private val checkQuery = SparqlConstructQuery.unsafe("CONSTRUCT {?s ?p ?o} WHERE {?s ?p ?o} ORDER BY ?s")
+
private def checkBlazegraphTriples(namespace: String, expected: String) =
bgClient
.query(Set(namespace), checkQuery, SparqlNTriples)
.map(_.value.toString)
.map(_.equalLinesUnordered(expected))
-}
-
-object CompositeIndexingSuite extends IOFixedClock {
-
- implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))
- implicit private val rcr: RemoteContextResolution = RemoteContextResolution.never
-
- private val queryConfig: QueryConfig = QueryConfig(10, RefreshStrategy.Delay(10.millis))
- val batchConfig: BatchConfig = BatchConfig(2, 50.millis)
- type Result = (ElasticSearchClient, BlazegraphClient, CompositeProjections, CompositeSpaces.Builder)
-
- private def resource()(implicit s: Scheduler, cl: ClassLoader): Resource[Task, Result] = {
- (Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()).parMapN {
- case (xas, esClient, bgClient) =>
- val compositeRestartStore = new CompositeRestartStore(xas)
- val projections =
- CompositeProjections(compositeRestartStore, xas, queryConfig, batchConfig, 3.seconds)
- val spacesBuilder = CompositeSpaces.Builder("delta", esClient, batchConfig, bgClient, batchConfig)(baseUri, rcr)
- (esClient, bgClient, projections, spacesBuilder)
- }
- }
-
- def suiteLocalFixture(name: String)(implicit s: Scheduler, cl: ClassLoader): TaskFixture[Result] =
- ResourceFixture.suiteLocal(name, resource())
+}
- trait Fixture { self: BioSuite =>
- val compositeIndexing: ResourceFixture.TaskFixture[Result] = suiteLocalFixture("compositeIndexing")
- }
+object CompositeIndexingSuite {
private val ctxIri = ContextValue(iri"http://music.com/context")
@@ -576,7 +577,9 @@ object CompositeIndexingSuite extends IOFixedClock {
sealed trait Music extends Product with Serializable {
def id: Iri
+
def tpe: Iri
+
def label: String
}
@@ -584,16 +587,19 @@ object CompositeIndexingSuite extends IOFixedClock {
override val tpe: Iri = iri"http://music.com/Band"
override val label: String = name
}
+
object Band {
implicit val bandEncoder: Encoder.AsObject[Band] =
deriveConfiguredEncoder[Band].mapJsonObject(_.add("@type", "Band".asJson))
implicit val bandJsonLdEncoder: JsonLdEncoder[Band] =
JsonLdEncoder.computeFromCirce((b: Band) => b.id, ctxIri)
}
- final case class Album(id: Iri, title: String, by: Iri) extends Music {
+
+ final case class Album(id: Iri, title: String, by: Iri) extends Music {
override val tpe: Iri = iri"http://music.com/Album"
override val label: String = title
}
+
object Album {
implicit val albumEncoder: Encoder.AsObject[Album] =
deriveConfiguredEncoder[Album].mapJsonObject(_.add("@type", "Album".asJson))
@@ -602,6 +608,7 @@ object CompositeIndexingSuite extends IOFixedClock {
}
final case class Metadata(uuid: UUID)
+
object Metadata {
implicit private val encoderMetadata: Encoder.AsObject[Metadata] = deriveEncoder
implicit val jsonLdEncoderMetadata: JsonLdEncoder[Metadata] = JsonLdEncoder.computeFromCirce(ctxIri)
@@ -609,3 +616,52 @@ object CompositeIndexingSuite extends IOFixedClock {
}
}
+
+object Queries {
+ val batchQuery: SparqlConstructQuery = SparqlConstructQuery.unsafe(
+ """
+ |prefix music:
+ |CONSTRUCT {
+ | ?alias music:name ?bandName ;
+ | music:genre ?bandGenre ;
+ | music:start ?bandStartYear ;
+ | music:album ?albumId .
+ | ?albumId music:title ?albumTitle .
+ |} WHERE {
+ | VALUES ?id { {resource_id} } .
+ | BIND( IRI(concat(str(?id), '/', 'alias')) AS ?alias ) .
+ |
+ | ?id music:name ?bandName ;
+ | music:start ?bandStartYear;
+ | music:genre ?bandGenre .
+ | OPTIONAL {
+ | ?id ^music:by ?albumId .
+ | ?albumId music:title ?albumTitle .
+ | }
+ |}
+ |""".stripMargin
+ )
+
+ val singleQuery: SparqlConstructQuery = SparqlConstructQuery.unsafe(
+ """
+ |prefix music:
+ |CONSTRUCT {
+ | ?id music:name ?bandName ;
+ | music:genre ?bandGenre ;
+ | music:start ?bandStartYear ;
+ | music:album ?albumId .
+ | ?albumId music:title ?albumTitle .
+ |} WHERE {
+ | BIND( {resource_id} AS ?id ) .
+ |
+ | ?id music:name ?bandName ;
+ | music:start ?bandStartYear;
+ | music:genre ?bandGenre .
+ | OPTIONAL {
+ | ?id ^music:by ?albumId .
+ | ?albumId music:title ?albumTitle .
+ | }
+ |}
+ |""".stripMargin
+ )
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala
index 56060ec34e..63d05219e1 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala
@@ -78,6 +78,13 @@ sealed trait Elem[+A] extends Product with Serializable {
*/
def dropped: DroppedElem = DroppedElem(tpe, id, project, instant, offset, rev)
+ /** Action of dropping an Elem */
+ def drop: Elem[Nothing] = this match {
+ case e: SuccessElem[A] => e.dropped
+ case e: FailedElem => e
+ case e: DroppedElem => e
+ }
+
/**
* Maps the underlying element value if this is a [[Elem.SuccessElem]] using f.
* @param f
diff --git a/tests/docker/config/construct-query.sparql b/tests/docker/config/construct-query.sparql
index 77ba1b1b34..f7069580e9 100644
--- a/tests/docker/config/construct-query.sparql
+++ b/tests/docker/config/construct-query.sparql
@@ -10,7 +10,7 @@ prefix :
CONSTRUCT {
# Metadata
- ?id a ?type ;
+ ?alias a ?type ;
:name ?name ;
:description ?description ;
:createdAt ?createdAt ;
@@ -20,68 +20,68 @@ CONSTRUCT {
:deprecated ?deprecated ;
:self ?self .
- ?id :project ?projectBN .
- ?projectBN :identifier ?projectId ;
+ ?alias :project ?projectId .
+ ?projectId :identifier ?projectId ;
:label ?projectLabel .
# Common properties
## Annotations
- ?id :mType ?mType .
+ ?alias :mType ?mType .
?mType :identifier ?mType ;
:label ?mTypeLabel ;
:idLabel ?mTypeIdLabel .
- ?id :eType ?eType .
+ ?alias :eType ?eType .
?eType :identifier ?eType ;
:label ?eTypeLabel ;
:idLabel ?eTypeIdLabel .
- ?id :sType ?sType .
+ ?alias :sType ?sType .
?sType :identifier ?sType ;
:label ?sTypeLabel ;
:idLabel ?sTypeIdLabel .
## Atlas
- ?id :coordinatesInBrainAtlas ?coordinates .
+ ?alias :coordinatesInBrainAtlas ?coordinates .
?coordinates :valueX ?valueX ;
:valueY ?valueY ;
:valueZ ?valueZ .
## Brain region / layer
- ?id :brainRegion ?brainRegionId .
- ?brainRegionId :identifier ?brainRegionId ;
- :label ?brainRegionLabel ;
- :idLabel ?brainRegionIdLabel .
- ?id :layer ?layerId .
+ ?alias :brainRegion ?brainRegionId .
+ ?brainRegionId :identifier ?brainRegionId ;
+ :label ?brainRegionLabel ;
+ :idLabel ?brainRegionIdLabel .
+ ?alias :layer ?layerId .
?layerId :identifier ?layerId ;
:label ?layerLabel ;
:idLabel ?layerIdLabel .
## Contributors / Organizations
- ?id :contributors ?personId .
+ ?alias :contributors ?personId .
?personId :identifier ?personId ;
:label ?personName ;
:idLabel ?personIdName ;
:affiliation ?affiliation .
?affiliation :label ?affiliationName .
- ?id :organizations ?orgId .
+ ?alias :organizations ?orgId .
?orgId :identifier ?orgId ;
:label ?organizationName ;
:idLabel ?organizationIdName .
## Derivation
- ?id :derivation ?derivation .
+ ?alias :derivation ?derivation .
?derivation :identifier ?entity ;
rdf:type ?entityType ;
:label ?entityName .
## Distribution
- ?id :distribution ?distribution .
+ ?alias :distribution ?distribution .
?distribution :label ?distributionName ;
:encodingFormat ?distributionEncodingFormat ;
:contentUrl ?distributionContentUrl ;
:contentSize ?distributionContentSize .
## Generation
- ?id :generation ?generation .
+ ?alias :generation ?generation .
?generation :protocol ?protocol ;
:startedAt ?generationStartedAtTime ;
:endedAt ?generationEndedAtTime .
@@ -90,25 +90,25 @@ CONSTRUCT {
:value ?protocolValue .
## Images
- ?id :image ?image .
+ ?alias :image ?image .
?image :identifier ?image ;
:about ?imageAbout ;
:repetition ?imageRepetition ;
:stimulusType ?imageStimulusTypeLabel .
## License
- ?id :license ?licenseBN .
- ?licenseBN :identifier ?licenseId ;
+ ?alias :license ?licenseId .
+ ?licenseId :identifier ?licenseId ;
:label ?licenseLabel .
## Series
- ?id :series ?series .
+ ?alias :series ?series .
?series :value ?seriesValue ;
:unit ?seriesUnit ;
:statistic ?seriesStatistic .
## Source
- ?id :source ?source .
+ ?alias :source ?source .
?source :title ?sourceTitle ;
rdf:type ?sourceType ;
:identifier ?sourceIdentifier .
@@ -116,79 +116,80 @@ CONSTRUCT {
:value ?sourceIdentifierValue .
## Species
- ?id :subjectSpecies ?species .
+ ?alias :subjectSpecies ?species .
?species :identifier ?species ;
:label ?speciesLabel .
## Start / end / status
- ?id :startedAt ?startedAt ;
+ ?alias :startedAt ?startedAt ;
:endedAt ?endedAt ;
:status ?status .
## Subject
- ?id :subjectAge ?age .
- ?age :value ?subjectAgeValue ;
- :minValue ?subjectAgeMinValue ;
- :maxValue ?subjectAgeMaxValue ;
- :unit ?subjectAgeUnit ;
- :period ?subjectAgePeriod ;
- :label ?subjectAgeLabel .
- ?id :subjectWeight ?subjectWeightBN .
- ?subjectWeightBN :value ?subjectWeightValue ;
- :unit ?subjectWeightUnit ;
- :minValue ?subjectWeightMinValue ;
- :maxValue ?subjectWeightMaxValue ;
- :label ?subjectWeightLabel .
+ ?alias :subjectAge ?age .
+ ?age :value ?subjectAgeValue ;
+ :minValue ?subjectAgeMinValue ;
+ :maxValue ?subjectAgeMaxValue ;
+ :unit ?subjectAgeUnit ;
+ :period ?subjectAgePeriod ;
+ :label ?subjectAgeLabel .
+ ?alias :subjectWeight ?weight .
+ ?weight :value ?subjectWeightValue ;
+ :unit ?subjectWeightUnit ;
+ :minValue ?subjectWeightMinValue ;
+ :maxValue ?subjectWeightMaxValue ;
+ :label ?subjectWeightLabel .
# Properties specific to types
## Bouton density
- ?id :boutonDensity ?boutonDensityBN .
- ?boutonDensityBN :value ?boutonDensityValue ;
+ ?alias :boutonDensity ?boutonDensityBN .
+ ?boutonDensityBN :value ?boutonDensityValue ;
:unit ?boutonDensityUnit ;
:label ?boutonDensityLabel .
## Circuit
- ?id :circuitType ?circuitType ;
+ ?alias :circuitType ?circuitType ;
:circuitBase ?circuitBaseUrlStr ;
:circuitConfigPath ?circuitConfigPathUrlStr ;
- :circuitBrainRegion ?circuitBrainRegionBN .
- ?circuitBrainRegionBN :identifier ?circuitBrainRegionId ;
+ :circuitBrainRegion ?circuitBrainRegionId .
+ ?circuitBrainRegionId :identifier ?circuitBrainRegionId ;
:label ?circuitBrainRegionLabel .
## Layer thickness
- ?id :layerThickness ?thicknessBN .
- ?thicknessBN :value ?thicknessValue ;
+ ?alias :layerThickness ?thicknessBN .
+ ?thicknessBN :value ?thicknessValue ;
:unit ?thicknessUnit ;
:nValue ?thicknessNValue ;
:label ?thicknessLabel .
## Neuron density
- ?id :neuronDensity ?neuronDensityBN .
- ?neuronDensityBN :value ?neuronDensityValue ;
+ ?alias :neuronDensity ?neuronDensityBN .
+ ?neuronDensityBN :value ?neuronDensityValue ;
:unit ?neuronDensityUnit ;
:nValue ?neuronDensityNValue ;
:label ?neuronDensityLabel .
## Simulation campaigns
- ?id :config ?campaignConfigId .
+ ?alias :config ?campaignConfigId .
?campaignConfigId :identifier ?campaignConfigId ;
:name ?campaignConfigName .
## Simulation campaigns / simulations
- ?id :parameter ?parameter .
+ ?alias :parameter ?parameter .
?parameter :attrs ?attrs .
?attrs ?attrs_prop ?attrs_value .
- ?id :parameter ?parameter .
+ ?alias :parameter ?parameter .
?parameter :coords ?coords .
?coords ?coords_prop ?coords_value .
## Simulations
- ?id :campaign ?campaignId .
+ ?alias :campaign ?campaignId .
?campaignId :identifier ?campaignId ;
:name ?campaignName .
} WHERE {
- BIND({resource_id} as ?id) .
+ VALUES ?id { {resource_id} } .
+ BIND( IRI(concat(str(?id), '/', 'alias')) AS ?alias ) .
?id a ?type .
@@ -211,7 +212,6 @@ CONSTRUCT {
nxv:self ?self ;
nxv:project ?projectId .
BIND( STRAFTER(STR(?projectId), "/v1/projects/") as ?projectLabel ) .
- BIND( BNODE() AS ?projectBN ) .
# We read the following nodes as text. This is done in order to avoid conflict
# when another triple uses the same @id. For instance, createdBy and updatedBy
@@ -401,10 +401,7 @@ CONSTRUCT {
# License
OPTIONAL {
?id schema:license ?licenseId .
- OPTIONAL {
- ?licenseId schema:name ?licenseLabel .
- } .
- BIND( BNODE() AS ?licenseBN ) .
+ OPTIONAL { ?licenseId schema:name ?licenseLabel . } .
} .
# Series
@@ -469,7 +466,6 @@ CONSTRUCT {
CONCAT(STR(?subjectWeightMinValue), " to ", STR(?subjectWeightMaxValue), " ", STR(?subjectWeightUnit))
) as ?subjectWeightLabel
) .
- BIND( BNODE() as ?subjectWeightBN ) .
} .
# Bouton density
@@ -485,7 +481,7 @@ CONSTRUCT {
STR(?boutonDensityUnit)
) as ?boutonDensityLabel
) .
- BIND( BNODE() as ?boutonDensityBN ) .
+ BIND( BNODE(CONCAT(STR(?id), '/boutonDensity')) as ?boutonDensityBN ) .
} .
# Circuit
@@ -524,12 +520,12 @@ CONSTRUCT {
OPTIONAL {
?id a nsg:LayerThickness ;
nsg:series ?meanSeries .
- ?meanSeries nsg:statistic "mean" ;
- schema:value ?thicknessValue ;
- schema:unitCode ?thicknessUnit .
- ?id nsg:series ?nSeries .
- ?nSeries nsg:statistic "N" ;
- schema:value ?thicknessNValue .
+ ?meanSeries nsg:statistic "mean" ;
+ schema:value ?thicknessValue ;
+ schema:unitCode ?thicknessUnit .
+ ?id nsg:series ?nSeries .
+ ?nSeries nsg:statistic "N" ;
+ schema:value ?thicknessNValue .
BIND(
CONCAT(
STR(?thicknessValue), " ",
@@ -537,7 +533,8 @@ CONSTRUCT {
STR(?thicknessNValue), ")"
) as ?thicknessLabel
) .
- BIND( BNODE() as ?thicknessBN ) .
+
+ BIND( BNODE(CONCAT(STR(?id), '/layerThickness')) as ?thicknessBN ) .
} .
# Neuron density
@@ -558,7 +555,8 @@ CONSTRUCT {
STR(?neuronDensityNValue), ")"
) as ?neuronDensityLabel
) .
- BIND( BNODE() as ?neuronDensityBN ) .
+
+ BIND( BNODE((CONCAT(STR(?id), '/neuronDensity'))) as ?neuronDensityBN ) .
} .
# Simulation campaign configuration
@@ -585,7 +583,6 @@ CONSTRUCT {
OPTIONAL {
?circuitBrainRegionId rdfs:label ?circuitBrainRegionLabel .
}
- BIND( BNODE() AS ?circuitBrainRegionBN ) .
} .
} .
} .
diff --git a/tests/docker/config/delta-postgres.conf b/tests/docker/config/delta-postgres.conf
index b27dded274..e5fe26cecd 100644
--- a/tests/docker/config/delta-postgres.conf
+++ b/tests/docker/config/delta-postgres.conf
@@ -55,6 +55,7 @@ plugins {
composite-views {
min-interval-rebuild = 5 seconds
+ sink-config = batch
}
elasticsearch {
diff --git a/tests/src/test/resources/kg/views/composite/composite-view-include-context.json b/tests/src/test/resources/kg/views/composite/composite-view-include-context.json
index 6afe1aeafe..6181f5546f 100644
--- a/tests/src/test/resources/kg/views/composite/composite-view-include-context.json
+++ b/tests/src/test/resources/kg/views/composite/composite-view-include-context.json
@@ -80,7 +80,7 @@
},
"dynamic": false
},
- "query": "prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?bandName ; music:genre ?bandGenre ; music:album ?albumId . ?albumId music:released ?albumReleaseDate ; music:song ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } WHERE { {resource_id} music:name ?bandName ; music:genre ?bandGenre . OPTIONAL { {resource_id} ^music:by ?albumId . ?albumId music:released ?albumReleaseDate . OPTIONAL {?albumId ^music:on ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } } } ORDER BY(?songNumber)",
+ "query": "{{bandQuery}}",
"context": {
"@base": "https://music.example.com/",
"@vocab": "https://music.example.com/"
@@ -121,7 +121,7 @@
},
"dynamic": false
},
- "query": "prefix xsd: prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?albumTitle ; music:length ?albumLength ; music:numberOfSongs ?numberOfSongs } WHERE {SELECT ?albumReleaseDate ?albumTitle (sum(xsd:integer(?songLength)) as ?albumLength) (count(?albumReleaseDate) as ?numberOfSongs) WHERE {OPTIONAL { {resource_id} ^music:on / music:length ?songLength } {resource_id} music:released ?albumReleaseDate ; music:title ?albumTitle . } GROUP BY ?albumReleaseDate ?albumTitle }",
+ "query": "{{albumQuery}}",
"context": {
"@base": "https://music.example.com/",
"@vocab": "https://music.example.com/"
diff --git a/tests/src/test/resources/kg/views/composite/composite-view.json b/tests/src/test/resources/kg/views/composite/composite-view.json
index d9da4e0983..58a1e0b9b8 100644
--- a/tests/src/test/resources/kg/views/composite/composite-view.json
+++ b/tests/src/test/resources/kg/views/composite/composite-view.json
@@ -80,7 +80,7 @@
},
"dynamic": false
},
- "query": "prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?bandName ; music:genre ?bandGenre ; music:album ?albumId . ?albumId music:released ?albumReleaseDate ; music:song ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } WHERE { {resource_id} music:name ?bandName ; music:genre ?bandGenre . OPTIONAL { {resource_id} ^music:by ?albumId . ?albumId music:released ?albumReleaseDate . OPTIONAL {?albumId ^music:on ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } } } ORDER BY(?songNumber)",
+ "query": "{{bandQuery}}",
"context": {
"@base": "https://music.example.com/",
"@vocab": "https://music.example.com/"
@@ -120,7 +120,7 @@
},
"dynamic": false
},
- "query": "prefix xsd: prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?albumTitle ; music:length ?albumLength ; music:numberOfSongs ?numberOfSongs } WHERE {SELECT ?albumReleaseDate ?albumTitle (sum(xsd:integer(?songLength)) as ?albumLength) (count(?albumReleaseDate) as ?numberOfSongs) WHERE {OPTIONAL { {resource_id} ^music:on / music:length ?songLength } {resource_id} music:released ?albumReleaseDate ; music:title ?albumTitle . } GROUP BY ?albumReleaseDate ?albumTitle }",
+ "query": "{{albumQuery}}",
"context": {
"@base": "https://music.example.com/",
"@vocab": "https://music.example.com/"
diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala
index b14a4bfbcc..32bd61d04c 100644
--- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala
+++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala
@@ -7,11 +7,13 @@ import ch.epfl.bluebrain.nexus.tests.HttpClient._
import ch.epfl.bluebrain.nexus.tests.Identity.compositeviews.Jerry
import ch.epfl.bluebrain.nexus.tests.Optics._
import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.{Events, Organizations, Views}
+import ch.epfl.bluebrain.nexus.tests.kg.CompositeViewsSpec.{albumQuery, bandQuery}
import com.typesafe.scalalogging.Logger
import io.circe.Json
import io.circe.optics.JsonPath._
import monix.bio.Task
import monix.execution.Scheduler.Implicits.global
+
import scala.concurrent.duration._
class CompositeViewsSpec extends BaseSpec {
@@ -136,7 +138,9 @@ class CompositeViewsSpec extends BaseSpec {
"org" -> orgId,
"org2" -> orgId,
"remoteEndpoint" -> "http://delta:8080/v1",
- "token" -> jerryToken
+ "token" -> jerryToken,
+ "bandQuery" -> bandQuery,
+ "albumQuery" -> albumQuery
): _*
)
@@ -166,7 +170,9 @@ class CompositeViewsSpec extends BaseSpec {
"org" -> orgId,
"org2" -> orgId,
"remoteEndpoint" -> "http://delta:8080/v1/other",
- "token" -> jerryToken
+ "token" -> jerryToken,
+ "bandQuery" -> bandQuery,
+ "albumQuery" -> albumQuery
): _*
)
@@ -183,7 +189,9 @@ class CompositeViewsSpec extends BaseSpec {
"org" -> orgId,
"org2" -> orgId,
"remoteEndpoint" -> "http://delta:8080/v1",
- "token" -> s"${jerryToken}wrong"
+ "token" -> s"${jerryToken}wrong",
+ "bandQuery" -> bandQuery,
+ "albumQuery" -> albumQuery
): _*
)
@@ -207,7 +215,9 @@ class CompositeViewsSpec extends BaseSpec {
"org" -> orgId,
"org2" -> orgId,
"remoteEndpoint" -> "http://fail.does.not.exist.at.all.asndkajbskhabsdfjhabsdfjkh/v1",
- "token" -> jerryToken
+ "token" -> jerryToken,
+ "bandQuery" -> bandQuery,
+ "albumQuery" -> albumQuery
): _*
)
@@ -312,7 +322,9 @@ class CompositeViewsSpec extends BaseSpec {
"org" -> orgId,
"org2" -> orgId,
"remoteEndpoint" -> "http://delta:8080/v1",
- "token" -> jerryToken
+ "token" -> jerryToken,
+ "bandQuery" -> bandQuery,
+ "albumQuery" -> albumQuery
): _*
)
@@ -390,4 +402,75 @@ class CompositeViewsSpec extends BaseSpec {
}
}
}
+
+}
+
+object CompositeViewsSpec {
+
+ private val bandQuery =
+ raw"""
+ |PREFIX nxv:
+ |PREFIX music:
+ |
+ |CONSTRUCT
+ | {
+ | ?alias music:name ?bandName ;
+ | music:genre ?bandGenre ;
+ | music:album ?albumId .
+ | ?albumId music:released ?albumReleaseDate ;
+ | music:song ?songId .
+ | ?songId music:title ?songTitle ;
+ | music:number ?songNumber ;
+ | music:length ?songLength .
+ | }
+ |WHERE
+ | { VALUES ?id { {resource_id} }
+ | BIND(IRI(concat(str(?id), '/alias')) AS ?alias)
+ |
+ | ?id music:name ?bandName ;
+ | music:genre ?bandGenre
+ |
+ | OPTIONAL
+ | { ?id ^music:by ?albumId .
+ | ?albumId music:released ?albumReleaseDate
+ | OPTIONAL
+ | { ?albumId ^music:on ?songId .
+ | ?songId music:title ?songTitle ;
+ | music:number ?songNumber ;
+ | music:length ?songLength
+ | }
+ | }
+ | }
+ |ORDER BY ?songNumber
+ |""".stripMargin
+ .replaceAll("\\n", " ")
+
+ private val albumQuery =
+ raw"""
+ |PREFIX xsd:
+ |PREFIX music:
+ |PREFIX nxv:
+ |
+ |CONSTRUCT
+ | {
+ | ?alias music:name ?albumTitle ;
+ | music:length ?albumLength ;
+ | music:numberOfSongs ?numberOfSongs .
+ | }
+ |WHERE
+ | { { SELECT ?id ?albumReleaseDate ?albumTitle (SUM(xsd:integer(?songLength)) AS ?albumLength) (COUNT(?albumReleaseDate) AS ?numberOfSongs)
+ | WHERE
+ | { VALUES ?id { {resource_id} } .
+ | OPTIONAL
+ | { ?id ^music:on/music:length ?songLength }
+ | ?id music:released ?albumReleaseDate ;
+ | music:title ?albumTitle .
+ | }
+ | GROUP BY ?id ?albumReleaseDate ?albumTitle
+ | }
+ | BIND(IRI(concat(str(?id), '/alias')) AS ?alias)
+ | }
+ |""".stripMargin
+ .replaceAll("\\n", " ")
+
}
diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala
index 57ba062754..aaab7349c9 100644
--- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala
+++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala
@@ -89,6 +89,7 @@ class SearchConfigSpec extends BaseSpec {
json"""
{
"project" : {
+ "@id": "http://delta:8080/v1/projects/$id1",
"identifier" : "http://delta:8080/v1/projects/$id1",
"label" : "$id1"
}
@@ -106,6 +107,7 @@ class SearchConfigSpec extends BaseSpec {
json"""
{
"license" : {
+ "@id" : "https://bbp.epfl.ch/neurosciencegraph/data/licenses/97521f71-605d-4f42-8f1b-c37e742a30bf",
"identifier" : "https://bbp.epfl.ch/neurosciencegraph/data/licenses/97521f71-605d-4f42-8f1b-c37e742a30bf",
"label" : "SSCX Portal Data Licence final v1.0"
}