Skip to content

Commit

Permalink
Add timeout for body parsing
Browse files Browse the repository at this point in the history
For a long-running connections (for example ones coming from a load-balancer),
we used to wait for body text stream to complete and therefore hanged the
request infinitely. This caused performance issues.
Now, we detect if there was no activity for a given body text for the period
of time and short-circuit the processing early.
We extend timeout in testing to accomodate for virtual environment performance.
  • Loading branch information
peel committed Mar 19, 2024
1 parent 2bff968 commit 4ab8c30
Show file tree
Hide file tree
Showing 26 changed files with 150 additions and 16 deletions.
2 changes: 2 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
networking {
maxConnections = 1024
idleTimeout = 610 seconds
responseHeaderTimeout = 2 seconds
bodyReadTimeout = 500 millis
}

enableDefaultRedirect = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ object Config {

case class Networking(
maxConnections: Int,
idleTimeout: FiniteDuration
idleTimeout: FiniteDuration,
responseHeaderTimeout: FiniteDuration,
bodyReadTimeout: FiniteDuration
)

case class License(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object HttpServer {
.withHttpApp(hstsMiddleware(hsts, routes.orNotFound))
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.withResponseHeaderTimeout(networking.responseHeaderTimeout)
.cond(secure, _.withSslContext(SSLContext.getDefault))
.resource

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration.FiniteDuration
import cats.effect.Async
import fs2.{Pipe, Pull}

object Pipes {
def timeoutOnIdle[F[_]: Async, A](duration: FiniteDuration): Pipe[F, A, A] =
_.pull.timed { timedPull =>
def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
timedPull.timeout(duration) >>
timedPull.uncons.flatMap {
case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
case _ => Pull.done
}

go(timedPull)
}.stream
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
*/
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration.FiniteDuration
import cats.implicits._
import cats.effect.Sync
import cats.effect.{Async, Sync}
import org.http4s._
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import com.comcast.ip4s.Dns

class Routes[F[_]: Sync](
class Routes[F[_]: Async](
enableDefaultRedirect: Boolean,
enableRootResponse: Boolean,
enableCrossdomainTracking: Boolean,
bodyReadTimeout: FiniteDuration,
service: IService[F]
) extends Http4sDsl[F] {

Expand Down Expand Up @@ -49,7 +51,7 @@ class Routes[F[_]: Sync](
case req @ POST -> Root / vendor / version =>
val path = service.determinePath(vendor, version)
service.cookie(
body = req.bodyText.compile.string.map(Some(_)),
body = req.bodyText.through(Pipes.timeoutOnIdle(bodyReadTimeout)).compile.string.map(Some(_)),
path = path,
request = req,
pixelExpected = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ object Run {
config.enableDefaultRedirect,
config.rootResponse.enabled,
config.crossDomain.enabled,
config.networking.responseHeaderTimeout,
collectorService
).value,
if (config.ssl.enable) config.ssl.port else config.port,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration._
import org.specs2.mutable.Specification
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

class PipesSpec extends Specification {

"Pipes#timeoutOnIdle" should {
"allow terminating a stream early when idle" in {
Stream
.emits[IO, Int](Vector(1, 2, 3))
.onComplete(Stream.empty[IO].delayBy(20.seconds))
.through(Pipes.timeoutOnIdle(100.millis))
.compile
.count
.unsafeRunSync() must beEqualTo(3)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class RoutesSpec extends Specification {
) = {
val service = new TestService()
val routes =
new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value.orNotFound
new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, 500.millis, service)
.value
.orNotFound
val routesWithHsts = HttpServer.hstsMiddleware(Config.HSTS(enableHsts, 180.days), routes)
(service, routesWithHsts)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ object TestUtils {
),
networking = Networking(
1024,
610.seconds
610.seconds,
2.second,
500.millis
),
enableDefaultRedirect = false,
redirectDomains = Set.empty[String],
Expand Down
5 changes: 5 additions & 0 deletions kafka/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ collector {
}
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ object KafkaConfigSpec {
autoGeneratedId = None
),
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
license = Config.License(accept = true)
)
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector-cookie-anonymous.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"cookie": {
"enabled": true,
"name": "sp",
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector-cookie-attributes-1.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"cookie": {
"enabled": true,
"name": "greatName",
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector-cookie-attributes-2.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"cookie": {
"enabled": true,
"name": "sp",
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector-cookie-domain.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"cookie": {
"enabled": true,
"name": "sp",
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector-cookie-fallback.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"cookie": {
"enabled": true,
"name": "sp",
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector-cookie-no-domain.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"cookie": {
"enabled": true,
"name": "sp",
Expand Down
6 changes: 6 additions & 0 deletions kinesis/src/it/resources/collector-custom-paths.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}


"paths": {
"/acme/track": "/com.snowplowanalytics.snowplow/tp2",
"/acme/redirect": "/r/tp2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ collector {
maxBytes = ${MAX_BYTES}
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"doNotTrackCookie": {
"enabled": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ collector {
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}

"doNotTrackCookie": {
"enabled": true,
"name" : "foo",
Expand Down
5 changes: 5 additions & 0 deletions kinesis/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ collector {
maxBytes = ${MAX_BYTES}
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ object KinesisConfigSpec {
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ object NsqConfigSpec {
autoGeneratedId = None
),
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
license = Config.License(accept = true)
)
Expand Down
5 changes: 5 additions & 0 deletions pubsub/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ collector {
maxBytes = ${MAX_BYTES}
}
}

networking {
responseHeaderTimeout = 10 seconds
bodyReadTimeout = 2 seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ object ConfigSpec {
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ object SqsConfigSpec {
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down

0 comments on commit 4ab8c30

Please sign in to comment.