Skip to content

Commit

Permalink
Add a backoff retry timer to the ConsulDtabStore observer (linkerd#1742)
Browse files Browse the repository at this point in the history
Add a backoff retry timer to the consul observer to query the Consul API less often

Signed-off-by: Dennis Adjei-Baah <[email protected]>
  • Loading branch information
dadjeibaah authored and siggy committed Dec 15, 2017
1 parent 544060c commit fcea27d
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 95 deletions.
4 changes: 2 additions & 2 deletions consul/src/main/scala/io/buoyant/consul/v1/KvApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import com.twitter.finagle.stats.{DefaultStatsReceiver, StatsReceiver}
import com.twitter.util._

object KvApi {
def apply(c: Client): KvApi = new KvApi(c, s"/$versionString")
def apply(c: Client, backoff: Stream[Duration]): KvApi = new KvApi(c, s"/$versionString", backoff)
}

class KvApi(
val client: Client,
val uriPrefix: String,
val backoffs: Stream[Duration] = Backoff.exponentialJittered(1.milliseconds, 5.seconds),
val backoffs: Stream[Duration],
val stats: StatsReceiver = DefaultStatsReceiver
) extends BaseApi with Closable {
val kvPrefix = s"$uriPrefix/kv"
Expand Down
66 changes: 34 additions & 32 deletions consul/src/test/scala/io/buoyant/consul/v1/KvApiTest.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.buoyant.consul.v1

import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.service.Backoff
import com.twitter.finagle.{Failure, Service}
import com.twitter.io.Buf
import com.twitter.util.Future
import com.twitter.util.{Duration, Future}
import io.buoyant.test.{Awaits, Exceptions}
import org.scalatest.FunSuite

Expand All @@ -16,6 +17,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
val deleteOkBuf = Buf.Utf8("""true""")
val deleteFailBuf = Buf.Utf8("""false""")
var lastUri = ""
val constBackoff = Backoff.const(Duration.Zero)

def stubService(buf: Buf) = Service.mk[Request, Response] { req =>
val rsp = Response()
Expand All @@ -29,7 +31,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
test("list returns an indexed seq of key names") {
val service = stubService(listBuf)

val result = await(KvApi(service).list("/foo/"))
val result = await(KvApi(service, constBackoff).list("/foo/"))

assert(result.index == Some("4"))
assert(result.value.size == 2)
Expand All @@ -45,13 +47,13 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Future.value(rsp)
}
assertThrows[NotFound](
await(KvApi(failureService).list("/wrong/path/"))
await(KvApi(failureService, constBackoff).list("/wrong/path/"))
)
}

test("list supports consistency parameter") {
val service = stubService(listBuf)
val api = KvApi(service)
val api = KvApi(service, constBackoff)

await(api.list("/foo/"))
assert(!lastUri.contains("consistent"))
Expand All @@ -71,15 +73,15 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
test("get returns an indexed value") {
val service = stubService(getBuf)

val result = await(KvApi(service).get("/some/path/to/key"))
val result = await(KvApi(service, constBackoff).get("/some/path/to/key"))
assert(result.index == Some("4"))
assert(result.value == "foobar")
}

test("get uses raw values") {
val service = stubService(putFailBuf)

await(KvApi(service).get("/path/to/key"))
await(KvApi(service, constBackoff).get("/path/to/key"))
assert(lastUri.contains(s"raw=true"))
}

Expand All @@ -92,13 +94,13 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Future.value(rsp)
}
assertThrows[NotFound](
await(KvApi(failureService).get("/wrong/path"))
await(KvApi(failureService, constBackoff).get("/wrong/path"))
)
}

test("get supports consistency parameter") {
val service = stubService(getBuf)
val api = KvApi(service)
val api = KvApi(service, constBackoff)

await(api.get("/path/to/key"))
assert(!lastUri.contains("consistent"))
Expand All @@ -118,7 +120,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
test("multiGet returns an indexed seq of values") {
val service = stubService(multiGetBuf)

val result = await(KvApi(service).multiGet("/sample"))
val result = await(KvApi(service, constBackoff).multiGet("/sample"))
assert(result.index == Some("4"))
assert(result.value.size == 1)
assert(result.value.head.decoded == Some("foobar"))
Expand All @@ -127,14 +129,14 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
test("multiGet by default is non-recurse") {
val service = stubService(multiGetBuf)

await(KvApi(service).multiGet("/path/to/key"))
await(KvApi(service, constBackoff).multiGet("/path/to/key"))
assert(!lastUri.contains("recurse"))
}

test("multiGet with recurse set to true adds a recurse parameter") {
val service = stubService(multiGetBuf)

await(KvApi(service).multiGet("/path/to/key", recurse = Some(true)))
await(KvApi(service, constBackoff).multiGet("/path/to/key", recurse = Some(true)))
assert(lastUri.contains("recurse"))
}

Expand All @@ -147,13 +149,13 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Future.value(rsp)
}
assertThrows[NotFound](
await(KvApi(failureService).multiGet("/wrong/path"))
await(KvApi(failureService, constBackoff).multiGet("/wrong/path"))
)
}

test("multiGet supports consistency parameter") {
val service = stubService(multiGetBuf)
val api = KvApi(service)
val api = KvApi(service, constBackoff)

await(api.multiGet("/path/to/key"))
assert(!lastUri.contains("consistent"))
Expand All @@ -173,30 +175,30 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
test("put returns true on success") {
val service = stubService(putOkBuf)

val result = await(KvApi(service).put("/path/to/key", "foobar"))
val result = await(KvApi(service, constBackoff).put("/path/to/key", "foobar"))
assert(result)
}

test("put returns false on failure") {
val service = stubService(putFailBuf)

val result = await(KvApi(service).put("/path/to/key", "foobar"))
val result = await(KvApi(service, constBackoff).put("/path/to/key", "foobar"))
assert(!result)
}

test("put cas flag") {
val service = stubService(putFailBuf)

await(KvApi(service).put("/path/to/key", "foobar", cas = Some("0")))
await(KvApi(service, constBackoff).put("/path/to/key", "foobar", cas = Some("0")))
assert(lastUri.contains(s"cas=0"))

await(KvApi(service).put("/path/to/key", "foobar"))
await(KvApi(service, constBackoff).put("/path/to/key", "foobar"))
assert(!lastUri.contains(s"cas"))
}

test("put supports consistency parameter") {
val service = stubService(putOkBuf)
val api = KvApi(service)
val api = KvApi(service, constBackoff)

await(api.put("/path/to/key", "foobar"))
assert(!lastUri.contains("consistent"))
Expand All @@ -216,44 +218,44 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
test("delete returns true on success") {
val service = stubService(putOkBuf)

val result = await(KvApi(service).delete("/path/to/key"))
val result = await(KvApi(service, constBackoff).delete("/path/to/key"))
assert(result)
}

test("delete returns false on failure") {
val service = stubService(putFailBuf)

val result = await(KvApi(service).delete("/path/to/key"))
val result = await(KvApi(service, constBackoff).delete("/path/to/key"))
assert(!result)
}

test("delete cas flag") {
val service = stubService(putFailBuf)

await(KvApi(service).delete("/path/to/key", cas = Some("0")))
await(KvApi(service, constBackoff).delete("/path/to/key", cas = Some("0")))
assert(lastUri.contains(s"cas=0"))

await(KvApi(service).delete("/path/to/key"))
await(KvApi(service, constBackoff).delete("/path/to/key"))
assert(!lastUri.contains(s"cas"))
}

test("delete by default is non-recurse") {
val service = stubService(deleteOkBuf)

await(KvApi(service).delete("/path/to/key"))
await(KvApi(service, constBackoff).delete("/path/to/key"))
assert(!lastUri.contains("recurse"))
}

test("delete with recurse set to true adds a recurse parameter") {
val service = stubService(deleteOkBuf)

await(KvApi(service).delete("/path/to/key", recurse = Some(true)))
await(KvApi(service, constBackoff).delete("/path/to/key", recurse = Some(true)))
assert(lastUri.contains("recurse"))
}

test("delete supports consistency parameter") {
val service = stubService(putOkBuf)
val api = KvApi(service)
val api = KvApi(service, constBackoff)

await(api.delete("/path/to/key"))
assert(!lastUri.contains("consistent"))
Expand All @@ -272,9 +274,9 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {

test("blocking index returned from one call can be used to set index on subsequent calls") {
val service = stubService(getBuf)
val index = await(KvApi(service).get("/some/path")).index.get
val index = await(KvApi(service, constBackoff).get("/some/path")).index.get

await(KvApi(service).get("/some/path", blockingIndex = Some(index)))
await(KvApi(service, constBackoff).get("/some/path", blockingIndex = Some(index)))
assert(lastUri.contains(s"index=$index"))
}

Expand All @@ -283,7 +285,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Future.exception(new Exception("I have no idea who to talk to"))
}
assertThrows[Exception](
await(KvApi(failureService).list("/foo/"))
await(KvApi(failureService, constBackoff).list("/foo/"))
)
}

Expand All @@ -301,7 +303,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Future.exception(new Exception("I have no idea who to talk to"))
}
}
val result = await(KvApi(failureService).list("/some/path/", retry = true))
val result = await(KvApi(failureService, constBackoff).list("/some/path/", retry = true))
assert(result.index == Some("4"))
assert(result.value == List("foo/bar/", "foo/baz/"))
}
Expand All @@ -312,7 +314,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Failure("consul observation released", Failure.Interrupted)
)
}
val api = KvApi(failureService)
val api = KvApi(failureService, constBackoff)
val result = api.list("/some/path/", retry = true)
assertThrows[Failure](
await(result)
Expand All @@ -330,7 +332,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
}

