Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch composite sink #4105

Merged
merged 31 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4133644
Initial batch BG sink
olivergrabinski Jul 24, 2023
5142531
Remove custom blank nodes from construct-query.sparql
olivergrabinski Jul 24, 2023
51bc480
Adapt tests to search query changes
olivergrabinski Jul 25, 2023
d98c0c7
Simplify NewCompositeSink
olivergrabinski Jul 25, 2023
8e73fe3
Update query in CompositeIndexingSuite
olivergrabinski Jul 25, 2023
0dc80ff
Failed elems shouldn't be mapped to dropped. Chunks of dropped elems …
olivergrabinski Jul 26, 2023
c124d21
Clean imports
olivergrabinski Jul 26, 2023
957d9e5
Update query in integration test
olivergrabinski Jul 26, 2023
5c62f59
Resolve issues with BatchCompositeSink
olivergrabinski Jul 28, 2023
0089173
Update CompositeIndexingSuite to work with new sink
olivergrabinski Jul 28, 2023
ce6005b
Update the multi-construct-query.sparql to use an alias
olivergrabinski Jul 28, 2023
bd6f044
Update queries in test resources
olivergrabinski Jul 28, 2023
8c5ea75
Add `drop` method on Elem
olivergrabinski Jul 31, 2023
cb16dae
Rename `NewQueryGraph` to `BatchQueryGraph`
olivergrabinski Jul 31, 2023
44bcd91
Fix neuronDensity and layerThickness in the multi id case
olivergrabinski Jul 31, 2023
f6e4f62
Further "series" related fixes
olivergrabinski Jul 31, 2023
7834fb1
Update sparql queries in integration tests
olivergrabinski Jul 31, 2023
43a5e03
Group composite sinks
olivergrabinski Jul 31, 2023
ab6c2bd
Change BatchQueryGraph signature
olivergrabinski Jul 31, 2023
d704151
Make the choice of sink configurable
olivergrabinski Aug 2, 2023
a66c076
Remove "legacy" sparql query
olivergrabinski Aug 2, 2023
e1b8031
Add default sink-config
olivergrabinski Aug 2, 2023
e18b188
Template `query` fields inside the composite-view payloads
olivergrabinski Aug 3, 2023
81930ce
The sink is not legacy
olivergrabinski Aug 3, 2023
75f28b1
Reflect sinkConfig change in composite config
olivergrabinski Aug 3, 2023
3053b0b
Refactor CompositeIndexingSuite so that it can run for both Single an…
olivergrabinski Aug 3, 2023
d7181fe
Try out for comprehension for clarity
olivergrabinski Aug 3, 2023
3c05a09
Add missing bracket
olivergrabinski Aug 3, 2023
1a9229e
Remove unused QueryGraph trait
olivergrabinski Aug 3, 2023
8774311
Split QueryGraph
olivergrabinski Aug 3, 2023
a54c44e
Merge branch 'master' into 3890-batch-sink
olivergrabinski Aug 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,7 @@ plugins.composite-views {

# set to false to disable composite view indexing
indexing-enabled = ${app.defaults.indexing.enable}

# type of composite sink to use for composite view indexing
sink-config = single
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews

import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.QueryGraph
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink, GraphResourceToNTriples}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{BatchQueryGraph, SingleQueryGraph}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument}
import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
Expand All @@ -12,28 +28,34 @@ import shapeless.Typeable

import scala.concurrent.duration.FiniteDuration

/**
 * A composite sink handles querying the common blazegraph namespace, transforming the result into a format that can be
 * pushed into a target namespace or index, and finally sinks it into the target.
 *
 * Implementations differ in how they query the common namespace: either one resource at a time or a whole chunk of
 * resources in a single batched query.
 */
trait CompositeSink extends Sink

