Skip to content

Commit

Permalink
fix: correctly handle async cancellation of call context in OkHttp en…
Browse files Browse the repository at this point in the history
…gine (#1063)
  • Loading branch information
ianbotsf authored Apr 17, 2024
1 parent 66cd2f9 commit 28a28f5
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 5 deletions.
8 changes: 8 additions & 0 deletions .changes/289fe06b-6d65-47da-a007-360398c39244.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "289fe06b-6d65-47da-a007-360398c39244",
"type": "bugfix",
"description": "Correctly handle async cancellation of call context in OkHttp engine",
"issues": [
"awslabs/smithy-kotlin#1061"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,19 @@ public class OkHttpEngine(
val engineCall = client.newCall(engineRequest)
val engineResponse = mapOkHttpExceptions { engineCall.executeAsync() }

callContext.job.invokeOnCompletion {
engineResponse.body.close()
}

val response = engineResponse.toSdkResponse()
val requestTime = Instant.fromEpochMilliseconds(engineResponse.sentRequestAtMillis)
val responseTime = Instant.fromEpochMilliseconds(engineResponse.receivedResponseAtMillis)

return OkHttpCall(request, response, requestTime, responseTime, callContext, engineCall)
return OkHttpCall(request, response, requestTime, responseTime, callContext, engineCall).also { call ->
callContext.job.invokeOnCompletion { cause ->
// If cause is non-null that means the job was cancelled (CancellationException) or failed (anything
// else). In both cases we need to ensure that the engine-side resources are cleaned up completely
// since they wouldn't otherwise be. https://github.com/smithy-lang/smithy-kotlin/issues/1061
if (cause != null) call.cancelInFlight()
engineResponse.body.close()
}
}
}

override fun shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,45 @@ class AsyncStressTest : AbstractEngineTest() {
assertEquals(engineJobsBefore.size, engineJobsAfter.size, message)
}
}

@Test
fun testJobCancellation() = testEngines {
// https://github.com/smithy-lang/smithy-kotlin/issues/1061

test { _, client ->
val req = HttpRequest {
testSetup()
url.path.decoded = "slow"
}

// Expect CancellationException because we're cancelling
assertFailsWith<CancellationException> {
coroutineScope {
val parentScope = this
val call = client.call(req)

val bytes = async {
delay(100.milliseconds)

try {
// The server side is feeding us bytes very slowly. This shouldn't complete before the
// parentScope cancellation happens below.
call.response.body.readAll()
} catch (e: Throwable) {
// "IllegalStateException: Unbalanced enter/exit" will be thrown if body closed improperly
assertIsNot<IllegalStateException>(e)
null
}
}

val cancellation = async {
delay(400.milliseconds)
parentScope.cancel("Cancelling!")
}

awaitAll(bytes, cancellation)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package aws.smithy.kotlin.runtime.http.test.suite
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.coroutines.delay
import kotlin.time.Duration.Companion.milliseconds

internal fun Application.concurrentTests() {
routing {
Expand All @@ -18,5 +20,17 @@ internal fun Application.concurrentTests() {
call.respondText(text.repeat(respSize / text.length))
}
}

route("slow") {
get {
val chunk = ByteArray(256) { it.toByte() }
call.respondOutputStream {
repeat(10) {
delay(200.milliseconds)
write(chunk)
}
}
}
}
}
}

0 comments on commit 28a28f5

Please sign in to comment.