assertThrows[UnexpectedResponse](
await(KvApi(failureService).list("/some/path/", datacenter = Some("non-existent dc")))
await(KvApi(failureService, constBackoff).list("/some/path/", datacenter = Some("non-existent dc")))
)
}

Expand All @@ -342,7 +344,7 @@ class KvApiTest extends FunSuite with Awaits with Exceptions {
Future.value(rsp)
}
assertThrows[Forbidden](
await(KvApi(failureService).get("/some/path"))
await(KvApi(failureService, constBackoff).get("/some/path"))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import com.twitter.conversions.time._
import com.twitter.finagle.liveness.{FailureAccrualFactory, FailureAccrualPolicy}
import com.twitter.finagle.service.Backoff
import com.twitter.util.Duration
import io.buoyant.config.{PolymorphicConfig, ConfigInitializer}
import io.buoyant.config.{ConfigInitializer, PolymorphicConfig}
import io.buoyant.namer.BackoffConfig

abstract class FailureAccrualInitializer extends ConfigInitializer

Expand Down
35 changes: 2 additions & 33 deletions linkerd/core/src/main/scala/io/buoyant/linkerd/SvcConfig.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package io.buoyant.linkerd

import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty, JsonSubTypes}
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty}
import com.twitter.conversions.time._
import com.twitter.finagle.{param, Stack}
import com.twitter.finagle.buoyant.TotalTimeout
import com.twitter.finagle.service._
import com.twitter.finagle.buoyant.ParamsMaybeWith
import com.twitter.util.Duration
import io.buoyant.config.PolymorphicConfig
import io.buoyant.namer.BackoffConfig
import io.buoyant.router.{ClassifiedRetries, RetryBudgetConfig}
import io.buoyant.router.RetryBudgetModule.{param => ev}

