-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSparqlRequestFlowBuilder.scala
42 lines (31 loc) · 1.37 KB
/
SparqlRequestFlowBuilder.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package ai.agnos.sparql.stream.client
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
import ai.agnos.sparql.api.{SparqlConstruct, _}
trait SparqlRequestFlowBuilder extends SparqlQueryFlowBuilder
with SparqlConstructFlowBuilder
with SparqlUpdateFlowBuilder {
/**
* Create a flow of Sparql requests to results.
*
* @param endpointFlow the HTTP endpoint flow for the Sparql triple store server
* @return
*/
def sparqlRequestFlow(endpointFlow: HttpEndpointFlow[SparqlRequest]): Flow[SparqlRequest, SparqlResponse, NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val routes = 3
val partition = builder.add(Partition[SparqlRequest](routes, {
case SparqlRequest(_: SparqlQuery, _) => 0
case SparqlRequest(_: SparqlUpdate, _) => 1
case SparqlRequest(_: SparqlConstruct, _) => 2
}))
val responseMerger = builder.add(Merge[SparqlResponse](routes).named("merge.sparqlResponse"))
partition ~> sparqlQueryFlow(endpointFlow) ~> responseMerger
partition ~> sparqlUpdateFlow(endpointFlow) ~> responseMerger
partition ~> sparqlConstructFlow(endpointFlow) ~> responseMerger
FlowShape(partition.in, responseMerger.out)
} named "flow.sparqlRequestFlow")
}
}