Skip to content

Commit

Permalink
refactor: TestClientService: STTP instead of Apache HTTP (DEV-1627) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
siers authored May 28, 2024
1 parent 0488add commit ecb6f8e
Showing 1 changed file with 50 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,16 @@
package org.knora.webapi.testservices

import org.apache.http
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.CloseableHttpResponse
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.config.SocketConfig
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.HttpMultipartMode
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.http.util.EntityUtils
import org.apache.pekko
import org.apache.pekko.actor.ActorSystem
import spray.json.*
import spray.json.JsObject
import sttp.capabilities.zio.ZioStreams
import sttp.client3
import sttp.client3.*
import sttp.client3.SttpBackend
import sttp.client3.httpclient.zio.HttpClientZioBackend
import zio.*

import java.nio.file.Path
Expand All @@ -43,7 +37,6 @@ import org.knora.webapi.messages.util.rdf.JsonLDDocument
import org.knora.webapi.messages.util.rdf.JsonLDUtil
import org.knora.webapi.settings.KnoraDispatchers
import org.knora.webapi.store.iiif.errors.SipiException
import org.knora.webapi.util.SipiUtil

import pekko.http.scaladsl.client.RequestBuilding
import pekko.http.scaladsl.unmarshalling.Unmarshal
Expand All @@ -65,10 +58,15 @@ final case class FileToUpload(path: Path, mimeType: ContentType)
*/
final case class InputFile(fileToUpload: FileToUpload, width: Int, height: Int)

final case class TestClientService(config: AppConfig, httpClient: CloseableHttpClient)(implicit system: ActorSystem)
final case class TestClientService(
config: AppConfig,
sttp: SttpBackend[Task, ZioStreams],
)(implicit system: ActorSystem)
extends TriplestoreJsonProtocol
with RequestBuilding {

private val targetHostUri = uri"${config.sipi.internalBaseUrl}"

implicit val executionContext: ExecutionContext = system.dispatchers.lookup(KnoraDispatchers.KnoraBlockingDispatcher)

case class TestClientTimeoutException(msg: String) extends Exception
Expand Down Expand Up @@ -161,209 +159,66 @@ final case class TestClientService(config: AppConfig, httpClient: CloseableHttpC
* The upload creates a multipart/form-data request which can contain multiple files.
*
* @param loginToken the login token to be included in the request to Sipi.
* @param filesToUpload the files to be uploaded.
* @param files the files to be uploaded.
* @return a [[SipiUploadResponse]] representing Sipi's response.
*/
def uploadToSipi(loginToken: String, filesToUpload: Seq[FileToUpload]): Task[SipiUploadResponse] = {

// builds the url for the operation
def uploadUrl(token: String) = ZIO.succeed(s"${config.sipi.internalBaseUrl}/upload?token=$token")

// create the entity builder
val builder: MultipartEntityBuilder = MultipartEntityBuilder.create()
builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE)

// add each file to the entity builder
filesToUpload.foreach { fileToUpload =>
builder.addBinaryBody(
"file",
fileToUpload.path.toFile(),
fileToUpload.mimeType,
fileToUpload.path.getFileName.toString,
)
}

// build our entity
val requestEntity: http.HttpEntity = builder.build()

// build the request
def request(url: String, requestEntity: http.HttpEntity) = {
val req = new http.client.methods.HttpPost(url)
req.setEntity(requestEntity)
req
}

def uploadToSipi(loginToken: String, files: Seq[FileToUpload]): Task[SipiUploadResponse] =
for {
url <- uploadUrl(loginToken)
entity <- ZIO.succeed(requestEntity)
req <- ZIO.succeed(request(url, entity))
response <- doSipiRequest(req)
sipiResponse <- ZIO.succeed(response.parseJson.asJsObject.convertTo[SipiUploadResponse])
} yield sipiResponse
}
url <- ZIO.succeed(targetHostUri.addPath("upload").addParam("token", loginToken))
multiparts = files.map { file =>
multipartFile("file", file.path.toFile)
.fileName(file.path.getFileName.toString)
.contentType(file.mimeType.toString)
}
response <- doSipiRequest(quickRequest.post(url).multipartBody(multiparts))
} yield response.parseJson.asJsObject.convertTo[SipiUploadResponse]

/**
* Uploads a file to the IIIF Service's "upload_without_processing" route and returns the information in Sipi's response.
* The upload creates a multipart/form-data request which can contain multiple files.
*
* @param loginToken the login token to be included in the request to Sipi.
* @param filesToUpload the files to be uploaded.
* @param files the files to be uploaded.
* @return a [[SipiUploadWithoutProcessingResponse]] representing Sipi's response.
*/
def uploadWithoutProcessingToSipi(
loginToken: String,
filesToUpload: Seq[FileToUpload],
): Task[SipiUploadWithoutProcessingResponse] = {

// builds the url for the operation
def uploadWithoutProcessingUrl(token: String) =
ZIO.succeed(s"${config.sipi.internalBaseUrl}/upload_without_processing?token=$token")

// create the entity builder
val builder: MultipartEntityBuilder = MultipartEntityBuilder.create()
builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE)

// add each file to the entity builder
filesToUpload.foreach { fileToUpload =>
builder.addBinaryBody(
"file",
fileToUpload.path.toFile(),
fileToUpload.mimeType,
fileToUpload.path.getFileName.toString,
)
}

// build our entity
val requestEntity: http.HttpEntity = builder.build()

// build the request
def request(url: String, requestEntity: http.HttpEntity) = {
val req = new http.client.methods.HttpPost(url)
req.setEntity(requestEntity)
req
}