Expand Down Expand Up @@ -74,33 +73,3 @@ case class RetriesConfig(
def mkBackoff: Option[ClassifiedRetries.Backoffs] =
backoff.map(_.mk).map(ClassifiedRetries.Backoffs(_))
}

@JsonSubTypes(Array(
new JsonSubTypes.Type(value = classOf[ConstantBackoffConfig], name = "constant"),
new JsonSubTypes.Type(value = classOf[JitteredBackoffConfig], name = "jittered")
))
abstract class BackoffConfig extends PolymorphicConfig {
@JsonIgnore
def mk: Stream[Duration]
}

case class ConstantBackoffConfig(ms: Int) extends BackoffConfig {
// ms defaults to 0 when not specified
def mk = Backoff.constant(ms.millis)
}

/** See http://www.awsarchitectureblog.com/2015/03/backoff.html */
case class JitteredBackoffConfig(minMs: Option[Int], maxMs: Option[Int]) extends BackoffConfig {
def mk = {
val min = minMs match {
case Some(ms) => ms.millis
case None => throw new IllegalArgumentException("'minMs' must be specified")
}
val max = maxMs match {
case Some(ms) => ms.millis
case None => throw new IllegalArgumentException("'maxMs' must be specified")
}
Backoff.decorrelatedJittered(min, max)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.buoyant.linkerd
import com.twitter.finagle.service.Retries
import com.twitter.util.Duration
import io.buoyant.config.Parser
import io.buoyant.namer.{ConstantBackoffConfig, JitteredBackoffConfig}
import io.buoyant.router.RetryBudgetConfig
import org.scalatest.FunSuite

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package io.buoyant.linkerd
import com.twitter.finagle.{Dtab, Path, Stack}
import com.twitter.finagle.naming.buoyant.DstBindingFactory
import io.buoyant.config.Parser
import io.buoyant.namer.{ConfiguredNamersInterpreter, InterpreterInitializer, TestInterpreter, TestInterpreterInitializer}
import io.buoyant.namer.{ConfiguredNamersInterpreter, InterpreterInitializer, JitteredBackoffConfig, TestInterpreter, TestInterpreterInitializer}
import io.buoyant.router.{Originator, RetryBudgetConfig, RoutingFactory}
import io.buoyant.test.Exceptions
import java.net.InetAddress

import org.scalatest.FunSuite

class RouterTest extends FunSuite with Exceptions {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.buoyant.linkerd.failureAccrual

import io.buoyant.config.Parser
import io.buoyant.linkerd.{ConstantBackoffConfig, FailureAccrualConfig, JitteredBackoffConfig}
import io.buoyant.linkerd.FailureAccrualConfig
import io.buoyant.test.FunSuite
import org.scalatest.OptionValues
import com.twitter.conversions.time._
import io.buoyant.namer.{ConstantBackoffConfig, JitteredBackoffConfig}
import org.scalatest.prop.PropertyChecks
import org.scalacheck.Gen

Expand Down
Loading

0 comments on commit fcea27d

Please sign in to comment.