Skip to content

Commit

Permalink
Update Delegator interface to return a single value only (linkerd#1862)
Browse files Browse the repository at this point in the history
The Delegator interface defines a `delegate` method that returns an Activity.  However, in all cases, only the first value of this Activity was ever used (for the delegator UI).  For complex delegations with fast changing values, a delegation lookup could be quite expensive because many values would be calculated before the Activity was canceled.

We modify the `delegate` method to return a Future instead of an Activity.  This implies changes to all of Namerd's interfaces.

* thriftNamerInterface: We change the client to do a one-shot call with an empty stamp and ignore the stamp in the response instead of long-polling.  For backwards compatibility with long-polling clients, we modify the server to keep requests that have a non-empty stamp pending until cancelled by the client.
* http streaming: No change necessary because the delegate method never took advantage of http streaming.
* mesh: We deprecate the StreamDelegateTree method and modify the implementation to only return a single value on the stream.

I have manually tested all combinations of:
* client version = {1.3.5, HEAD}
* server version = {1.3.5, HEAD}
* interface = {thriftNamerInterface, mesh, http streaming}

Fixes linkerd#1846 

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored Mar 20, 2018
1 parent 9223686 commit b78f094
Show file tree
Hide file tree
Showing 23 changed files with 160 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ object DelegateApiHandler {
(dtabTry, pathTry) match {
case (Return(d), Return(p)) =>
delegator.delegate(d, p)
.toFuture
.flatMap(JsonDelegateTree.mk).map { tree =>
val rsp = Response()
rsp.content = Codec.writeBuf(tree)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.twitter.concurrent.AsyncStream
import com.twitter.io.{Buf, Reader}
import com.twitter.logging.Logger
import com.twitter.util.{Future, Try}

import scala.util.control.NonFatal

class JsonStreamParser(mapper: ObjectMapper with ScalaObjectMapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,6 @@ object Stream {
}
}

def fromFuture[T](fut: Future[T]): Stream[T] = deferred(fut.map(value))

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ object Client {
override def delegate(
dtab: Dtab,
tree: NameTree[Name.Path]
): Activity[DelegateTree[Name.Bound]] = {
val open = () => delegator.streamDelegateTree(mkDelegateTreeReq(root, dtab, tree))
streamActivity(open, decodeDelegateTree, backoffs, timer)
): Future[DelegateTree[Name.Bound]] = {
val req = mkDelegateTreeReq(root, dtab, tree)
delegator.getDelegateTree(req).flatMap { delegateTree =>
decodeDelegateTree(delegateTree) match {
case Some(decoded) => Future.value(decoded)
case None => Future.exception(new Exception("Failed to decode delegate tree: " + delegateTree))
}
}
}

private[this] val resolve: Path => Var[Addr] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.buoyant.transformer.perHost

import com.twitter.finagle.Name.Bound
import com.twitter.finagle._
import com.twitter.util.Activity
import com.twitter.util.{Activity, Future}
import io.buoyant.namer.{DelegateTree, DelegatingNameTreeTransformer}
import java.net.InetSocketAddress

Expand Down Expand Up @@ -35,6 +35,6 @@ class PortTransformer(prefix: Path, port: Int) extends DelegatingNameTreeTransfo
override protected def transform(tree: NameTree[Bound]): Activity[NameTree[Bound]] =
Activity.value(tree.map(mapBound))

override protected def transformDelegate(tree: DelegateTree[Bound]): Activity[DelegateTree[Bound]] =
Activity.value(DelegatingNameTreeTransformer.transformDelegate(tree, mapBound))
override protected def transformDelegate(tree: DelegateTree[Bound]): Future[DelegateTree[Bound]] =
Future.value(DelegatingNameTreeTransformer.transformDelegate(tree, mapBound))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package io.buoyant.transformer

import com.twitter.finagle.Name.Bound
import com.twitter.finagle._
import com.twitter.util.{Activity, Var}
import io.buoyant.namer.{DelegateTree, DelegatingNameTreeTransformer}
import com.twitter.util.{Activity, Future, Var}
import io.buoyant.namer.{DelegateTree, DelegatingNameTreeTransformer, RichActivity}

/**
* Transforms a bound name tree to only include addresses in
Expand All @@ -30,8 +30,8 @@ class GatewayTransformer(
gatewayPredicate: (Address, Address) => Boolean
) extends DelegatingNameTreeTransformer {

override protected def transformDelegate(tree: DelegateTree[Bound]): Activity[DelegateTree[Bound]] =
gatewayTree.map { gateways =>
override protected def transformDelegate(tree: DelegateTree[Bound]): Future[DelegateTree[Bound]] =
gatewayTree.toFuture.map { gateways =>
val routable = flatten(gateways.eval.toSet.flatten)
DelegatingNameTreeTransformer.transformDelegate(tree, mapBound(_, routable))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.buoyant.linkerd.admin.names

import com.twitter.finagle._
import com.twitter.util.{Var, Return}
import io.buoyant.linkerd.{TestProtocol, Linker}
import com.twitter.util.Var
import io.buoyant.linkerd.{Linker, TestProtocol}
import io.buoyant.namer._
import io.buoyant.test.Awaits
import org.scalatest.FunSuite
Expand Down Expand Up @@ -38,7 +38,7 @@ class DelegatorTest extends FunSuite with Awaits {
test("uses NamerInterpreter to resolve names") {
val path = Path.read("/nah/bro")
val dtab = Dtab.read("""/nah=>/#/namer;""")
assert(await(interpreter.delegate(dtab, path).toFuture) ==
assert(await(interpreter.delegate(dtab, path)) ==
DelegateTree.Delegate(path, Dentry.nop, DelegateTree.Leaf(
Path.read("/#/namer/bro"),
Dentry.read("/nah=>/#/namer"),
Expand All @@ -48,13 +48,13 @@ class DelegatorTest extends FunSuite with Awaits {

test("explain neg delegation") {
val path = Path.Utf8("nope")
assert(await(interpreter.delegate(dtab, path).toFuture) ==
assert(await(interpreter.delegate(dtab, path)) ==
DelegateTree.Neg(path, Dentry.nop))
}

test("explain delegate delegation") {
val path = Path.read("/meh/hey")
assert(await(interpreter.delegate(dtab, path).toFuture) ==
assert(await(interpreter.delegate(dtab, path)) ==
DelegateTree.Delegate(
path,
Dentry.nop,
Expand All @@ -64,7 +64,7 @@ class DelegatorTest extends FunSuite with Awaits {

test("explain alt delegation") {
val path = Path.read("/boo/lol")
assert(await(interpreter.delegate(dtab, path).toFuture) ==
assert(await(interpreter.delegate(dtab, path)) ==
DelegateTree.Delegate(path, Dentry.nop, DelegateTree.Alt(
Path.read("/foo/lol"),
Dentry.read("/boo=>/foo"),
Expand All @@ -76,7 +76,7 @@ class DelegatorTest extends FunSuite with Awaits {

test("explain bound delegation") {
val path = Path.read("/boo/humbug/ya")
assert(await(interpreter.delegate(dtab, path).toFuture) ==
assert(await(interpreter.delegate(dtab, path)) ==
DelegateTree.Delegate(path, Dentry.nop, DelegateTree.Alt(
Path.read("/foo/humbug/ya"),
Dentry.read("/boo=>/foo"),
Expand All @@ -96,7 +96,7 @@ class DelegatorTest extends FunSuite with Awaits {

test("explain error delegation") {
val path = Path.read("/beh/humbug")
assert(await(interpreter.delegate(dtab, path).toFuture) ==
assert(await(interpreter.delegate(dtab, path)) ==
DelegateTree.Delegate(
path,
Dentry.nop,
Expand Down
20 changes: 20 additions & 0 deletions linkerd/examples/namerd.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
# Use namerd for name resolution.
routers:
- protocol: http
label: named-thrift
interpreter:
kind: io.l5d.namerd
dst: /$/inet/localhost/4100
namespace: default
servers:
- port: 4140
ip: 0.0.0.0
- protocol: http
label: named-http
interpreter:
kind: io.l5d.namerd.http
experimental: true
dst: /$/inet/localhost/4180
namespace: default
servers:
- port: 4141
ip: 0.0.0.0
- protocol: http
label: named-grpc
interpreter:
kind: io.l5d.mesh
dst: /$/inet/localhost/4321
root: /default
servers:
- port: 4142
ip: 0.0.0.0
1 change: 1 addition & 0 deletions mesh/core/src/main/protobuf/delegator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ service Delegator {
rpc StreamDtab (DtabReq) returns (stream DtabRsp) {}

rpc GetDelegateTree (DelegateTreeReq) returns (DelegateTreeRsp) {}
// DEPRECATED: This method will only return one value. Use GetDelegateTree instead.
rpc StreamDelegateTree (DelegateTreeReq) returns (stream DelegateTreeRsp) {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.buoyant.namer
import com.twitter.finagle.Name.Bound
import com.twitter.finagle._
import com.twitter.finagle.naming.NameInterpreter
import com.twitter.util.{Activity, Var}
import com.twitter.util.{Activity, Future, Var}

case class ConfiguredDtabNamer(
configuredDtab: Activity[Dtab],
Expand Down Expand Up @@ -45,8 +45,8 @@ case class ConfiguredDtabNamer(
override def delegate(
dtab: Dtab,
tree: NameTree[Name.Path]
): Activity[DelegateTree[Bound]] =
configuredDtab.flatMap { confDtab =>
): Future[DelegateTree[Bound]] =
configuredDtab.toFuture.flatMap { confDtab =>
namersInterpreter.delegate(confDtab ++ dtab, tree)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package io.buoyant.namer
import com.twitter.finagle.Name.Bound
import com.twitter.finagle._
import com.twitter.finagle.naming.NameInterpreter
import com.twitter.util.{Activity, Var}
import com.twitter.util.{Activity, Future, Var}
import io.buoyant.namer.DelegateTree._
import scala.util.control.{NonFatal, NoStackTrace}
import scala.util.control.{NoStackTrace, NonFatal}
import scala.{Exception => ScalaException}

object DefaultInterpreterConfig {
Expand Down Expand Up @@ -78,7 +78,7 @@ case class ConfiguredNamersInterpreter(namers: Seq[(Path, Namer)])
override def delegate(
dtab: Dtab,
tree: NameTree[Name.Path]
): Activity[DelegateTree[Bound]] = {
): Future[DelegateTree[Bound]] = {
val dtree = DelegateTree.fromNameTree(tree)
delegateBind(dtab, 0, dtree).map(_.simplified)
}
Expand All @@ -89,7 +89,7 @@ case class ConfiguredNamersInterpreter(namers: Seq[(Path, Namer)])
dtab: Dtab,
dentry: Dentry,
path: Path
): Activity[DelegateTree[Name]] = {
): Future[DelegateTree[Name]] = {
val matches: Seq[DelegateTree[Name.Path]] = dtab.reverse.collect {
case d@Dentry(prefix, dst) if prefix.matches(path) =>
val suff = path.drop(prefix.size)
Expand All @@ -102,13 +102,13 @@ case class ConfiguredNamersInterpreter(namers: Seq[(Path, Namer)])
case trees => DelegateTree.Alt(path, dentry, trees: _*)
}

val lookup: Activity[DelegateTree[Name]] = result match {
val lookup: Future[DelegateTree[Name]] = result match {
case DelegateTree.Neg(path, d) =>
this.lookup(path).map {
case NameTree.Neg => result
case tree => fromNameTree(path, d, tree)
}
case tree => Activity.value(tree)
}.toFuture
case tree => Future.value(tree)
}

lookup.handle { case NonFatal(e) => DelegateTree.Exception(path, dentry, e) }
Expand All @@ -118,17 +118,17 @@ case class ConfiguredNamersInterpreter(namers: Seq[(Path, Namer)])
dtab: Dtab,
depth: Int,
tree: DelegateTree[Name]
): Activity[DelegateTree[Name.Bound]] =
): Future[DelegateTree[Name.Bound]] =
if (depth > MaxDepth)
Activity.exception(new IllegalArgumentException("Max recursion level reached."))
Future.exception(new IllegalArgumentException("Max recursion level reached."))
else tree match {
case tree@Exception(_, _, _) => Activity.value(tree)
case tree@Empty(_, _) => Activity.value(tree)
case tree@Fail(_, _) => Activity.value(tree)
case tree@Neg(_, _) => Activity.value(tree)
case tree@Exception(_, _, _) => Future.value(tree)
case tree@Empty(_, _) => Future.value(tree)
case tree@Fail(_, _) => Future.value(tree)
case tree@Neg(_, _) => Future.value(tree)

case Leaf(path, dentry, bound@Name.Bound(_)) =>
Activity.value(Leaf(path, dentry, bound))
Future.value(Leaf(path, dentry, bound))

case Leaf(_, dentry, Name.Path(path)) =>
// Resolve this leaf path through the dtab and bind the resulting tree.
Expand All @@ -139,31 +139,29 @@ case class ConfiguredNamersInterpreter(namers: Seq[(Path, Namer)])
case Delegate(path, dentry, tree) =>
delegateBind(dtab, depth, tree).map(Delegate(path, dentry, _))

case Alt(path, dentry) => Activity.value(Neg(path, dentry))
case Alt(path, dentry) => Future.value(Neg(path, dentry))
case Alt(path, dentry, tree) => delegateBind(dtab, depth, tree).map(Delegate(path, dentry, _))
case Alt(path, dentry, trees@_*) =>
// Unlike Namer.bind, we bind *all* alternate trees.
val acts = trees.map { tree =>
delegateBind(dtab, depth, tree).transform {
case Activity.Failed(e) => Activity.value(Exception(path, dentry, e))
case state => Activity(Var(state))
val futs = trees.map { tree =>
delegateBind(dtab, depth, tree).handle {
case e => Exception(path, dentry, e)
}
}
Activity.collect(acts).map { alts =>
Future.collect(futs).map { alts =>
Alt(path, dentry, alts: _*)
}
case Union(path, dentry) => Activity.value(Neg(path, dentry))
case Union(path, dentry) => Future.value(Neg(path, dentry))
case Union(path, dentry, Weighted(_, tree)) =>
delegateBind(dtab, depth, tree).map(Delegate(path, dentry, _))
case Union(path, dentry, trees@_*) =>
val acts = trees.map {
val futs = trees.map {
case Weighted(w, tree) =>
delegateBind(dtab, depth, tree).transform {
case Activity.Failed(e) => Activity.value(Exception(path, dentry, e))
case state => Activity(Var(state))
delegateBind(dtab, depth, tree).handle {
case e => Exception(path, dentry, e)
}.map(Weighted(w, _))
}
Activity.collect(acts).map { branches =>
Future.collect(futs).map { branches =>
Union(path, dentry, branches: _*)
}
}
Expand Down
6 changes: 3 additions & 3 deletions namer/core/src/main/scala/io/buoyant/namer/Delegator.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package io.buoyant.namer

import com.twitter.finagle.{Dentry, Dtab, Name, NameTree, Path}
import com.twitter.util.Activity
import com.twitter.util.{Activity, Future}

trait Delegator {

def delegate(
dtab: Dtab,
tree: NameTree[Name.Path]
): Activity[DelegateTree[Name.Bound]]
): Future[DelegateTree[Name.Bound]]

final def delegate(
dtab: Dtab,
path: Path
): Activity[DelegateTree[Name.Bound]] =
): Future[DelegateTree[Name.Bound]] =
delegate(dtab, NameTree.Leaf(Name.Path(path)))

def dtab: Activity[Dtab]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.buoyant.namer
import com.twitter.finagle.Name.Bound
import com.twitter.finagle.naming.NameInterpreter
import com.twitter.finagle._
import com.twitter.util.Activity
import com.twitter.util.{Activity, Future}

/**
* A NameTreeTransformer performs some kind of transformation on bound
Expand Down Expand Up @@ -52,7 +52,7 @@ trait NameTreeTransformer {
*/
trait DelegatingNameTreeTransformer extends NameTreeTransformer {

protected def transformDelegate(tree: DelegateTree[Name.Bound]): Activity[DelegateTree[Name.Bound]]
protected def transformDelegate(tree: DelegateTree[Name.Bound]): Future[DelegateTree[Name.Bound]]

/** Like wrap, but preserving the ability of the NameInterpreter to delegate */
def delegatingWrap(underlying: NameInterpreter with Delegator): NameInterpreter with Delegator = new NameInterpreter with Delegator {
Expand All @@ -62,7 +62,7 @@ trait DelegatingNameTreeTransformer extends NameTreeTransformer {
override def delegate(
dtab: Dtab,
tree: NameTree[Name.Path]
): Activity[DelegateTree[Bound]] =
): Future[DelegateTree[Bound]] =
underlying.delegate(dtab, tree).flatMap(transformDelegate)

override def dtab: Activity[Dtab] = underlying.dtab
Expand Down Expand Up @@ -114,8 +114,8 @@ trait FilteringNameTreeTransformer extends DelegatingNameTreeTransformer {

}

override protected def transformDelegate(tree: DelegateTree[Name.Bound]): Activity[DelegateTree[Name.Bound]] =
Activity.value(tree.flatMap { leaf =>
override protected def transformDelegate(tree: DelegateTree[Name.Bound]): Future[DelegateTree[Name.Bound]] =
Future.value(tree.flatMap { leaf =>
val bound = mapBound(leaf.value)
val path = bound.id match {
case id: Path => id
Expand Down
16 changes: 16 additions & 0 deletions namerd/examples/all.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
admin:
port: 9991

storage:
kind: io.l5d.inMemory
namespaces:
default: /svc => /#/io.l5d.fs;

namers:
- kind: io.l5d.fs
rootDir: namerd/examples/disco

interfaces:
- kind: io.l5d.httpController
- kind: io.l5d.mesh
- kind: io.l5d.thriftNameInterpreter
Loading

0 comments on commit b78f094

Please sign in to comment.