files: Seq[FileToUpload],
): Task[SipiUploadWithoutProcessingResponse] =
for {
url <- uploadWithoutProcessingUrl(loginToken)
entity <- ZIO.succeed(requestEntity)
req <- ZIO.succeed(request(url, entity))
response <- doSipiRequest(req)
sipiResponse <- ZIO.succeed(response.parseJson.asJsObject.convertTo[SipiUploadWithoutProcessingResponse])
} yield sipiResponse
}

/**
* Makes an HTTP request to Sipi and returns the response.
*
* @param request the HTTP request.
* @return Sipi's response.
*/
private def doSipiRequest(request: http.HttpRequest): Task[String] = {
val targetHost: HttpHost =
new HttpHost(config.sipi.internalHost, config.sipi.internalPort, config.sipi.internalProtocol)
val httpContext: HttpClientContext = HttpClientContext.create()
var maybeResponse: Option[CloseableHttpResponse] = None

val response: Task[String] = ZIO.attemptBlocking {
maybeResponse = Some(httpClient.execute(targetHost, request, httpContext))

val responseEntityStr: String = Option(maybeResponse.get.getEntity) match {
case Some(responseEntity) => EntityUtils.toString(responseEntity)
case None => ""
}

val statusCode: Int = maybeResponse.get.getStatusLine.getStatusCode
val statusCategory: Int = statusCode / 100

// Was the request successful?
if (statusCategory == 2) {
// Yes.
responseEntityStr
url <- ZIO.succeed(targetHostUri.addPath("upload_without_processing").addParam("token", loginToken))
multiparts = files.map { file =>
multipartFile("file", file.path.toFile)
.fileName(file.path.getFileName.toString)
.contentType(file.mimeType.toString)
}
response <- doSipiRequest(quickRequest.post(url).multipartBody(multiparts))
} yield response.parseJson.asJsObject.convertTo[SipiUploadWithoutProcessingResponse]

private def doSipiRequest[T](request: Request[String, Any]): Task[String] =
sttp.send(request).flatMap { response =>
if (response.isSuccess) {
ZIO.succeed(response.body)
} else {
// No. Throw an appropriate exception.
val sipiErrorMsg = SipiUtil.getSipiErrorMessage(responseEntityStr)

if (statusCode == 404) {
throw NotFoundException(sipiErrorMsg)
} else if (statusCategory == 4) {
throw BadRequestException(s"Sipi responded with HTTP status code $statusCode: $sipiErrorMsg")
if (response.code.code == 404) {
ZIO.fail(NotFoundException(response.body))
} else if (response.isClientError) {
ZIO.fail(BadRequestException(s"Sipi responded with HTTP status code ${response.code.code}: ${response.body}"))
} else {
throw SipiException(s"Sipi responded with HTTP status code $statusCode: $sipiErrorMsg")
ZIO.fail(SipiException(s"Sipi responded with HTTP status code ${response.code.code}: ${response.body}"))
}
}
}

maybeResponse match {
case Some(response) => response.close()
case None => ()
}

response
}
}

object TestClientService {

/**
* Acquires a configured httpClient, backed by a connection pool,
* to be used in communicating with SIPI.
*/
private val acquire = ZIO.attemptBlocking {

// Create a connection manager with custom configuration.
val connManager: PoolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager()

// Create socket configuration
val socketConfig: SocketConfig = SocketConfig
.custom()
.setTcpNoDelay(true)
.build()

// Configure the connection manager to use socket configuration by default.
connManager.setDefaultSocketConfig(socketConfig)

// Validate connections after 1 sec of inactivity
connManager.setValidateAfterInactivity(1000)

// Configure total max or per route limits for persistent connections
// that can be kept in the pool or leased by the connection manager.
connManager.setMaxTotal(100)
connManager.setDefaultMaxPerRoute(10)

// Sipi custom default request config
val sipiTimeoutMillis = 120 * 1000
val defaultRequestConfig = RequestConfig
.custom()
.setConnectTimeout(sipiTimeoutMillis)
.setConnectionRequestTimeout(sipiTimeoutMillis)
.setSocketTimeout(sipiTimeoutMillis)
.build()

// Create an HttpClient with the given custom dependencies and configuration.
val httpClient: CloseableHttpClient = HttpClients
.custom()
.setConnectionManager(connManager)
.setDefaultRequestConfig(defaultRequestConfig)
.build()

httpClient
}.tap(_ => ZIO.logDebug(">>> Acquire Test Client Service <<<")).orDie

/**
* Releases the httpClient, freeing all resources.
*/
private def release(httpClient: CloseableHttpClient)(implicit system: ActorSystem) = ZIO.attemptBlocking {
pekko.http.scaladsl.Http().shutdownAllConnectionPools()
httpClient.close()
}.tap(_ => ZIO.logDebug(">>> Release Test Client Service <<<")).orDie

def layer: ZLayer[ActorSystem & AppConfig, Nothing, TestClientService] =
ZLayer.scoped {
for {
sys <- ZIO.service[ActorSystem]
config <- ZIO.service[AppConfig]
httpClient <- ZIO.acquireRelease(acquire)(release(_)(sys))
} yield TestClientService(config, httpClient)(sys)
}

HttpClientZioBackend.layer().orDie >+>
ZLayer.scoped {
for {
sys <- ZIO.service[ActorSystem]
config <- ZIO.service[AppConfig]
sttp <- ZIO.service[SttpBackend[Task, ZioStreams]]
} yield TestClientService(config, sttp)(sys)
}
}

0 comments on commit ecb6f8e

Please sign in to comment.