SparqlQueryFlowBuilder.scala
package ai.agnos.sparql.stream.client

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Source}
import akka.stream.{ActorMaterializer, FlowShape}
import akka.util.ByteString
import spray.json._

import ai.agnos.sparql.api._

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
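
/**
  * Akka Streams flow builder that executes SPARQL queries against an HTTP
  * endpoint. Streamed queries pass the raw response entity through, while
  * mapped queries are additionally unmarshalled into domain objects.
  */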
trait SparqlQueryFlowBuilder extends SparqlClientHelpers with ErrorHandlerSupport {

  import ai.agnos.sparql.mapper.SparqlClientJsonProtocol._

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  implicit val dispatcher: ExecutionContext
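
  /**
    * Builds the top-level request flow: a Partition stage routes each
    * [[SparqlRequest]] by its query's result type, and a Merge stage
    * recombines the responses into a single outlet.
    */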
  def sparqlQueryFlow(endpointFlow: HttpEndpointFlow[SparqlRequest]): Flow[SparqlRequest, SparqlResponse, Any] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val routes = 2

      // route 0: streamed queries, route 1: mapped queries
      val partition = builder.add(Partition[SparqlRequest](routes, {
        case SparqlRequest(SparqlQuery(_, _, StreamedQuery(_), _, _, _, _, _, _, _), _) => 0
        case SparqlRequest(SparqlQuery(_, _, _: MappedQuery[_], _, _, _, _, _, _, _), _) => 1
      }))

      val responseMerger = builder.add(Merge[SparqlResponse](routes).named("merge.sparqlResponse"))

      partition.out(0) ~> sparqlQueryToStreamFlow(endpointFlow) ~> responseMerger
      partition.out(1) ~> sparqlQueryToStreamFlow(endpointFlow) ~> responseUnmarshallerFlow() ~> responseMerger

      FlowShape(partition.in, responseMerger.out)
    } named "flow.sparqlRequestFlow")
  }
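
  /**
    * Unmarshals streamed SPARQL JSON results into domain objects via the
    * query's result mapper. All other responses are drained and converted
    * into failed [[SparqlResponse]]s so the entity stream is always consumed.
    */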
  private def responseUnmarshallerFlow(): Flow[SparqlResponse, SparqlResponse, Any] = {
    Flow[SparqlResponse]
      .flatMapConcat {
        // successful streamed response with a SPARQL-results JSON content type:
        // fold the entity stream into memory, parse it and map it with the
        // query's result mapper
        case response@SparqlResponse(
          SparqlRequest(
            SparqlQuery(_, _, mappedQueryType: MappedQuery[_], _, _, _, _, _, _, _), _
          ),
          true,
          _,
          List(StreamingSparqlResult(dataStream, Some(contentType))), _
        ) if isSparqlResultsJson(contentType) =>
          Source.fromFuture {
            dataStream.runFold(ByteString.empty)(_ ++ _).map { data =>
              // parse the JSON payload with the ResultSet format imported
              // from SparqlClientJsonProtocol
              Try(format3.read(data.utf8String.parseJson)) match {
                case Success(x: ResultSet) =>
                  response.copy(result = mappedQueryType.mapper.map(x))
                case Failure(err) =>
                  errorHandler.handleError(err)
                  response.copy(
                    success = false, result = Nil,
                    error = Some(SparqlClientRequestFailedWithError("failed to unmarshal the result", err))
                  )
              }
            }
          }

        // catch-all for all other streaming responses: the response entity
        // stream must still be consumed, otherwise it causes back-pressure issues
        case response@SparqlResponse(
          _, _, _, List(StreamingSparqlResult(dataStream, contentType)), _) =>
          dataStream.map { data =>
            response.copy(
              success = false, result = Nil,
              error = Some(SparqlClientRequestFailed(
                s"unsupported result type [${contentType.getOrElse("Unknown")}] ${data.take(1024).utf8String}...")
              )
            )
          }

        // anything else is reported as a failed request
        case r: SparqlResponse =>
          Source.single(r.copy(
            success = false, result = Nil,
            error = Some(SparqlClientRequestFailed("invalid request"))
          ))
      }
  }
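
  /**
    * Sends each [[SparqlRequest]] to the HTTP endpoint and wraps the raw
    * response entity as a [[StreamingSparqlResult]], leaving the entity
    * bytes to be consumed downstream.
    */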
  private def sparqlQueryToStreamFlow(endpointFlow: HttpEndpointFlow[SparqlRequest]): Flow[SparqlRequest, SparqlResponse, Any] = {
    Flow[SparqlRequest]
      .map {
        case request@SparqlRequest(query: SparqlQuery, _) =>
          (makeHttpRequest(endpointFlow.endpoint, query), request)
      }
      .log("SPARQL endpoint request")
      .via(endpointFlow.flow)
      .log("SPARQL endpoint response")
      .map {
        case (Success(HttpResponse(status, _, entity, _)), request) =>
          SparqlResponse(request, status == StatusCodes.OK, status,
            result = StreamingSparqlResult(entity.dataBytes, Some(entity.contentType)) :: Nil)
        case (Failure(error), request) =>
          SparqlResponse(request, success = false,
            error = Some(SparqlClientRequestFailedWithError("failed to execute sparql query", error)))
      }
  }
}