Skip to content

Commit

Permalink
Add timeout for body parsing
Browse files Browse the repository at this point in the history
For a long-running connections (for example ones coming from a load-balancer),
we used to wait for body text stream to complete and therefore hanged the
request infinitely. This caused performance issues.
Now, we detect if there was no activity for a given body text for the period
of time and short-circuit the processing early.
  • Loading branch information
peel committed Mar 15, 2024
1 parent 2bff968 commit 4c4b327
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 16 deletions.
2 changes: 2 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
networking {
maxConnections = 1024
idleTimeout = 610 seconds
responseHeaderTimeout = 2 seconds
bodyReadTimeout = 500 millis
}

enableDefaultRedirect = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ object Config {

case class Networking(
maxConnections: Int,
idleTimeout: FiniteDuration
idleTimeout: FiniteDuration,
responseHeaderTimeout: FiniteDuration,
bodyReadTimeout: FiniteDuration
)

case class License(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object HttpServer {
.withHttpApp(hstsMiddleware(hsts, routes.orNotFound))
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.withResponseHeaderTimeout(networking.responseHeaderTimeout)
.cond(secure, _.withSslContext(SSLContext.getDefault))
.resource

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
*/
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration.FiniteDuration
import cats.implicits._
import cats.effect.Sync
import cats.effect.{Async, Sync}
import org.http4s._
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import com.comcast.ip4s.Dns

class Routes[F[_]: Sync](
class Routes[F[_]: Async](
enableDefaultRedirect: Boolean,
enableRootResponse: Boolean,
enableCrossdomainTracking: Boolean,
bodyReadTimeout: FiniteDuration,
service: IService[F]
) extends Http4sDsl[F] {

Expand Down Expand Up @@ -49,7 +51,7 @@ class Routes[F[_]: Sync](
case req @ POST -> Root / vendor / version =>
val path = service.determinePath(vendor, version)
service.cookie(
body = req.bodyText.compile.string.map(Some(_)),
body = req.bodyText.through(Streams.timeoutOnIdle(bodyReadTimeout)).compile.string.map(Some(_)),
path = path,
request = req,
pixelExpected = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ object Run {
config.enableDefaultRedirect,
config.rootResponse.enabled,
config.crossDomain.enabled,
config.networking.responseHeaderTimeout,
collectorService
).value,
if (config.ssl.enable) config.ssl.port else config.port,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration.FiniteDuration
import cats.effect.Async
import fs2.{Pipe, Pull}

object Streams {
def timeoutOnIdle[F[_]: Async, A](duration: FiniteDuration): Pipe[F, A, A] =
_.pull.timed { timedPull =>
def go(timedPull: Pull.Timed[F, A]): Pull[F, A, Unit] =
timedPull.timeout(duration) >>
timedPull.uncons.flatMap {
case Some((Right(elems), next)) => Pull.output(elems) >> go(next)
case _ => Pull.done
}

go(timedPull)
}.stream
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class RoutesSpec extends Specification {
) = {
val service = new TestService()
val routes =
new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value.orNotFound
new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, 500.millis, service)
.value
.orNotFound
val routesWithHsts = HttpServer.hstsMiddleware(Config.HSTS(enableHsts, 180.days), routes)
(service, routesWithHsts)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.snowplowanalytics.snowplow.collector.core

import scala.concurrent.duration._
import org.specs2.mutable.Specification
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

class StreamsSpec extends Specification {

"Streams" should {
"allow terminating a stream early when idle" in {
Stream
.emits[IO, Int](Vector(1, 2, 3))
.onComplete(Stream.empty[IO].delayBy(20.seconds))
.through(Streams.timeoutOnIdle(100.millis))
.compile
.count
.unsafeRunSync() must beEqualTo(3)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ object TestUtils {
),
networking = Networking(
1024,
610.seconds
610.seconds,
2.second,
500.millis
),
enableDefaultRedirect = false,
redirectDomains = Set.empty[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ object KafkaConfigSpec {
autoGeneratedId = None
),
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
license = Config.License(accept = true)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ object KinesisConfigSpec {
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ object NsqConfigSpec {
autoGeneratedId = None
),
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
license = Config.License(accept = true)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ object ConfigSpec {
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ object SqsConfigSpec {
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
maxConnections = 1024,
idleTimeout = 610.seconds,
responseHeaderTimeout = 2.seconds,
bodyReadTimeout = 500.millis
),
streams = Config.Streams(
useIpAddressAsPartitionKey = false,
Expand Down

0 comments on commit 4c4b327

Please sign in to comment.