diff --git a/delta/plugins/composite-views/src/main/resources/composite-views.conf b/delta/plugins/composite-views/src/main/resources/composite-views.conf index 1a7e40a794..c691e14bd3 100644 --- a/delta/plugins/composite-views/src/main/resources/composite-views.conf +++ b/delta/plugins/composite-views/src/main/resources/composite-views.conf @@ -45,4 +45,7 @@ plugins.composite-views { # set to false to disable composite view indexing indexing-enabled = ${app.defaults.indexing.enable} + + # type of composite sink to use for composite view indexing + sink-config = single } \ No newline at end of file diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala index d4229a9acb..2175b4b02b 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala @@ -1,7 +1,23 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews import cats.implicits._ -import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.QueryGraph +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, GraphResourceToNTriples} +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{BatchQueryGraph, SingleQueryGraph} +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection} +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} +import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument} +import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} +import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax +import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri +import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} @@ -12,6 +28,12 @@ import shapeless.Typeable import scala.concurrent.duration.FiniteDuration +/** + * A composite sink handles querying the common blazegraph namespace, transforming the result into a format that can be + * pushed into a target namespace or index, and finally sinks it into the target. + */ +trait CompositeSink extends Sink + /** * A sink that queries N-Triples in Blazegraph, transforms them, and pushes the result to the provided sink * @param queryGraph @@ -19,21 +41,21 @@ import scala.concurrent.duration.FiniteDuration * @param transform * function to transform a graph into the format needed by the sink * @param sink - * function that allows + * function that defines how to sink a chunk of Elem[SinkFormat] * @param chunkSize - * the maximum number of elements to be pushed in ES at once + * the maximum number of elements to be pushed into the sink * @param maxWindow - * the maximum number of elements to be pushed at once + * the maximum time to wait for the chunk to gather [[chunkSize]] elements * @tparam SinkFormat * the type of data accepted by the sink */ -final class CompositeSink[SinkFormat]( - queryGraph: QueryGraph, +final class Single[SinkFormat]( + queryGraph: SingleQueryGraph, transform: GraphResource => Task[Option[SinkFormat]], sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]], override val chunkSize: Int, override val maxWindow: FiniteDuration -) extends Sink { +) extends CompositeSink { override type In = GraphResource override def inType: Typeable[GraphResource] = Typeable[GraphResource] @@ -54,3 +76,166 @@ final class CompositeSink[SinkFormat]( .flatMap(sink) } + +/** + * A sink that queries N-Triples in Blazegraph for multiple resources, transforms it for each resource, and pushes the + * result to the provided sink + * @param queryGraph + * how to query the blazegraph + * @param transform + * function to transform a graph into the format needed by the sink + * @param sink + * function that defines how to sink a chunk of Elem[SinkFormat] + * @param chunkSize + * the maximum number of elements to be pushed into the sink + * @param maxWindow + * the maximum time to wait for the chunk to gather [[chunkSize]] elements + * @tparam SinkFormat + * the type of data accepted by the sink + */ +final class Batch[SinkFormat]( + queryGraph: BatchQueryGraph, + transform: GraphResource => Task[Option[SinkFormat]], + sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]], + override val chunkSize: Int, + override val maxWindow: FiniteDuration +)(implicit rcr: RemoteContextResolution) + extends CompositeSink { + + override type In = GraphResource + + override def inType: Typeable[GraphResource] = Typeable[GraphResource] + + /** Performs the sparql query only using [[SuccessElem]]s from the chunk */ + private def query(elements: Chunk[Elem[GraphResource]]): Task[Option[Graph]] = + elements.mapFilter(elem => elem.map(_.id).toOption) match { + case ids if ids.nonEmpty => queryGraph(ids) + case _ => Task.none + } + + /** Replaces the graph of a provided [[GraphResource]] by extracting its new graph from the provided (full) graph. */ + private def replaceGraph(gr: GraphResource, fullGraph: Graph) = { + implicit val api: JsonLdApi = JsonLdJavaApi.lenient + fullGraph + .replaceRootNode(iri"${gr.id}/alias") + .toCompactedJsonLd(ContextValue.empty) + .flatMap(_.toGraph) + .map(g => gr.copy(graph = g.replaceRootNode(gr.id))) + } + + override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] = + for { + graph <- query(elements) + transformed <- graph match { + case Some(fullGraph) => + elements.traverse { elem => + elem.evalMapFilter { gr => + replaceGraph(gr, fullGraph).flatMap(transform) + } + } + case None => + Task.pure(elements.map(_.drop)) + } + sank <- sink(transformed) + } yield sank +} + +object CompositeSink { + + /** + * @param blazeClient + * client used to connect to blazegraph + * @param namespace + * name of the target blazegraph namespace + * @param common + * name of the common blazegraph namespace + * @param cfg + * configuration of the composite views + * @return + * a function that given a sparql view returns a composite sink that has the view as target + */ + def blazeSink( + blazeClient: BlazegraphClient, + namespace: String, + common: String, + cfg: CompositeViewsConfig + )(implicit baseUri: BaseUri, rcr: RemoteContextResolution): SparqlProjection => CompositeSink = { target => + compositeSink( + blazeClient, + common, + target.query, + GraphResourceToNTriples.graphToNTriples, + BlazegraphSink(blazeClient, cfg.blazegraphBatch, namespace).apply, + cfg.blazegraphBatch, + cfg.sinkConfig + ) + } + + /** + * @param blazeClient + * blazegraph client used to query the common space + * @param esClient + * client used to push to elasticsearch + * @param index + * name of the target elasticsearch index + * @param common + * name of the common blazegraph namespace + * @param cfg + * configuration of the composite views + * @return + * a function that given a elasticsearch view returns a composite sink that has the view as target + */ + def elasticSink( + blazeClient: BlazegraphClient, + esClient: ElasticSearchClient, + index: IndexLabel, + common: String, + cfg: CompositeViewsConfig + )(implicit rcr: RemoteContextResolution): ElasticSearchProjection => CompositeSink = { target => + val esSink = + ElasticSearchSink.states( + esClient, + cfg.elasticsearchBatch.maxElements, + cfg.elasticsearchBatch.maxInterval, + index, + Refresh.False + ) + compositeSink( + blazeClient, + common, + target.query, + new GraphResourceToDocument(target.context, target.includeContext).graphToDocument, + esSink.apply, + cfg.elasticsearchBatch, + cfg.sinkConfig + ) + } + + private def compositeSink[SinkFormat]( + blazeClient: BlazegraphClient, + common: String, + query: SparqlConstructQuery, + transform: GraphResource => Task[Option[SinkFormat]], + sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]], + batchConfig: BatchConfig, + sinkConfig: SinkConfig + )(implicit rcr: RemoteContextResolution): CompositeSink = sinkConfig match { + case SinkConfig.Single => + new Single( + new SingleQueryGraph(blazeClient, common, query), + transform, + sink, + batchConfig.maxElements, + batchConfig.maxInterval + ) + case SinkConfig.Batch => + new Batch( + new BatchQueryGraph(blazeClient, common, query), + transform, + sink, + batchConfig.maxElements, + batchConfig.maxInterval + ) + } + +} diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala index 1a1d76aaf4..6dfcf2f718 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsPluginModule.scala @@ -164,7 +164,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef { baseUri: BaseUri, cr: RemoteContextResolution @Id("aggregate") ) => - CompositeSpaces.Builder(cfg.prefix, esClient, cfg.elasticsearchBatch, blazeClient, cfg.blazegraphBatch)( + CompositeSpaces.Builder(cfg.prefix, esClient, blazeClient, cfg)( baseUri, cr ) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala index 5310fc9918..5e89ea192a 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/config/CompositeViewsConfig.scala @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config import akka.http.scaladsl.model.Uri import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig.Credentials +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SourcesConfig} import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientConfig import ch.epfl.bluebrain.nexus.delta.sdk.instances._ @@ -9,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, EventLogConfig} import com.typesafe.config.Config import monix.bio.UIO +import pureconfig.error.CannotConvert import pureconfig.generic.auto._ import pureconfig.generic.semiauto.deriveReader import pureconfig.{ConfigReader, ConfigSource} @@ -42,6 +44,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration} * the interval at which a view will look for requested restarts * @param indexingEnabled * if false, disables composite view indexing + * @param sinkConfig + * type of sink used for composite indexing */ final case class CompositeViewsConfig( sources: SourcesConfig, @@ -55,7 +59,8 @@ final case class CompositeViewsConfig( blazegraphBatch: BatchConfig, elasticsearchBatch: BatchConfig, restartCheckInterval: FiniteDuration, - indexingEnabled: Boolean + indexingEnabled: Boolean, + sinkConfig: SinkConfig ) object CompositeViewsConfig { @@ -106,6 +111,26 @@ object CompositeViewsConfig { maxTimeWindow: FiniteDuration ) + object SinkConfig { + + /** Represents the choice of composite sink */ + sealed trait SinkConfig + + /** A sink that only supports querying one resource at once from blazegraph */ + case object Single extends SinkConfig + + /** A sink that supports querying multiple resources at once from blazegraph */ + case object Batch extends SinkConfig + + implicit val sinkConfigReaderString: ConfigReader[SinkConfig] = + ConfigReader.fromString { + case "batch" => Right(Batch) + case "single" => Right(Single) + case value => + Left(CannotConvert(value, SinkConfig.getClass.getSimpleName, s"$value is not one of: [single, batch]")) + } + } + /** * Converts a [[Config]] into an [[CompositeViewsConfig]] */ diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala new file mode 100644 index 0000000000..30f253af29 --- /dev/null +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/BatchQueryGraph.scala @@ -0,0 +1,47 @@ +package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing + +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.idTemplating +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples} +import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery +import fs2.Chunk +import monix.bio.Task + +import java.util.regex.Pattern.quote + +/** + * Provides a way to query for the multiple incoming resources (from a chunk). This assumes that the query contains the + * template: `VALUE ?id { {resource_id} }`. The result is a single Graph for all given resources. + * @param client + * the blazegraph client used to query + * @param namespace + * the namespace to query + * @param query + * the sparql query to perform + */ +final class BatchQueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) { + + private val logger: Logger = Logger[BatchQueryGraph] + + private def newGraph(ntriples: NTriples): Task[Option[Graph]] = + if (ntriples.isEmpty) Task.none + else Task.fromEither(Graph(ntriples)).map(Some(_)) + + def apply(ids: Chunk[Iri]): Task[Option[Graph]] = + for { + ntriples <- client.query(Set(namespace), replaceIds(query, ids), SparqlNTriples) + graphResult <- newGraph(ntriples.value) + _ <- Task.when(graphResult.isEmpty)( + logger.debug(s"Querying blazegraph did not return any triples, '$ids' will be dropped.") + ) + } yield graphResult + + private def replaceIds(query: SparqlConstructQuery, iris: Chunk[Iri]): SparqlConstructQuery = { + val replacement = iris.foldLeft("") { (acc, iri) => acc + " " + s"<$iri>" } + SparqlConstructQuery.unsafe(query.value.replaceAll(quote(idTemplating), replacement)) + } + +} diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala index b4399ea2be..d073a86f4d 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeSpaces.scala @@ -2,18 +2,16 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient -import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, GraphResourceToNTriples} +import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphSink import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeSink +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection} -import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} -import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri -import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import com.typesafe.scalalogging.Logger import monix.bio.Task @@ -54,41 +52,21 @@ object CompositeSpaces { def apply( prefix: String, esClient: ElasticSearchClient, - esBatchConfig: BatchConfig, blazeClient: BlazegraphClient, - blazeBatchConfig: BatchConfig + cfg: CompositeViewsConfig )(implicit base: BaseUri, rcr: RemoteContextResolution): CompositeSpaces.Builder = (view: ActiveViewDef) => { // Operations and sinks for the common space val common = commonNamespace(view.uuid, view.rev, prefix) - val commonSink = BlazegraphSink(blazeClient, blazeBatchConfig, common) + val commonSink = BlazegraphSink(blazeClient, cfg.blazegraphBatch, common) val createCommon = blazeClient.createNamespace(common) val deleteCommon = blazeClient.deleteNamespace(common) - // Create the Blazegraph sink - def createBlazeSink(namespace: String): SparqlProjection => Sink = { target => - val blazeSink = BlazegraphSink(blazeClient, blazeBatchConfig, namespace) - new CompositeSink( - QueryGraph(blazeClient, common, target.query), - GraphResourceToNTriples.graphToNTriples, - blazeSink.apply, - blazeBatchConfig.maxElements, - blazeBatchConfig.maxInterval - ) - } - - // Create the Elasticsearch index - def createEsSink(index: IndexLabel): ElasticSearchProjection => Sink = { target => - val esSink = - ElasticSearchSink.states(esClient, esBatchConfig.maxElements, esBatchConfig.maxInterval, index, Refresh.False) - new CompositeSink( - QueryGraph(blazeClient, common, target.query), - new GraphResourceToDocument(target.context, target.includeContext).graphToDocument, - esSink.apply, - esBatchConfig.maxElements, - esBatchConfig.maxInterval - ) - } + // Create sinks + def createBlazeSink(namespace: String): SparqlProjection => Sink = + CompositeSink.blazeSink(blazeClient, namespace, common, cfg) + def createEsSink(index: IndexLabel): ElasticSearchProjection => Sink = + CompositeSink.elasticSink(blazeClient, esClient, index, common, cfg) // Compute the init and destroy operations as well as the sink for the different projections of the composite views val start: (Task[Unit], Task[Unit], Map[Iri, Sink]) = (createCommon.void, deleteCommon.void, Map.empty[Iri, Sink]) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/QueryGraph.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala similarity index 85% rename from delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/QueryGraph.scala rename to delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala index 6ccc375134..65a4b3fda3 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/QueryGraph.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/SingleQueryGraph.scala @@ -13,17 +13,18 @@ import monix.bio.Task import java.util.regex.Pattern.quote /** - * Pipe that performs the provided query for the incoming resource and replaces the graph with the result of query + * Provides a way to query for the incoming resource and replaces the graph with the result of query + * * @param client - * the blazegraph client + * the blazegraph client used to query * @param namespace * the namespace to query * @param query * the query to perform on each resource */ -final case class QueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) { +final class SingleQueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) { - private val logger: Logger = Logger[QueryGraph] + private val logger: Logger = Logger[SingleQueryGraph] private def newGraph(ntriples: NTriples, id: Iri): Task[Option[Graph]] = if (ntriples.isEmpty) { diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala index e5f872f7cf..1efa8ab7f8 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViewsFixture.scala @@ -5,7 +5,7 @@ import cats.data.NonEmptySet import ch.epfl.bluebrain.nexus.delta.kernel.Secret import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig -import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SourcesConfig} +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SinkConfig, SourcesConfig} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.Interval import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjectionFields.{ElasticSearchProjectionFields, SparqlProjectionFields} @@ -178,7 +178,8 @@ trait CompositeViewsFixture extends ConfigFixtures with EitherValuable { batchConfig, batchConfig, 3.seconds, - false + false, + SinkConfig.Batch ) } diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala index 721e14bc7f..a6fc5c3a75 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala @@ -2,16 +2,20 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing import akka.http.scaladsl.model.Uri import akka.http.scaladsl.model.Uri.Query +import cats.Semigroup import cats.data.{NonEmptyList, NonEmptySet} import cats.effect.Resource import cats.effect.concurrent.Ref -import cats.kernel.Semigroup import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples -import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeIndexingSuite.{batchConfig, Album, Band, Music} +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeIndexingSuite._ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.Queries.{batchQuery, singleQuery} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.{CrossProjectSource, ProjectSource, RemoteProjectSource} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{permissions, CompositeView, CompositeViewSource, CompositeViewValue} @@ -19,20 +23,20 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.Composit import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.store.CompositeRestartStore import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch.Run.{Main, Rebuild} import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.{CompositeBranch, CompositeGraphStream, CompositeProgress} -import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.{CompositeViews, Fixtures} +import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.{CompositeViews, CompositeViewsFixture, Fixtures} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel, QueryBuilder} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{nxv, rdf, rdfs, schemas} import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.{iriStringContextSyntax, jsonOpsSyntax} import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri -import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{Sort, SortList} import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, QueryConfig} @@ -40,15 +44,15 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DiscardMetadata, FilterByType, FilterDeprecated} +import ch.epfl.bluebrain.nexus.testkit.TestHelpers import ch.epfl.bluebrain.nexus.testkit.bio.ResourceFixture.TaskFixture -import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, JsonAssertions, PatienceConfig, ResourceFixture, TextAssertions} -import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie -import ch.epfl.bluebrain.nexus.testkit.{IOFixedClock, TestHelpers} +import ch.epfl.bluebrain.nexus.testkit.bio._ import fs2.Stream import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredEncoder @@ -56,27 +60,68 @@ import io.circe.generic.semiauto.deriveEncoder import io.circe.syntax._ import io.circe.{Encoder, Json} import monix.bio.{Task, UIO} -import monix.execution.Scheduler import munit.AnyFixture import java.time.Instant import java.util.UUID -import scala.concurrent.duration._ +import scala.concurrent.duration.DurationInt + +class SingleCompositeIndexingSuite extends CompositeIndexingSuite(SinkConfig.Single, singleQuery) +class BatchCompositeIndexingSuite extends CompositeIndexingSuite(SinkConfig.Batch, batchQuery) + +trait CompositeIndexingFixture extends BioSuite { + + implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1")) + implicit private val rcr: RemoteContextResolution = RemoteContextResolution.never + + private val queryConfig = QueryConfig(10, RefreshStrategy.Delay(10.millis)) + val batchConfig = BatchConfig(2, 50.millis) + private val compositeConfig = + CompositeViewsFixture.config.copy( + blazegraphBatch = batchConfig, + elasticsearchBatch = batchConfig + ) + + type Result = (ElasticSearchClient, BlazegraphClient, CompositeProjections, CompositeSpaces.Builder) + + private def resource(sinkConfig: SinkConfig): Resource[Task, Result] = { + (Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()).parMapN { + case (xas, esClient, bgClient) => + val compositeRestartStore = new CompositeRestartStore(xas) + val projections = + CompositeProjections(compositeRestartStore, xas, queryConfig, batchConfig, 3.seconds) + val spacesBuilder = + CompositeSpaces.Builder("delta", esClient, bgClient, compositeConfig.copy(sinkConfig = sinkConfig))( + baseUri, + rcr + ) + (esClient, bgClient, projections, spacesBuilder) + } + } + + def suiteLocalFixture(name: String, sinkConfig: SinkConfig): TaskFixture[Result] = + ResourceFixture.suiteLocal(name, resource(sinkConfig)) + + def compositeIndexing(sinkConfig: SinkConfig): ResourceFixture.TaskFixture[Result] = + suiteLocalFixture("compositeIndexing", sinkConfig) + +} -class CompositeIndexingSuite - extends BioSuite - with CompositeIndexingSuite.Fixture +abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConstructQuery) + extends CompositeIndexingFixture with TestHelpers with Fixtures with JsonAssertions with TextAssertions { - override def munitFixtures: Seq[AnyFixture[_]] = List(compositeIndexing) + private val fixture = compositeIndexing(sinkConfig) + + override def munitFixtures: Seq[AnyFixture[_]] = List(fixture) implicit private val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, 100.millis) private val prefix = "delta" - private lazy val (esClient, bgClient, projections, spacesBuilder) = compositeIndexing() + private lazy val (esClient, bgClient, projections, spacesBuilder) = fixture() // Data to index private val museId = iri"http://music.com/muse" @@ -95,27 +140,6 @@ class CompositeIndexingSuite private val theGatewayId = iri"http://music.com/the_getaway" private val theGateway = Album(theGatewayId, "The Getaway", redHotId) - private val query = SparqlConstructQuery.unsafe( - """ - |prefix music: - |CONSTRUCT { - | {resource_id} music:name ?bandName ; - | music:genre ?bandGenre ; - | music:start ?bandStartYear ; - | music:album ?albumId . - | ?albumId music:title ?albumTitle . - |} WHERE { - | {resource_id} music:name ?bandName ; - | music:start ?bandStartYear; - | music:genre ?bandGenre . - | OPTIONAL { - | {resource_id} ^music:by ?albumId . - | ?albumId music:title ?albumTitle . - | } - |} - |""".stripMargin - ) - private val project1 = ProjectRef.unsafe("org", "proj") private val project2 = ProjectRef.unsafe("org", "proj2") private val project3 = ProjectRef.unsafe("org", "proj3") @@ -180,7 +204,8 @@ class CompositeIndexingSuite private val mainCompleted = Ref.unsafe[Task, Map[ProjectRef, Int]](Map.empty) private val rebuildCompleted = Ref.unsafe[Task, Map[ProjectRef, Int]](Map.empty) - private def resetCompleted = mainCompleted.set(Map.empty) >> rebuildCompleted.set(Map.empty) + private def resetCompleted = mainCompleted.set(Map.empty) >> rebuildCompleted.set(Map.empty) + private def increment(map: Ref[Task, Map[ProjectRef, Int]], project: ProjectRef) = map.update(_.updatedWith(project)(_.map(_ + 1).orElse(Some(1)))) @@ -529,41 +554,17 @@ class CompositeIndexingSuite } yield () } - private val checkQuery = SparqlConstructQuery.unsafe("CONSTRUCT {?s ?p ?o} WHERE {?s ?p ?o} ORDER BY ?s") + private val checkQuery = SparqlConstructQuery.unsafe("CONSTRUCT {?s ?p ?o} WHERE {?s ?p ?o} ORDER BY ?s") + private def checkBlazegraphTriples(namespace: String, expected: String) = bgClient .query(Set(namespace), checkQuery, SparqlNTriples) .map(_.value.toString) .map(_.equalLinesUnordered(expected)) -} - -object CompositeIndexingSuite extends IOFixedClock { - - implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1")) - implicit private val rcr: RemoteContextResolution = RemoteContextResolution.never - - private val queryConfig: QueryConfig = QueryConfig(10, RefreshStrategy.Delay(10.millis)) - val batchConfig: BatchConfig = BatchConfig(2, 50.millis) - type Result = (ElasticSearchClient, BlazegraphClient, CompositeProjections, CompositeSpaces.Builder) - - private def resource()(implicit s: Scheduler, cl: ClassLoader): Resource[Task, Result] = { - (Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()).parMapN { - case (xas, esClient, bgClient) => - val compositeRestartStore = new CompositeRestartStore(xas) - val projections = - CompositeProjections(compositeRestartStore, xas, queryConfig, batchConfig, 3.seconds) - val spacesBuilder = CompositeSpaces.Builder("delta", esClient, batchConfig, bgClient, batchConfig)(baseUri, rcr) - (esClient, bgClient, projections, spacesBuilder) - } - } - - def suiteLocalFixture(name: String)(implicit s: Scheduler, cl: ClassLoader): TaskFixture[Result] = - ResourceFixture.suiteLocal(name, resource()) +} - trait Fixture { self: BioSuite => - val compositeIndexing: ResourceFixture.TaskFixture[Result] = suiteLocalFixture("compositeIndexing") - } +object CompositeIndexingSuite { private val ctxIri = ContextValue(iri"http://music.com/context") @@ -576,7 +577,9 @@ object CompositeIndexingSuite extends IOFixedClock { sealed trait Music extends Product with Serializable { def id: Iri + def tpe: Iri + def label: String } @@ -584,16 +587,19 @@ object CompositeIndexingSuite extends IOFixedClock { override val tpe: Iri = iri"http://music.com/Band" override val label: String = name } + object Band { implicit val bandEncoder: Encoder.AsObject[Band] = deriveConfiguredEncoder[Band].mapJsonObject(_.add("@type", "Band".asJson)) implicit val bandJsonLdEncoder: JsonLdEncoder[Band] = JsonLdEncoder.computeFromCirce((b: Band) => b.id, ctxIri) } - final case class Album(id: Iri, title: String, by: Iri) extends Music { + + final case class Album(id: Iri, title: String, by: Iri) extends Music { override val tpe: Iri = iri"http://music.com/Album" override val label: String = title } + object Album { implicit val albumEncoder: Encoder.AsObject[Album] = deriveConfiguredEncoder[Album].mapJsonObject(_.add("@type", "Album".asJson)) @@ -602,6 +608,7 @@ object CompositeIndexingSuite extends IOFixedClock { } final case class Metadata(uuid: UUID) + object Metadata { implicit private val encoderMetadata: Encoder.AsObject[Metadata] = deriveEncoder implicit val jsonLdEncoderMetadata: JsonLdEncoder[Metadata] = JsonLdEncoder.computeFromCirce(ctxIri) @@ -609,3 +616,52 @@ object CompositeIndexingSuite extends IOFixedClock { } } + +object Queries { + val batchQuery: SparqlConstructQuery = SparqlConstructQuery.unsafe( + """ + |prefix music: + |CONSTRUCT { + | ?alias music:name ?bandName ; + | music:genre ?bandGenre ; + | music:start ?bandStartYear ; + | music:album ?albumId . + | ?albumId music:title ?albumTitle . + |} WHERE { + | VALUES ?id { {resource_id} } . + | BIND( IRI(concat(str(?id), '/', 'alias')) AS ?alias ) . + | + | ?id music:name ?bandName ; + | music:start ?bandStartYear; + | music:genre ?bandGenre . + | OPTIONAL { + | ?id ^music:by ?albumId . + | ?albumId music:title ?albumTitle . + | } + |} + |""".stripMargin + ) + + val singleQuery: SparqlConstructQuery = SparqlConstructQuery.unsafe( + """ + |prefix music: + |CONSTRUCT { + | ?id music:name ?bandName ; + | music:genre ?bandGenre ; + | music:start ?bandStartYear ; + | music:album ?albumId . + | ?albumId music:title ?albumTitle . + |} WHERE { + | BIND( {resource_id} AS ?id ) . + | + | ?id music:name ?bandName ; + | music:start ?bandStartYear; + | music:genre ?bandGenre . + | OPTIONAL { + | ?id ^music:by ?albumId . + | ?albumId music:title ?albumTitle . + | } + |} + |""".stripMargin + ) +} diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala index 56060ec34e..63d05219e1 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala @@ -78,6 +78,13 @@ sealed trait Elem[+A] extends Product with Serializable { */ def dropped: DroppedElem = DroppedElem(tpe, id, project, instant, offset, rev) + /** Action of dropping an Elem */ + def drop: Elem[Nothing] = this match { + case e: SuccessElem[A] => e.dropped + case e: FailedElem => e + case e: DroppedElem => e + } + /** * Maps the underlying element value if this is a [[Elem.SuccessElem]] using f. * @param f diff --git a/tests/docker/config/construct-query.sparql b/tests/docker/config/construct-query.sparql index 77ba1b1b34..f7069580e9 100644 --- a/tests/docker/config/construct-query.sparql +++ b/tests/docker/config/construct-query.sparql @@ -10,7 +10,7 @@ prefix : CONSTRUCT { # Metadata - ?id a ?type ; + ?alias a ?type ; :name ?name ; :description ?description ; :createdAt ?createdAt ; @@ -20,68 +20,68 @@ CONSTRUCT { :deprecated ?deprecated ; :self ?self . - ?id :project ?projectBN . - ?projectBN :identifier ?projectId ; + ?alias :project ?projectId . + ?projectId :identifier ?projectId ; :label ?projectLabel . # Common properties ## Annotations - ?id :mType ?mType . + ?alias :mType ?mType . ?mType :identifier ?mType ; :label ?mTypeLabel ; :idLabel ?mTypeIdLabel . - ?id :eType ?eType . + ?alias :eType ?eType . ?eType :identifier ?eType ; :label ?eTypeLabel ; :idLabel ?eTypeIdLabel . - ?id :sType ?sType . + ?alias :sType ?sType . ?sType :identifier ?sType ; :label ?sTypeLabel ; :idLabel ?sTypeIdLabel . ## Atlas - ?id :coordinatesInBrainAtlas ?coordinates . + ?alias :coordinatesInBrainAtlas ?coordinates . ?coordinates :valueX ?valueX ; :valueY ?valueY ; :valueZ ?valueZ . ## Brain region / layer - ?id :brainRegion ?brainRegionId . - ?brainRegionId :identifier ?brainRegionId ; - :label ?brainRegionLabel ; - :idLabel ?brainRegionIdLabel . - ?id :layer ?layerId . + ?alias :brainRegion ?brainRegionId . + ?brainRegionId :identifier ?brainRegionId ; + :label ?brainRegionLabel ; + :idLabel ?brainRegionIdLabel . + ?alias :layer ?layerId . ?layerId :identifier ?layerId ; :label ?layerLabel ; :idLabel ?layerIdLabel . ## Contributors / Organizations - ?id :contributors ?personId . + ?alias :contributors ?personId . ?personId :identifier ?personId ; :label ?personName ; :idLabel ?personIdName ; :affiliation ?affiliation . ?affiliation :label ?affiliationName . - ?id :organizations ?orgId . + ?alias :organizations ?orgId . ?orgId :identifier ?orgId ; :label ?organizationName ; :idLabel ?organizationIdName . ## Derivation - ?id :derivation ?derivation . + ?alias :derivation ?derivation . ?derivation :identifier ?entity ; rdf:type ?entityType ; :label ?entityName . ## Distribution - ?id :distribution ?distribution . + ?alias :distribution ?distribution . ?distribution :label ?distributionName ; :encodingFormat ?distributionEncodingFormat ; :contentUrl ?distributionContentUrl ; :contentSize ?distributionContentSize . ## Generation - ?id :generation ?generation . + ?alias :generation ?generation . ?generation :protocol ?protocol ; :startedAt ?generationStartedAtTime ; :endedAt ?generationEndedAtTime . @@ -90,25 +90,25 @@ CONSTRUCT { :value ?protocolValue . ## Images - ?id :image ?image . + ?alias :image ?image . ?image :identifier ?image ; :about ?imageAbout ; :repetition ?imageRepetition ; :stimulusType ?imageStimulusTypeLabel . ## License - ?id :license ?licenseBN . - ?licenseBN :identifier ?licenseId ; + ?alias :license ?licenseId . + ?licenseId :identifier ?licenseId ; :label ?licenseLabel . ## Series - ?id :series ?series . + ?alias :series ?series . ?series :value ?seriesValue ; :unit ?seriesUnit ; :statistic ?seriesStatistic . ## Source - ?id :source ?source . + ?alias :source ?source . ?source :title ?sourceTitle ; rdf:type ?sourceType ; :identifier ?sourceIdentifier . @@ -116,79 +116,80 @@ CONSTRUCT { :value ?sourceIdentifierValue . ## Species - ?id :subjectSpecies ?species . + ?alias :subjectSpecies ?species . ?species :identifier ?species ; :label ?speciesLabel . ## Start / end / status - ?id :startedAt ?startedAt ; + ?alias :startedAt ?startedAt ; :endedAt ?endedAt ; :status ?status . ## Subject - ?id :subjectAge ?age . - ?age :value ?subjectAgeValue ; - :minValue ?subjectAgeMinValue ; - :maxValue ?subjectAgeMaxValue ; - :unit ?subjectAgeUnit ; - :period ?subjectAgePeriod ; - :label ?subjectAgeLabel . - ?id :subjectWeight ?subjectWeightBN . - ?subjectWeightBN :value ?subjectWeightValue ; - :unit ?subjectWeightUnit ; - :minValue ?subjectWeightMinValue ; - :maxValue ?subjectWeightMaxValue ; - :label ?subjectWeightLabel . + ?alias :subjectAge ?age . + ?age :value ?subjectAgeValue ; + :minValue ?subjectAgeMinValue ; + :maxValue ?subjectAgeMaxValue ; + :unit ?subjectAgeUnit ; + :period ?subjectAgePeriod ; + :label ?subjectAgeLabel . + ?alias :subjectWeight ?weight . + ?weight :value ?subjectWeightValue ; + :unit ?subjectWeightUnit ; + :minValue ?subjectWeightMinValue ; + :maxValue ?subjectWeightMaxValue ; + :label ?subjectWeightLabel . # Properties specific to types ## Bouton density - ?id :boutonDensity ?boutonDensityBN . - ?boutonDensityBN :value ?boutonDensityValue ; + ?alias :boutonDensity ?boutonDensityBN . + ?boutonDensityBN :value ?boutonDensityValue ; :unit ?boutonDensityUnit ; :label ?boutonDensityLabel . ## Circuit - ?id :circuitType ?circuitType ; + ?alias :circuitType ?circuitType ; :circuitBase ?circuitBaseUrlStr ; :circuitConfigPath ?circuitConfigPathUrlStr ; - :circuitBrainRegion ?circuitBrainRegionBN . - ?circuitBrainRegionBN :identifier ?circuitBrainRegionId ; + :circuitBrainRegion ?circuitBrainRegionId . + ?circuitBrainRegionId :identifier ?circuitBrainRegionId ; :label ?circuitBrainRegionLabel . ## Layer thickness - ?id :layerThickness ?thicknessBN . - ?thicknessBN :value ?thicknessValue ; + ?alias :layerThickness ?thicknessBN . + ?thicknessBN :value ?thicknessValue ; :unit ?thicknessUnit ; :nValue ?thicknessNValue ; :label ?thicknessLabel . ## Neuron density - ?id :neuronDensity ?neuronDensityBN . - ?neuronDensityBN :value ?neuronDensityValue ; + ?alias :neuronDensity ?neuronDensityBN . + ?neuronDensityBN :value ?neuronDensityValue ; :unit ?neuronDensityUnit ; :nValue ?neuronDensityNValue ; :label ?neuronDensityLabel . ## Simulation campaigns - ?id :config ?campaignConfigId . + ?alias :config ?campaignConfigId . ?campaignConfigId :identifier ?campaignConfigId ; :name ?campaignConfigName . ## Simulation campaigns / simulations - ?id :parameter ?parameter . + ?alias :parameter ?parameter . ?parameter :attrs ?attrs . ?attrs ?attrs_prop ?attrs_value . - ?id :parameter ?parameter . + ?alias :parameter ?parameter . ?parameter :coords ?coords . ?coords ?coords_prop ?coords_value . ## Simulations - ?id :campaign ?campaignId . + ?alias :campaign ?campaignId . ?campaignId :identifier ?campaignId ; :name ?campaignName . } WHERE { - BIND({resource_id} as ?id) . + VALUES ?id { {resource_id} } . + BIND( IRI(concat(str(?id), '/', 'alias')) AS ?alias ) . ?id a ?type . @@ -211,7 +212,6 @@ CONSTRUCT { nxv:self ?self ; nxv:project ?projectId . BIND( STRAFTER(STR(?projectId), "/v1/projects/") as ?projectLabel ) . - BIND( BNODE() AS ?projectBN ) . # We read the following nodes as text. This is done in order to avoid conflict # when another triple uses the same @id. For instance, createdBy and updatedBy @@ -401,10 +401,7 @@ CONSTRUCT { # License OPTIONAL { ?id schema:license ?licenseId . - OPTIONAL { - ?licenseId schema:name ?licenseLabel . - } . - BIND( BNODE() AS ?licenseBN ) . + OPTIONAL { ?licenseId schema:name ?licenseLabel . } . } . # Series @@ -469,7 +466,6 @@ CONSTRUCT { CONCAT(STR(?subjectWeightMinValue), " to ", STR(?subjectWeightMaxValue), " ", STR(?subjectWeightUnit)) ) as ?subjectWeightLabel ) . - BIND( BNODE() as ?subjectWeightBN ) . } . # Bouton density @@ -485,7 +481,7 @@ CONSTRUCT { STR(?boutonDensityUnit) ) as ?boutonDensityLabel ) . - BIND( BNODE() as ?boutonDensityBN ) . + BIND( BNODE(CONCAT(STR(?id), '/boutonDensity')) as ?boutonDensityBN ) . } . # Circuit @@ -524,12 +520,12 @@ CONSTRUCT { OPTIONAL { ?id a nsg:LayerThickness ; nsg:series ?meanSeries . - ?meanSeries nsg:statistic "mean" ; - schema:value ?thicknessValue ; - schema:unitCode ?thicknessUnit . - ?id nsg:series ?nSeries . - ?nSeries nsg:statistic "N" ; - schema:value ?thicknessNValue . + ?meanSeries nsg:statistic "mean" ; + schema:value ?thicknessValue ; + schema:unitCode ?thicknessUnit . + ?id nsg:series ?nSeries . + ?nSeries nsg:statistic "N" ; + schema:value ?thicknessNValue . BIND( CONCAT( STR(?thicknessValue), " ", @@ -537,7 +533,8 @@ CONSTRUCT { STR(?thicknessNValue), ")" ) as ?thicknessLabel ) . - BIND( BNODE() as ?thicknessBN ) . + + BIND( BNODE(CONCAT(STR(?id), '/layerThickness')) as ?thicknessBN ) . } . # Neuron density @@ -558,7 +555,8 @@ CONSTRUCT { STR(?neuronDensityNValue), ")" ) as ?neuronDensityLabel ) . - BIND( BNODE() as ?neuronDensityBN ) . + + BIND( BNODE((CONCAT(STR(?id), '/neuronDensity'))) as ?neuronDensityBN ) . } . # Simulation campaign configuration @@ -585,7 +583,6 @@ CONSTRUCT { OPTIONAL { ?circuitBrainRegionId rdfs:label ?circuitBrainRegionLabel . } - BIND( BNODE() AS ?circuitBrainRegionBN ) . } . } . } . diff --git a/tests/docker/config/delta-postgres.conf b/tests/docker/config/delta-postgres.conf index b27dded274..e5fe26cecd 100644 --- a/tests/docker/config/delta-postgres.conf +++ b/tests/docker/config/delta-postgres.conf @@ -55,6 +55,7 @@ plugins { composite-views { min-interval-rebuild = 5 seconds + sink-config = batch } elasticsearch { diff --git a/tests/src/test/resources/kg/views/composite/composite-view-include-context.json b/tests/src/test/resources/kg/views/composite/composite-view-include-context.json index 6afe1aeafe..6181f5546f 100644 --- a/tests/src/test/resources/kg/views/composite/composite-view-include-context.json +++ b/tests/src/test/resources/kg/views/composite/composite-view-include-context.json @@ -80,7 +80,7 @@ }, "dynamic": false }, - "query": "prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?bandName ; music:genre ?bandGenre ; music:album ?albumId . ?albumId music:released ?albumReleaseDate ; music:song ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } WHERE { {resource_id} music:name ?bandName ; music:genre ?bandGenre . OPTIONAL { {resource_id} ^music:by ?albumId . ?albumId music:released ?albumReleaseDate . OPTIONAL {?albumId ^music:on ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } } } ORDER BY(?songNumber)", + "query": "{{bandQuery}}", "context": { "@base": "https://music.example.com/", "@vocab": "https://music.example.com/" @@ -121,7 +121,7 @@ }, "dynamic": false }, - "query": "prefix xsd: prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?albumTitle ; music:length ?albumLength ; music:numberOfSongs ?numberOfSongs } WHERE {SELECT ?albumReleaseDate ?albumTitle (sum(xsd:integer(?songLength)) as ?albumLength) (count(?albumReleaseDate) as ?numberOfSongs) WHERE {OPTIONAL { {resource_id} ^music:on / music:length ?songLength } {resource_id} music:released ?albumReleaseDate ; music:title ?albumTitle . } GROUP BY ?albumReleaseDate ?albumTitle }", + "query": "{{albumQuery}}", "context": { "@base": "https://music.example.com/", "@vocab": "https://music.example.com/" diff --git a/tests/src/test/resources/kg/views/composite/composite-view.json b/tests/src/test/resources/kg/views/composite/composite-view.json index d9da4e0983..58a1e0b9b8 100644 --- a/tests/src/test/resources/kg/views/composite/composite-view.json +++ b/tests/src/test/resources/kg/views/composite/composite-view.json @@ -80,7 +80,7 @@ }, "dynamic": false }, - "query": "prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?bandName ; music:genre ?bandGenre ; music:album ?albumId . ?albumId music:released ?albumReleaseDate ; music:song ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } WHERE { {resource_id} music:name ?bandName ; music:genre ?bandGenre . OPTIONAL { {resource_id} ^music:by ?albumId . ?albumId music:released ?albumReleaseDate . OPTIONAL {?albumId ^music:on ?songId . ?songId music:title ?songTitle ; music:number ?songNumber ; music:length ?songLength } } } ORDER BY(?songNumber)", + "query": "{{bandQuery}}", "context": { "@base": "https://music.example.com/", "@vocab": "https://music.example.com/" @@ -120,7 +120,7 @@ }, "dynamic": false }, - "query": "prefix xsd: prefix music: prefix nxv: CONSTRUCT { {resource_id} music:name ?albumTitle ; music:length ?albumLength ; music:numberOfSongs ?numberOfSongs } WHERE {SELECT ?albumReleaseDate ?albumTitle (sum(xsd:integer(?songLength)) as ?albumLength) (count(?albumReleaseDate) as ?numberOfSongs) WHERE {OPTIONAL { {resource_id} ^music:on / music:length ?songLength } {resource_id} music:released ?albumReleaseDate ; music:title ?albumTitle . } GROUP BY ?albumReleaseDate ?albumTitle }", + "query": "{{albumQuery}}", "context": { "@base": "https://music.example.com/", "@vocab": "https://music.example.com/" diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala index b14a4bfbcc..32bd61d04c 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/CompositeViewsSpec.scala @@ -7,11 +7,13 @@ import ch.epfl.bluebrain.nexus.tests.HttpClient._ import ch.epfl.bluebrain.nexus.tests.Identity.compositeviews.Jerry import ch.epfl.bluebrain.nexus.tests.Optics._ import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.{Events, Organizations, Views} +import ch.epfl.bluebrain.nexus.tests.kg.CompositeViewsSpec.{albumQuery, bandQuery} import com.typesafe.scalalogging.Logger import io.circe.Json import io.circe.optics.JsonPath._ import monix.bio.Task import monix.execution.Scheduler.Implicits.global + import scala.concurrent.duration._ class CompositeViewsSpec extends BaseSpec { @@ -136,7 +138,9 @@ class CompositeViewsSpec extends BaseSpec { "org" -> orgId, "org2" -> orgId, "remoteEndpoint" -> "http://delta:8080/v1", - "token" -> jerryToken + "token" -> jerryToken, + "bandQuery" -> bandQuery, + "albumQuery" -> albumQuery ): _* ) @@ -166,7 +170,9 @@ class CompositeViewsSpec extends BaseSpec { "org" -> orgId, "org2" -> orgId, "remoteEndpoint" -> "http://delta:8080/v1/other", - "token" -> jerryToken + "token" -> jerryToken, + "bandQuery" -> bandQuery, + "albumQuery" -> albumQuery ): _* ) @@ -183,7 +189,9 @@ class CompositeViewsSpec extends BaseSpec { "org" -> orgId, "org2" -> orgId, "remoteEndpoint" -> "http://delta:8080/v1", - "token" -> s"${jerryToken}wrong" + "token" -> s"${jerryToken}wrong", + "bandQuery" -> bandQuery, + "albumQuery" -> albumQuery ): _* ) @@ -207,7 +215,9 @@ class CompositeViewsSpec extends BaseSpec { "org" -> orgId, "org2" -> orgId, "remoteEndpoint" -> "http://fail.does.not.exist.at.all.asndkajbskhabsdfjhabsdfjkh/v1", - "token" -> jerryToken + "token" -> jerryToken, + "bandQuery" -> bandQuery, + "albumQuery" -> albumQuery ): _* ) @@ -312,7 +322,9 @@ class CompositeViewsSpec extends BaseSpec { "org" -> orgId, "org2" -> orgId, "remoteEndpoint" -> "http://delta:8080/v1", - "token" -> jerryToken + "token" -> jerryToken, + "bandQuery" -> bandQuery, + "albumQuery" -> albumQuery ): _* ) @@ -390,4 +402,75 @@ class CompositeViewsSpec extends BaseSpec { } } } + +} + +object CompositeViewsSpec { + + private val bandQuery = + raw""" + |PREFIX nxv: + |PREFIX music: + | + |CONSTRUCT + | { + | ?alias music:name ?bandName ; + | music:genre ?bandGenre ; + | music:album ?albumId . + | ?albumId music:released ?albumReleaseDate ; + | music:song ?songId . + | ?songId music:title ?songTitle ; + | music:number ?songNumber ; + | music:length ?songLength . + | } + |WHERE + | { VALUES ?id { {resource_id} } + | BIND(IRI(concat(str(?id), '/alias')) AS ?alias) + | + | ?id music:name ?bandName ; + | music:genre ?bandGenre + | + | OPTIONAL + | { ?id ^music:by ?albumId . + | ?albumId music:released ?albumReleaseDate + | OPTIONAL + | { ?albumId ^music:on ?songId . + | ?songId music:title ?songTitle ; + | music:number ?songNumber ; + | music:length ?songLength + | } + | } + | } + |ORDER BY ?songNumber + |""".stripMargin + .replaceAll("\\n", " ") + + private val albumQuery = + raw""" + |PREFIX xsd: + |PREFIX music: + |PREFIX nxv: + | + |CONSTRUCT + | { + | ?alias music:name ?albumTitle ; + | music:length ?albumLength ; + | music:numberOfSongs ?numberOfSongs . + | } + |WHERE + | { { SELECT ?id ?albumReleaseDate ?albumTitle (SUM(xsd:integer(?songLength)) AS ?albumLength) (COUNT(?albumReleaseDate) AS ?numberOfSongs) + | WHERE + | { VALUES ?id { {resource_id} } . + | OPTIONAL + | { ?id ^music:on/music:length ?songLength } + | ?id music:released ?albumReleaseDate ; + | music:title ?albumTitle . + | } + | GROUP BY ?id ?albumReleaseDate ?albumTitle + | } + | BIND(IRI(concat(str(?id), '/alias')) AS ?alias) + | } + |""".stripMargin + .replaceAll("\\n", " ") + } diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala index 57ba062754..aaab7349c9 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigSpec.scala @@ -89,6 +89,7 @@ class SearchConfigSpec extends BaseSpec { json""" { "project" : { + "@id": "http://delta:8080/v1/projects/$id1", "identifier" : "http://delta:8080/v1/projects/$id1", "label" : "$id1" } @@ -106,6 +107,7 @@ class SearchConfigSpec extends BaseSpec { json""" { "license" : { + "@id" : "https://bbp.epfl.ch/neurosciencegraph/data/licenses/97521f71-605d-4f42-8f1b-c37e742a30bf", "identifier" : "https://bbp.epfl.ch/neurosciencegraph/data/licenses/97521f71-605d-4f42-8f1b-c37e742a30bf", "label" : "SSCX Portal Data Licence final v1.0" }