/**
* A sink that queries N-Triples in Blazegraph, transforms them, and pushes the result to the provided sink
* @param queryGraph
* how to query the blazegraph
* @param transform
* function to transform a graph into the format needed by the sink
* @param sink
* function that allows
* function that defines how to sink a chunk of Elem[SinkFormat]
* @param chunkSize
* the maximum number of elements to be pushed in ES at once
* the maximum number of elements to be pushed into the sink
* @param maxWindow
* the maximum number of elements to be pushed at once
* the maximum time to wait for the chunk to gather [[chunkSize]] elements
* @tparam SinkFormat
* the type of data accepted by the sink
*/
final class CompositeSink[SinkFormat](
queryGraph: QueryGraph,
final class Single[SinkFormat](
queryGraph: SingleQueryGraph,
transform: GraphResource => Task[Option[SinkFormat]],
sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
override val chunkSize: Int,
override val maxWindow: FiniteDuration
) extends Sink {
) extends CompositeSink {

override type In = GraphResource
override def inType: Typeable[GraphResource] = Typeable[GraphResource]
Expand All @@ -54,3 +76,166 @@ final class CompositeSink[SinkFormat](
.flatMap(sink)

}

/**
 * A sink that issues a single batched N-Triples query to Blazegraph for a whole chunk of resources, carves an
 * individual graph out of the combined result for each resource, transforms it, and pushes the outcome to the
 * provided sink.
 *
 * @param queryGraph
 *   how to query the blazegraph
 * @param transform
 *   function to transform a graph into the format needed by the sink
 * @param sink
 *   function that defines how to sink a chunk of Elem[SinkFormat]
 * @param chunkSize
 *   the maximum number of elements to be pushed into the sink
 * @param maxWindow
 *   the maximum time to wait for the chunk to gather [[chunkSize]] elements
 * @tparam SinkFormat
 *   the type of data accepted by the sink
 */
final class Batch[SinkFormat](
    queryGraph: BatchQueryGraph,
    transform: GraphResource => Task[Option[SinkFormat]],
    sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
    override val chunkSize: Int,
    override val maxWindow: FiniteDuration
)(implicit rcr: RemoteContextResolution)
    extends CompositeSink {

  override type In = GraphResource

  override def inType: Typeable[GraphResource] = Typeable[GraphResource]

  /** Runs the batched sparql query for the ids of the [[SuccessElem]]s of the chunk; [[Task.none]] when there are none. */
  private def query(elements: Chunk[Elem[GraphResource]]): Task[Option[Graph]] = {
    val successIds = elements.mapFilter(_.map(_.id).toOption)
    if (successIds.isEmpty) Task.none else queryGraph(successIds)
  }

  /** Replaces the graph of a provided [[GraphResource]] by extracting its new graph from the provided (full) graph. */
  private def replaceGraph(gr: GraphResource, fullGraph: Graph) = {
    implicit val api: JsonLdApi = JsonLdJavaApi.lenient
    val framed = fullGraph
      .replaceRootNode(iri"${gr.id}/alias")
      .toCompactedJsonLd(ContextValue.empty)
    framed
      .flatMap(_.toGraph)
      .map(resourceGraph => gr.copy(graph = resourceGraph.replaceRootNode(gr.id)))
  }

  override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] =
    query(elements)
      .flatMap {
        case Some(fullGraph) =>
          // Extract and transform each resource's slice of the combined graph
          elements.traverse(_.evalMapFilter(gr => replaceGraph(gr, fullGraph).flatMap(transform)))
        case None            =>
          // The query returned no triples: the whole chunk is dropped
          Task.pure(elements.map(_.drop))
      }
      .flatMap(sink)
}

object CompositeSink {

  /**
   * @param blazeClient
   *   client used to connect to blazegraph
   * @param namespace
   *   name of the target blazegraph namespace
   * @param common
   *   name of the common blazegraph namespace
   * @param cfg
   *   configuration of the composite views
   * @return
   *   a function that given a sparql view returns a composite sink that has the view as target
   */
  def blazeSink(
      blazeClient: BlazegraphClient,
      namespace: String,
      common: String,
      cfg: CompositeViewsConfig
  )(implicit baseUri: BaseUri, rcr: RemoteContextResolution): SparqlProjection => CompositeSink = { target =>
    val targetSink = BlazegraphSink(blazeClient, cfg.blazegraphBatch, namespace)
    mkCompositeSink(
      blazeClient,
      common,
      target.query,
      GraphResourceToNTriples.graphToNTriples,
      targetSink.apply,
      cfg.blazegraphBatch,
      cfg.sinkConfig
    )
  }

  /**
   * @param blazeClient
   *   blazegraph client used to query the common space
   * @param esClient
   *   client used to push to elasticsearch
   * @param index
   *   name of the target elasticsearch index
   * @param common
   *   name of the common blazegraph namespace
   * @param cfg
   *   configuration of the composite views
   * @return
   *   a function that given a elasticsearch view returns a composite sink that has the view as target
   */
  def elasticSink(
      blazeClient: BlazegraphClient,
      esClient: ElasticSearchClient,
      index: IndexLabel,
      common: String,
      cfg: CompositeViewsConfig
  )(implicit rcr: RemoteContextResolution): ElasticSearchProjection => CompositeSink = { target =>
    val documentSink =
      ElasticSearchSink.states(
        esClient,
        cfg.elasticsearchBatch.maxElements,
        cfg.elasticsearchBatch.maxInterval,
        index,
        Refresh.False
      )
    val toDocument   = new GraphResourceToDocument(target.context, target.includeContext)
    mkCompositeSink(
      blazeClient,
      common,
      target.query,
      toDocument.graphToDocument,
      documentSink.apply,
      cfg.elasticsearchBatch,
      cfg.sinkConfig
    )
  }

  /** Builds a [[Single]] or a [[Batch]] sink over the common namespace according to the provided [[SinkConfig]]. */
  private def mkCompositeSink[SinkFormat](
      blazeClient: BlazegraphClient,
      common: String,
      query: SparqlConstructQuery,
      transform: GraphResource => Task[Option[SinkFormat]],
      sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
      batchConfig: BatchConfig,
      sinkConfig: SinkConfig
  )(implicit rcr: RemoteContextResolution): CompositeSink =
    sinkConfig match {
      case SinkConfig.Single =>
        new Single(
          new SingleQueryGraph(blazeClient, common, query),
          transform,
          sink,
          batchConfig.maxElements,
          batchConfig.maxInterval
        )
      case SinkConfig.Batch  =>
        new Batch(
          new BatchQueryGraph(blazeClient, common, query),
          transform,
          sink,
          batchConfig.maxElements,
          batchConfig.maxInterval
        )
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate")
) =>
CompositeSpaces.Builder(cfg.prefix, esClient, cfg.elasticsearchBatch, blazeClient, cfg.blazegraphBatch)(
CompositeSpaces.Builder(cfg.prefix, esClient, blazeClient, cfg)(
baseUri,
cr
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config

import akka.http.scaladsl.model.Uri
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config.BlazegraphViewsConfig.Credentials
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.{BlazegraphAccess, RemoteSourceClientConfig, SourcesConfig}
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientConfig
import ch.epfl.bluebrain.nexus.delta.sdk.instances._
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, EventLogConfig}
import com.typesafe.config.Config
import monix.bio.UIO
import pureconfig.error.CannotConvert
import pureconfig.generic.auto._
import pureconfig.generic.semiauto.deriveReader
import pureconfig.{ConfigReader, ConfigSource}
Expand Down Expand Up @@ -42,6 +44,8 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
* the interval at which a view will look for requested restarts
* @param indexingEnabled
* if false, disables composite view indexing
* @param sinkConfig
* type of sink used for composite indexing
*/
final case class CompositeViewsConfig(
sources: SourcesConfig,
Expand All @@ -55,7 +59,8 @@ final case class CompositeViewsConfig(
blazegraphBatch: BatchConfig,
elasticsearchBatch: BatchConfig,
restartCheckInterval: FiniteDuration,
indexingEnabled: Boolean
indexingEnabled: Boolean,
sinkConfig: SinkConfig
)

object CompositeViewsConfig {
Expand Down Expand Up @@ -106,6 +111,26 @@ object CompositeViewsConfig {
maxTimeWindow: FiniteDuration
)

object SinkConfig {

  /** Represents the choice of composite sink */
  sealed trait SinkConfig

  /** A sink that only supports querying one resource at once from blazegraph */
  case object Single extends SinkConfig

  /** A sink that supports querying multiple resources at once from blazegraph */
  case object Batch extends SinkConfig

  /** Reads a [[SinkConfig]] from the `sink-config` configuration value; only "single" and "batch" are accepted. */
  implicit val sinkConfigReaderString: ConfigReader[SinkConfig] =
    ConfigReader.fromString {
      case "batch"  => Right(Batch)
      case "single" => Right(Single)
      case value    =>
        // Note: `SinkConfig.getClass.getSimpleName` would render as "SinkConfig$" (Scala object
        // classes carry a trailing '$'), so spell the type name out for a clean error message.
        Left(CannotConvert(value, "SinkConfig", s"$value is not one of: [single, batch]"))
    }
}

/**
* Converts a [[Config]] into an [[CompositeViewsConfig]]
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.idTemplating
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples}
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
import fs2.Chunk
import monix.bio.Task

import java.util.regex.Pattern.quote

/**
 * Provides a way to query for the multiple incoming resources (from a chunk). This assumes that the query contains the
 * template: `VALUES ?id { {resource_id} }`. The result is a single Graph for all given resources.
 * @param client
 *   the blazegraph client used to query
 * @param namespace
 *   the namespace to query
 * @param query
 *   the sparql query to perform
 */
final class BatchQueryGraph(client: BlazegraphClient, namespace: String, query: SparqlConstructQuery) {

  private val logger: Logger = Logger[BatchQueryGraph]

  // An empty N-Triples response means no resource matched; surface it as None so callers can drop the chunk.
  private def newGraph(ntriples: NTriples): Task[Option[Graph]] =
    if (ntriples.isEmpty) Task.none
    else Task.fromEither(Graph(ntriples)).map(Some(_))

  /** Runs the batched query for the given ids, returning the combined graph, or None when no triples came back. */
  def apply(ids: Chunk[Iri]): Task[Option[Graph]] =
    for {
      ntriples    <- client.query(Set(namespace), replaceIds(query, ids), SparqlNTriples)
      graphResult <- newGraph(ntriples.value)
      _           <- Task.when(graphResult.isEmpty)(
                       logger.debug(s"Querying blazegraph did not return any triples, '$ids' will be dropped.")
                     )
    } yield graphResult

  /** Substitutes the id template with the space-separated, angle-bracketed list of the given iris. */
  private def replaceIds(query: SparqlConstructQuery, iris: Chunk[Iri]): SparqlConstructQuery = {
    val values = iris.iterator.map(iri => s"<$iri>").mkString(" ")
    // `String.replace` substitutes every occurrence literally. The previous
    // `replaceAll(quote(idTemplating), values)` treated `$` and `\` in the replacement as regex
    // back-references — and `$` is a legal IRI character — which could corrupt the query or throw.
    SparqlConstructQuery.unsafe(query.value.replace(idTemplating, values))
  }

}
Loading
Loading