From 0488add9f2538322f055c0717b258af6620c2fe3 Mon Sep 17 00:00:00 2001 From: Raitis Veinbahs Date: Tue, 28 May 2024 13:08:54 +0200 Subject: [PATCH] refactor: TriplestoreServiceLive: use Sttp with ZIO instead of Apache HTTP (DEV-1627) (#3251) --- .../domain/service/DspIngestClient.scala | 11 +- .../store/triplestore/impl/FusekiPaths.scala | 14 +- .../impl/TriplestoreServiceLive.scala | 512 ++++++------------ 3 files changed, 187 insertions(+), 350 deletions(-) diff --git a/webapi/src/main/scala/org/knora/webapi/slice/admin/domain/service/DspIngestClient.scala b/webapi/src/main/scala/org/knora/webapi/slice/admin/domain/service/DspIngestClient.scala index 6be33f6845..87e244be02 100644 --- a/webapi/src/main/scala/org/knora/webapi/slice/admin/domain/service/DspIngestClient.scala +++ b/webapi/src/main/scala/org/knora/webapi/slice/admin/domain/service/DspIngestClient.scala @@ -6,6 +6,8 @@ package org.knora.webapi.slice.admin.domain.service import sttp.capabilities.zio.ZioStreams +import sttp.client3.Empty +import sttp.client3.RequestT import sttp.client3.SttpBackend import sttp.client3.UriContext import sttp.client3.asStreamAlways @@ -70,10 +72,11 @@ final case class DspIngestClientLive( private def projectsPath(shortcode: Shortcode) = s"${dspIngestConfig.baseUrl}/projects/${shortcode.value}" - private val authenticatedRequest = jwtService - .createJwtForDspIngest() - .map(_.jwtString) - .map(basicRequest.auth.bearer(_)) + private val authenticatedRequest: ZIO[Any, Nothing, RequestT[Empty, Either[String, String], Any]] = + jwtService + .createJwtForDspIngest() + .map(_.jwtString) + .map(basicRequest.auth.bearer(_)) override def getAssetInfo(shortcode: Shortcode, assetId: AssetId): Task[AssetInfoResponse] = for { diff --git a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala index 7bdef5e6b6..98722595eb 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/FusekiPaths.scala @@ -21,11 +21,11 @@ trait FusekiTriplestore { } case class FusekiPaths(config: Fuseki) { - val checkServer = "/$/server" - val repository = s"/${config.repositoryName}" - val data = s"$repository/data" - val get = s"$repository/get" - val query = s"$repository/query" - val update = s"$repository/update" - val datasets = "/$/datasets" + val checkServer: List[String] = List("$", "server") + val repository: List[String] = List(config.repositoryName) + val data: List[String] = repository :+ "data" + val get: List[String] = repository :+ "get" + val query: List[String] = repository :+ "query" + val update: List[String] = repository :+ "update" + val datasets: List[String] = List("$", "datasets") } diff --git a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala index 076e5ed02e..150837c47a 100644 --- a/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala +++ b/webapi/src/main/scala/org/knora/webapi/store/triplestore/impl/TriplestoreServiceLive.scala @@ -6,43 +6,28 @@ package org.knora.webapi.store.triplestore.impl import org.apache.commons.lang3.StringUtils -import org.apache.http.Consts -import org.apache.http.HttpEntity import org.apache.http.HttpHost -import org.apache.http.HttpRequest -import org.apache.http.NameValuePair -import org.apache.http.auth.AuthScope -import org.apache.http.auth.UsernamePasswordCredentials -import org.apache.http.client.config.RequestConfig -import org.apache.http.client.entity.UrlEncodedFormEntity -import org.apache.http.client.methods.CloseableHttpResponse -import org.apache.http.client.methods.HttpGet -import org.apache.http.client.methods.HttpPost -import org.apache.http.client.utils.URIBuilder -import org.apache.http.config.SocketConfig -import org.apache.http.entity.ContentType -import org.apache.http.entity.FileEntity -import org.apache.http.entity.StringEntity -import org.apache.http.impl.client.BasicCredentialsProvider -import org.apache.http.impl.client.CloseableHttpClient -import org.apache.http.impl.client.HttpClients -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager -import org.apache.http.message.BasicNameValuePair -import org.apache.http.util.EntityUtils import spray.json.* +import sttp.capabilities.zio.ZioStreams +import sttp.client3.Empty +import sttp.client3.Request +import sttp.client3.RequestT +import sttp.client3.Response +import sttp.client3.SttpBackend +import sttp.client3.SttpBackendOptions +import sttp.client3.UriContext +import sttp.client3.basicRequest +import sttp.client3.httpclient.zio.HttpClientZioBackend import zio.* import zio.metrics.Metric +import zio.nio.file.Path as NioPath -import java.io.BufferedInputStream -import java.net.URI import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths -import java.nio.file.StandardCopyOption -import java.nio.file.StandardOpenOption +import java.nio.file.StandardOpenOption.* import java.time.temporal.ChronoUnit -import java.util import scala.io.Source import dsp.errors.* @@ -50,6 +35,7 @@ import org.knora.webapi.* import org.knora.webapi.config.Triplestore import org.knora.webapi.messages.StringFormatter import org.knora.webapi.messages.store.triplestoremessages.* +import org.knora.webapi.messages.store.triplestoremessages.FusekiJsonProtocol.fusekiServerFormat import org.knora.webapi.messages.store.triplestoremessages.SparqlResultProtocol.* import org.knora.webapi.messages.util.rdf.* import org.knora.webapi.slice.resourceinfo.domain.InternalIri @@ -68,12 +54,21 @@ import org.knora.webapi.util.FileUtil case class TriplestoreServiceLive( triplestoreConfig: Triplestore, - queryHttpClient: CloseableHttpClient, - targetHost: HttpHost, + sttp: SttpBackend[Task, ZioStreams], )(implicit val sf: StringFormatter) extends TriplestoreService with FusekiTriplestore { + private val targetHostUri = { + val proto = if (triplestoreConfig.useHttps) "https" else "http" + val host = new HttpHost(triplestoreConfig.host, triplestoreConfig.fuseki.port, proto) + uri"${(host).toURI}" + } + + // NOTE: possibly quickRequest might be used instead of basicRequest (no Either) + private val authenticatedRequest: RequestT[Empty, Either[String, String], Any] = + basicRequest.auth.basic(triplestoreConfig.fuseki.username, triplestoreConfig.fuseki.password) + private val requestTimer = Metric.timer( "fuseki_request_duration", @@ -150,9 +145,12 @@ case class TriplestoreServiceLive( * @return [[Unit]]. */ override def query(query: Update): Task[Unit] = { - val request = new HttpPost(paths.update) - request.setEntity(new StringEntity(query.sparql, ContentType.create(mimeTypeApplicationSparqlUpdate, Consts.UTF_8))) - trackQueryDuration(query, doHttpRequest(request, _ => ZIO.unit)) + val request = authenticatedRequest + .post(targetHostUri.addPath(paths.update)) + .body(query.sparql) + .contentType(mimeTypeApplicationSparqlUpdate) + + trackQueryDuration(query, doHttpRequest(request)).unit } /** @@ -225,66 +223,33 @@ case class TriplestoreServiceLive( override def insertDataIntoTriplestore( rdfDataObjects: List[RdfDataObject], prependDefaults: Boolean, - ): Task[Unit] = { - val calculateCompleteRdfDataObjectList: Task[NonEmptyChunk[RdfDataObject]] = - if (prependDefaults) { - ZIO.succeed(DefaultRdfData.data.append(Chunk(rdfDataObjects: _*))) - } else { - NonEmptyChunk - .fromIterableOption(rdfDataObjects) - .fold[Task[NonEmptyChunk[RdfDataObject]]]( - ZIO.fail(BadRequestException("Cannot insert list with empty data into triplestore.")), - )( - ZIO.succeed(_), - ) - } - + ): Task[Unit] = for { - _ <- ZIO.logDebug("==>> Loading Data Start") - list <- calculateCompleteRdfDataObjectList - request <- - ZIO.foreach(list)(elem => - for { - graphName <- - if (elem.name.toLowerCase == "default") { - ZIO.fail(TriplestoreUnsupportedFeatureException("Requests to the default graph are not supported")) - } else { - ZIO.succeed(elem.name) - } - - uriBuilder <- - ZIO.attempt { - val uriBuilder: URIBuilder = new URIBuilder(paths.data) - uriBuilder.addParameter("graph", graphName) // Note: addParameter encodes the graphName URL - uriBuilder - } - - rdfContents <- loadRdfObject(elem.path) - - httpPost <- - ZIO.attemptBlocking { - val httpPost = new HttpPost(uriBuilder.build()) - val turtleContentType = ContentType.create(mimeTypeTextTurtle, "UTF-8") - httpPost.setEntity(new StringEntity(rdfContents, turtleContentType)) - httpPost - } - responseHandler <- ZIO.attempt(returnInsertGraphDataResponse(graphName)(_)) - } yield (httpPost, responseHandler), - ) - _ <- ZIO.foreachDiscard(request)(elem => doHttpRequest(request = elem._1, processResponse = elem._2)) - _ <- ZIO.logDebug("==>> Loading Data End") + _ <- ZIO.logDebug("==>> Loading Data Start") + objects = DefaultRdfData.data.toList.filter(_ => prependDefaults) ++ rdfDataObjects + _ <- ZIO.foreach(objects)(insertObjectIntoTriplestore) + _ <- ZIO.logDebug("==>> Loading Data End") } yield () - } /** - * Load the RdfDataObject into some AbstractHttpEntity from either a relative ../$path or through a resource. + * Insert the RdfDataObject into the triplestore from either a relative ../$path or through a resource. */ - private def loadRdfObject(path: String): Task[String] = { - val relativeFileEntity = ZIO.readFile(Paths.get("..", path)) - val resourceStringEntity = ZIO.attemptBlocking(Source.fromResource(path).mkString) - - relativeFileEntity.orElse(resourceStringEntity) - } + private def insertObjectIntoTriplestore(rdfDataObject: RdfDataObject): Task[Unit] = + for { + graphName <- ZIO + .fail(TriplestoreUnsupportedFeatureException("Requests to the default graph are not supported")) + .when(rdfDataObject.name.toLowerCase == "default") + .as(rdfDataObject.name) + rdfContents <- ZIO + .readFile(Paths.get("..", rdfDataObject.path)) + .orElse(ZIO.attemptBlocking(Source.fromResource(rdfDataObject.path).mkString)) + request = authenticatedRequest + .post(targetHostUri.addPath(paths.data).addParam("graph", graphName)) + .body(rdfContents) + .contentType(mimeTypeTextTurtle) + _ <- ZIO.logDebug(s"INSERT: ${request.uri}") + _ <- doHttpRequest(request).map(_.body).flatMap(ensuringBody(_)) + } yield () /** * Checks the Fuseki triplestore if it is available and configured correctly. If it is not @@ -316,22 +281,21 @@ case class TriplestoreServiceLive( /** * Call an endpoint that returns all datasets and check if our required dataset is present. */ - private def checkTriplestoreInitialized(): Task[Boolean] = { - - val request = new HttpGet(paths.checkServer) - request.addHeader("Accept", mimeTypeApplicationJson) - - def checkForExpectedDataset(response: String) = ZIO.attempt { - val nameShouldBe = fusekiConfig.repositoryName - import org.knora.webapi.messages.store.triplestoremessages.FusekiJsonProtocol.* - val fusekiServer: FusekiServer = JsonParser(response).convertTo[FusekiServer] - val neededDataset: Option[FusekiDataset] = - fusekiServer.datasets.find(dataset => dataset.dsName == s"/$nameShouldBe" && dataset.dsState) - neededDataset.nonEmpty - } - - doHttpRequest(request, returnResponseAsString).flatMap(checkForExpectedDataset) - } + private def checkTriplestoreInitialized(): Task[Boolean] = + for { + response <- doHttpRequest( + authenticatedRequest + .get(targetHostUri.addPath(paths.checkServer)) + .header("Accept", mimeTypeApplicationJson), + ) + expectedFound <- ZIO.attempt { + JsonParser(response.body.toOption.getOrElse("")) + .convertTo[FusekiServer] + .datasets + .find(dataset => dataset.dsName == s"/${fusekiConfig.repositoryName}" && dataset.dsState) + .nonEmpty + } + } yield expectedFound /** * Initialize the Jena Fuseki triplestore. Currently only works for @@ -341,35 +305,23 @@ case class TriplestoreServiceLive( * (`test/resources/test.conf`). Usage is only recommended for automated * testing and not for production use. */ - private def initJenaFusekiTriplestore(): Task[Unit] = { - - val httpPost = ZIO.attemptBlocking { - val configFileName = s"fuseki-repository-config.ttl.template" - - // take config from the classpath and write to triplestore - val configFile: String = - FileUtil.readTextResource(configFileName).replace("@REPOSITORY@", fusekiConfig.repositoryName) - - val httpPost: HttpPost = new HttpPost("/$/datasets") - val stringEntity = new StringEntity(configFile, ContentType.create(mimeTypeTextTurtle)) - httpPost.setEntity(stringEntity) - httpPost - } - - httpPost.flatMap(doHttpRequest(_, _ => ZIO.unit)).unit - } + private def initJenaFusekiTriplestore(): Task[Unit] = + for { + configFile <- + ZIO.attemptBlocking { + FileUtil + .readTextResource(s"fuseki-repository-config.ttl.template") + .replace("@REPOSITORY@", fusekiConfig.repositoryName) + } - /** - * Makes a triplestore URI for downloading a named graph. - * - * @param graphIri the IRI of the named graph. - * @return a triplestore-specific URI for downloading the named graph. - */ - private def makeNamedGraphDownloadUri(graphIri: IRI): URI = { - val uriBuilder: URIBuilder = new URIBuilder(paths.get) - uriBuilder.setParameter("graph", s"$graphIri") - uriBuilder.build() - } + _ <- + doHttpRequest( + authenticatedRequest + .post(targetHostUri.addPath(paths.datasets)) + .contentType(mimeTypeTextTurtle) + .body(configFile), + ) + } yield () /** * Requests the contents of a named graph, saving the response in a file. @@ -383,30 +335,35 @@ case class TriplestoreServiceLive( graphIri: InternalIri, outputFile: zio.nio.file.Path, outputFormat: QuadFormat, - ): Task[Unit] = { - val request = new HttpGet(makeNamedGraphDownloadUri(graphIri.value)) - request.addHeader("Accept", mimeTypeTextTurtle) - doHttpRequest(request, writeResponseFileAsTurtleContent(outputFile.toFile.toPath, graphIri.value, outputFormat)) - }.unit + ): Task[Unit] = + for { + request <- + ZIO.succeed( + authenticatedRequest + .get(targetHostUri.addPath(paths.get).addParam("graph", s"${graphIri.value}")) + .header("Accept", mimeTypeTextTurtle), + ) + response <- doHttpRequest(request) + rdfBody <- ensuringBody(response.body).map(RdfStringSource(_)) + _ <- ZIO.attemptBlocking { + RdfFormatUtil.turtleToQuadsFile(rdfBody, graphIri.value, outputFile.toFile.toPath, outputFormat, APPEND) + } + } yield () private def executeSparqlQuery( query: SparqlQuery, acceptMimeType: String = mimeTypeApplicationSparqlResultsJson, - ) = { + ): Task[String] = { val timeout: Duration = query.timeout match { case SparqlTimeout.Standard => triplestoreConfig.queryTimeout case SparqlTimeout.Maintenance => triplestoreConfig.maintenanceTimeout case SparqlTimeout.Gravsearch => triplestoreConfig.gravsearchTimeout } - val formParams = new util.ArrayList[NameValuePair]() - formParams.add(new BasicNameValuePair("query", query.sparql)) - formParams.add(new BasicNameValuePair("timeout", s"${timeout.toSeconds}")) - - val request: HttpPost = new HttpPost(paths.query) - request.setEntity(new UrlEncodedFormEntity(formParams, Consts.UTF_8)) - request.addHeader("Accept", acceptMimeType) - trackQueryDuration(query, doHttpRequest(request, returnResponseAsString)) + val params = Map(("query", query.sparql), ("timeout", timeout.toSeconds.toString)) + val uri = targetHostUri.addPath(paths.query).addParams(params) + val request = authenticatedRequest.post(uri).header("Accept", acceptMimeType) + trackQueryDuration(query, doHttpRequest(request).map(_.body.merge)) } private def trackQueryDuration[T](query: SparqlQuery, reqTask: Task[T]): Task[T] = { @@ -437,226 +394,103 @@ case class TriplestoreServiceLive( * @param graphs Specify which graphs are to be dumped. * @return [[Unit]] Or fails if the export was not successful. */ - override def downloadRepository(outputFile: Path, graphs: GraphsForMigration): Task[Unit] = { - def downloadWholeRepository = { - val request = new HttpGet(paths.repository) - request.addHeader("Accept", mimeTypeApplicationNQuads) - doHttpRequest(request, writeResponseFileAsPlainContent(outputFile)).unit - } + override def downloadRepository(outputFile: Path, graphs: GraphsForMigration): Task[Unit] = graphs match { case MigrateAllGraphs => - downloadWholeRepository + for { + response <- + doHttpRequest( + authenticatedRequest + .get(targetHostUri.addPath(paths.repository)) + .header("Accept", mimeTypeApplicationNQuads), + ) + body <- ensuringBody(response.body) + _ <- ZIO.attempt(Files.write(outputFile, body.getBytes(StandardCharsets.UTF_8), CREATE, TRUNCATE_EXISTING)) + } yield () case MigrateSpecificGraphs(graphIris) => - ZIO.foreach(graphIris)(graphIri => downloadGraph(graphIri, zio.nio.file.Path.fromJava(outputFile), NQuads)).unit + ZIO.foreach(graphIris)(downloadGraph(_, NioPath.fromJava(outputFile), NQuads)).unit } - } + + private def ensuringBody(body: Either[String, String]): Task[String] = + ZIO + .succeed(body.toOption) + .someOrFail(TriplestoreResponseException("Triplestore returned no content for for repository dump")) /** * Uploads repository content from an N-Quads file. * * @param inputFile an N-Quads file containing the content to be uploaded to the repository. */ - override def uploadRepository(inputFile: Path): Task[Unit] = { - val fileEntity = new FileEntity(inputFile.toFile, ContentType.create(mimeTypeApplicationNQuads, "UTF-8")) - val request: HttpPost = new HttpPost(paths.repository) - request.setEntity(fileEntity) - doHttpRequest(request, _ => ZIO.unit).unit - } + override def uploadRepository(inputFile: Path): Task[Unit] = + ZIO + .readFile(inputFile) + .map( + authenticatedRequest + .post(targetHostUri.addPath(paths.repository)) + .contentType(mimeTypeApplicationNQuads, "UTF-8") + .body(_), + ) + .flatMap(doHttpRequest) + .unit - /** - * Makes an HTTP connection to the triplestore, and delegates processing of the response - * to a function. - * - * @param request the request to be sent. - * @tparam T the return type of `processError`. - * @return the return value of `processError`. - */ private def doHttpRequest[T]( - request: HttpRequest, - processResponse: CloseableHttpResponse => Task[T], - ): Task[T] = { - - def executeQuery(): Task[CloseableHttpResponse] = { - ZIO - .attempt(queryHttpClient.execute(targetHost, request)) - .catchSome { - case socketTimeoutException: java.net.SocketTimeoutException => - val message = - "The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help." - val error = TriplestoreTimeoutException(message, socketTimeoutException) - ZIO.logError(error.toString) *> - ZIO.fail(error) - case e: Exception => - val message = s"Failed to connect to triplestore." - val error = TriplestoreConnectionException(message, Some(e)) - ZIO.logError(error.toString) *> - ZIO.fail(error) - } + request: Request[Either[String, String], Any], + ): Task[Response[Either[String, String]]] = { + def executeQuery(request: Request[Either[String, String], Any]): Task[Response[Either[String, String]]] = { + request.send(sttp).catchSome { + case socketTimeoutException: java.net.SocketTimeoutException => + val message = + "The triplestore took too long to process a request. This can happen because the triplestore needed too much time to search through the data that is currently in the triplestore. Query optimisation may help." + val error = TriplestoreTimeoutException(message, socketTimeoutException) + ZIO.logError(error.toString) *> ZIO.fail(error) + case e: Exception => + val message = s"Failed to connect to triplestore." + val error = TriplestoreConnectionException(message, Some(e)) + ZIO.logError(error.toString) *> ZIO.fail(error) + } } <* ZIO.logDebug(s"Executing Query: $request") - def checkResponse(response: CloseableHttpResponse, statusCode: Int): Task[Unit] = + def checkResponse(response: Response[Either[String, String]]): Task[Unit] = ZIO - .unless(statusCode / 100 == 2) { - val entity = - Option(response.getEntity) - .map(responseEntity => EntityUtils.toString(responseEntity, StandardCharsets.UTF_8)) - - val statusResponseMsg = - s"Triplestore responded with HTTP code $statusCode" + .unless(response.code.isSuccess) { + val statusResponseMsg = s"Triplestore responded with HTTP code ${response.code}" - (statusCode, entity) match { - case (404, _) => ZIO.fail(NotFoundException.notFound) - case (400, Some(response)) if response.contains("Text search parse error") => + (response.code.code, response.body.merge) match { + case (404, _) => + ZIO.fail(NotFoundException.notFound) + case (400, response) if response.contains("Text search parse error") => ZIO.fail(BadRequestException(s"$response")) - case (500, _) => ZIO.fail(TriplestoreResponseException(statusResponseMsg)) - case (503, Some(response)) if response.contains("Query timed out") => + case (503, response) if response.contains("Query timed out") => ZIO.fail(TriplestoreTimeoutException(s"$statusResponseMsg: $response")) - case (503, _) => ZIO.fail(TriplestoreResponseException(statusResponseMsg)) - case _ => ZIO.fail(TriplestoreResponseException(statusResponseMsg)) + case _ => + ZIO.fail(TriplestoreResponseException(statusResponseMsg)) } } .unit - def getResponse = ZIO.acquireRelease(executeQuery())(response => ZIO.succeed(response.close())) - - ZIO.scoped(for { - _ <- ZIO.logDebug("Executing query...") - response <- getResponse - statusCode <- ZIO.attempt(response.getStatusLine.getStatusCode) - _ <- ZIO.logDebug(s"Executing query done with status code: $statusCode") - _ <- checkResponse(response, statusCode) - _ <- ZIO.logDebug("Checking response done.") - result <- processResponse(response) - _ <- ZIO.logDebug("Processing response done.") - } yield result) + for { + _ <- ZIO.logDebug("Executing query...") + response <- executeQuery(request) + _ <- ZIO.logDebug(s"Executed query with status code: ${response.code}") + _ <- checkResponse(response) + } yield response } - - /** - * Attempts to transforms a [[CloseableHttpResponse]] to a [[String]]. - */ - private def returnResponseAsString(response: CloseableHttpResponse): Task[String] = - Option(response.getEntity) match { - case None => ZIO.succeed("") - case Some(responseEntity) => - ZIO - .attempt(EntityUtils.toString(responseEntity, StandardCharsets.UTF_8)) - .tapDefect(e => ZIO.logError(s"Failed to return response as string: $e")) - } - - /** - * Attempts to transforms a [[CloseableHttpResponse]] to a [[Unit]]. - */ - private def returnInsertGraphDataResponse( - graphName: String, - )(response: CloseableHttpResponse): Task[Unit] = - Option(response.getEntity) match { - case None => ZIO.fail(TriplestoreResponseException(s"$graphName could not be inserted into Triplestore.")) - case Some(_) => ZIO.unit - } - - /** - * Writes an HTTP response the response is written as-is to the output file. - * - * @param outputFile the output file. - * @param response the response to be read. - * @return a [[Unit]]. - */ - private def writeResponseFileAsPlainContent( - outputFile: Path, - )(response: CloseableHttpResponse): Task[Unit] = - Option(response.getEntity) match { - case Some(responseEntity: HttpEntity) => - // Stream the HTTP entity directly to the output file. - ZIO.attempt(Files.copy(responseEntity.getContent, outputFile, StandardCopyOption.REPLACE_EXISTING)).unit - case None => - val error = TriplestoreResponseException(s"Triplestore returned no content for for repository dump") - ZIO.logError(error.toString) *> - ZIO.fail(error) - } - - /** - * Writes an HTTP response to a file, where the response is parsed as Turtle - * and converted to the output format, with the graph IRI added to each statement. - * - * @param outputFile the output file. - * @param graphIri the IRI of the graph used in the output. - * @param quadFormat the output format. - * @param response the response to be read. - * @return a [[Unit]]. - */ - private def writeResponseFileAsTurtleContent( - outputFile: Path, - graphIri: IRI, - quadFormat: QuadFormat, - )(response: CloseableHttpResponse): Task[Unit] = - Option(response.getEntity) match { - case Some(responseEntity: HttpEntity) => - ZIO.attemptBlocking { - // Yes. Stream the HTTP entity to a temporary Turtle file. - val tempTurtleFile = Paths.get(outputFile.toString + ".ttl") - Files.copy(responseEntity.getContent, tempTurtleFile, StandardCopyOption.REPLACE_EXISTING) - - RdfFormatUtil.turtleToQuadsFile( - RdfInputStreamSource(new BufferedInputStream(Files.newInputStream(tempTurtleFile))), - graphIri, - outputFile, - quadFormat, - StandardOpenOption.APPEND, - ) - - Files.delete(tempTurtleFile) - () - } - - case None => - val message = s"Triplestore returned no content for graph $graphIri" - val error = TriplestoreResponseException(message) - ZIO.logError(error.toString) *> - ZIO.fail(error) - } } object TriplestoreServiceLive { - private def makeHttpClient(config: Triplestore, host: HttpHost) = - ZIO.acquireRelease { - val connManager = new PoolingHttpClientConnectionManager() - connManager.setDefaultSocketConfig(SocketConfig.custom().setTcpNoDelay(true).build()) - connManager.setValidateAfterInactivity(1000) - connManager.setMaxTotal(100) - connManager.setDefaultMaxPerRoute(15) - - val credentialsProvider = new BasicCredentialsProvider - credentialsProvider.setCredentials( - new AuthScope(host.getHostName, host.getPort), - new UsernamePasswordCredentials(config.fuseki.username, config.fuseki.password), - ) - - // the client config used for queries to the triplestore. The timeout has to be larger than - // tripleStoreConfig.queryTimeoutAsDuration and tripleStoreConfig.gravsearchTimeoutAsDuration. - val requestTimeoutMillis = 7_200_000 // 2 hours - val requestConfig = RequestConfig - .custom() - .setConnectTimeout(requestTimeoutMillis) - .setConnectionRequestTimeout(requestTimeoutMillis) - .setSocketTimeout(requestTimeoutMillis) - .build - - val httpClient: CloseableHttpClient = HttpClients - .custom() - .setConnectionManager(connManager) - .setDefaultCredentialsProvider(credentialsProvider) - .setDefaultRequestConfig(requestConfig) - .build() - ZIO.succeed(httpClient) - }(client => ZIO.attemptBlocking(client.close()).logError.ignore) + import scala.concurrent.duration._ val layer: URLayer[Triplestore & StringFormatter, TriplestoreService] = - ZLayer.scoped { - for { - sf <- ZIO.service[StringFormatter] - config <- ZIO.service[Triplestore] - host = new HttpHost(config.host, config.fuseki.port, "http") - client <- makeHttpClient(config, host) - } yield TriplestoreServiceLive(config, client, host)(sf) - } + HttpClientZioBackend + .layer( + SttpBackendOptions.connectionTimeout(2.hours), + ) + .orDie >+> + ZLayer.scoped { + for { + sf <- ZIO.service[StringFormatter] + config <- ZIO.service[Triplestore] + sttp <- ZIO.service[SttpBackend[Task, ZioStreams]] + } yield TriplestoreServiceLive(config, sttp)(sf) + } }