Skip to content

Commit

Permalink
Merge pull request #56 from scala-academy/feature/#14-castalia-metrics
Browse files Browse the repository at this point in the history
Feature/#14 castalia metrics
  • Loading branch information
effibennekers committed Feb 3, 2016
2 parents 6cbd6ca + 5779e56 commit 674f934
Show file tree
Hide file tree
Showing 24 changed files with 394 additions and 62 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ libraryDependencies ++= {
"com.typesafe.akka" %% "akka-stream-experimental" % akkaStreamV,
"com.typesafe.akka" %% "akka-http-core-experimental" % akkaStreamV,
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaStreamV,
"com.typesafe.akka" %% "akka-contrib" % "2.4.1",
"org.jliszka" %% "probability-monad" % "1.0.1",
"org.scalatest" %% "scalatest" % scalaTestV % "test,it",
"com.typesafe.akka" %% "akka-http-testkit-experimental" % akkaStreamV % "test,it",
Expand Down
1 change: 1 addition & 0 deletions src/it/scala/castalia/IntegrationTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import org.scalatest._
trait IntegrationTestBase extends FunSpec with GivenWhenThen with Matchers {

val serverAddress = "localhost:9000"
val mngmtAddress = "localhost:9090"
}
3 changes: 2 additions & 1 deletion src/it/scala/castalia/IntegrationTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import castalia.utils.TestHelpers
import org.scalatest.{BeforeAndAfterAll, Suites}

class IntegrationTestSuite extends Suites (
new StubServerTest
new StubServerTest,
new MngmtServerTest
) with BeforeAndAfterAll {

val serverAddress = "localhost:9000"
Expand Down
67 changes: 67 additions & 0 deletions src/it/scala/castalia/MngmtServerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package castalia

import com.twitter.finagle
import com.twitter.finagle.http._
import com.twitter.io.Bufs._
import com.twitter.util.Await
import org.scalatest.DoNotDiscover

@DoNotDiscover
class MngmtServerTest extends IntegrationTestBase {

val client = finagle.Http.newService(s"$serverAddress")
val mngmtClient = finagle.Http.newService(s"$mngmtAddress")
val mngmtPath = "castalia/manager/endpoints"

describe("starting up stubserver with 'castaliaT.json'") {

it("should be possible to configure a new endpoint via manager endpoint") {

val url = s"http://$mngmtAddress/$mngmtPath"
val data = """{"endpoint": "my/endpoint/$1", "responses": [{"ids": {"1": "0"},"httpStatusCode": 200}]}"""

When(s"""I do a HTTP POST to "$url" with new endpoint """)

val mngmtRequest = RequestBuilder().url(url).setHeader("Content-Type", "application/json").buildPost(utf8Buf(data))

Then("the response should be new endpoint")
val response: Response = Await.result(mngmtClient(mngmtRequest))

response.status shouldBe Status.Ok
response.contentString shouldBe "my/endpoint/$1"
}
}

it("should be possible to fetch metrics via manager endpoint") {

When("Getting endpoint metrics")
val mngmtRequest = Request(Method.Get, s"/$mngmtPath/metrics")
mngmtRequest.host = mngmtAddress

Then("the response should contain the metrics")
var responseMng: Response = Await.result(mngmtClient(mngmtRequest))

responseMng.status shouldBe Status.Ok
responseMng.contentType.get shouldBe "application/json"
responseMng.contentString.contains(""""my/endpoint/$1": {
| "calls": 0
| }""")

val request = Request(Method.Get, "/my/endpoint/0")
request.host = serverAddress

When("Getting endpoint metrics after a GET on the endpoint")
val responseStub: Response = Await.result(client(request))
responseStub.status shouldBe Status.Ok

Then("the response should contain updated stats")
responseMng = Await.result(mngmtClient(mngmtRequest))

responseMng.status shouldBe Status.Ok
responseMng.contentType.get shouldBe "application/json"
responseMng.contentString.contains(""""my/endpoint/$1": {
| "calls": 1
| }""")
}

}
26 changes: 14 additions & 12 deletions src/main/scala/castalia/Receptionist.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,25 @@ package castalia
import akka.actor._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{RouteResult, RequestContext, Route}
import akka.pattern.ask
import akka.util.Timeout
import castalia.actors.{JsonResponsesEndpointActor, JsonResponseProviderEndpointActor, JsonEndpointActor}
import castalia.matcher.{RequestMatcher, Matcher}
import castalia.model.Messages.{Done, UpsertEndpoint}
import castalia.model.Model.{StubResponse, StubConfig}

import scala.concurrent.duration._
import castalia.actors.{JsonEndpointActor, JsonResponseProviderEndpointActor, JsonResponsesEndpointActor}
import castalia.matcher.{Matcher, RequestMatcher}
import castalia.metrics.MetricsCollectorActor
import castalia.model.Messages.{Done, EndpointMetricsGet, UpsertEndpoint}
import castalia.model.Model.{StubConfig, StubResponse}

object Receptionist {
def props: Props = Props[Receptionist]
}

class Receptionist extends Actor with ActorLogging {

val metricsCollector = createMetricsCollector

private def upsertEndPointActor(stubConfig: StubConfig, endpointMatcher : RequestMatcher) = {

def endpointActorFactory(stubConfig: StubConfig): JsonEndpointActor = {
if (stubConfig.responseprovider.isDefined) new JsonResponseProviderEndpointActor(stubConfig)
else if (stubConfig.responses.isDefined) new JsonResponsesEndpointActor(stubConfig)
if (stubConfig.responseprovider.isDefined) new JsonResponseProviderEndpointActor(stubConfig, metricsCollector)
else if (stubConfig.responses.isDefined) new JsonResponsesEndpointActor(stubConfig, metricsCollector)
else throw new UnsupportedOperationException
}

Expand All @@ -51,11 +48,16 @@ class Receptionist extends Actor with ActorLogging {
case _ => sender ! StubResponse(NotFound.intValue, NotFound.reason)
}

case EndpointMetricsGet =>
log.info("fetching metrics for all endpoints")
metricsCollector forward EndpointMetricsGet

// unexpected messages
case x =>
log.info("Receptionist received unexpected message: " + x.toString)
}

def createMetricsCollector: ActorRef = context.actorOf(MetricsCollectorActor.props, "metricsCollector")
}


28 changes: 28 additions & 0 deletions src/main/scala/castalia/actors/EndpointRequestInterceptor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package castalia.actors

import akka.actor.ActorRef
import akka.contrib.pattern.ReceivePipeline
import akka.contrib.pattern.ReceivePipeline.Inner
import castalia.matcher.RequestMatch
import castalia.model.Messages._
import castalia.model.Model.StubConfig

/**
* Triggers registration of endpoint stats by intercepting RequestMatch messages
* send to the actors extending this trait.
*/
trait EndpointRequestInterceptor {

this: ReceivePipeline =>

def stubConfig: StubConfig
def metricsCollector: ActorRef

metricsCollector ! EndpointMetricsInit(stubConfig.endpoint)

pipelineOuter {
case rm: RequestMatch => { metricsCollector ! EndpointCalled(stubConfig.endpoint) }
Inner(rm)
}

}
4 changes: 3 additions & 1 deletion src/main/scala/castalia/actors/JsonEndpointActor.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package castalia.actors

import akka.actor._
import akka.contrib.pattern.ReceivePipeline
import castalia.model.Model._

/**
* Actor that provides answers based on the json configuration that is used to create this actor
*
* Created by Jean-Marc van Leerdam on 2016-01-16
*/
abstract class JsonEndpointActor(myStubConfig: StubConfig) extends Actor with ActorLogging
trait JsonEndpointActor
extends Actor with ActorLogging with ReceivePipeline with EndpointRequestInterceptor



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package castalia.actors

import java.lang.reflect.Method

import akka.actor.ActorRef
import castalia.matcher.RequestMatch
import castalia.model.Model.{StubConfig, StubResponse}
import scala.concurrent.Future
Expand All @@ -14,11 +15,12 @@ import scala.language.existentials
*
* Created on 2016-01-23
*/
class JsonResponseProviderEndpointActor(myStubConfig: StubConfig) extends JsonEndpointActor(myStubConfig) {
class JsonResponseProviderEndpointActor(override val stubConfig: StubConfig, override val metricsCollector: ActorRef)
extends JsonEndpointActor {
private case class ResponseProvider(clazz : Class[_], member : Method)

private val programmedStub: ResponseProvider =
myStubConfig.responseprovider match {
stubConfig.responseprovider match {
case Some(responseProvider) =>
val clazz = Class.forName(responseProvider.clazz)
val member = clazz.getDeclaredMethod(responseProvider.member, classOf[RequestMatch])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package castalia.actors

import akka.actor._
import akka.pattern.pipe
import akka.http.scaladsl.model.StatusCodes.Forbidden
import akka.pattern.pipe
import castalia.matcher.RequestMatch
import castalia.matcher.types.Params
import castalia.model.Model.{StubConfig, StubResponse, _}
Expand All @@ -18,7 +18,8 @@ case class DelayComplete(destination: ActorRef, message: StubResponse)
*
* Created on 2016-01-23
*/
class JsonResponsesEndpointActor(myStubConfig: StubConfig) extends JsonEndpointActor(myStubConfig)
class JsonResponsesEndpointActor(override val stubConfig: StubConfig, override val metricsCollector: ActorRef)
extends JsonEndpointActor
with ActorLogging
with Delay {

Expand Down Expand Up @@ -68,7 +69,7 @@ class JsonResponsesEndpointActor(myStubConfig: StubConfig) extends JsonEndpointA
case (params, Some(first :: rest)) => if (paramMatch(params, first.ids)) Some(first) else findResponseRecurse(params, Some(rest))
case (_, _) => None
}
findResponseRecurse(pathParams, myStubConfig.responses)
findResponseRecurse(pathParams, stubConfig.responses)
}

def paramMatch( left: Params, right: EndpointIds): Boolean = {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/castalia/management/Manager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package castalia.management

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.util.Timeout
import castalia.model.Messages.{Done, UpsertEndpoint}
import castalia.model.Messages.{EndpointMetricsGet, UpsertEndpoint}
import castalia.model.Model.StubConfig

import scala.concurrent.duration._
Expand All @@ -23,6 +23,8 @@ class Manager(receptionist: ActorRef) extends Actor with ActorLogging {
case config: StubConfig =>
log.debug(s"received message to adjust configuration for '${config.endpoint}'")
receptionist forward UpsertEndpoint(config)
case x => log.debug("Unexpected message received: " + x.toString)
case EndpointMetricsGet =>
log.debug("received message to fetch metrics")
receptionist forward EndpointMetricsGet
}
}
16 changes: 10 additions & 6 deletions src/main/scala/castalia/management/ManagerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,22 @@ import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout
import castalia.model.Messages.Done
import castalia.model.Model.{StubConfig, _}
import castalia.model.Messages.{EndpointMetricsGet, Done}
import castalia.model.Model.{EndpointMetrics, StubConfig}

import scala.concurrent.duration._
import scala.language.postfixOps

trait ManagerService {

protected def managerActor: ActorRef

protected implicit val system: ActorSystem

private implicit val timeout = Timeout(2 second)

import system.dispatcher

val managementRoute: Route =
path("castalia" / "manager" / "endpoints") {
pathPrefix("castalia" / "manager" / "endpoints") {
post {
entity(as[StubConfig]) {
stubConfig =>
Expand All @@ -34,7 +32,13 @@ trait ManagerService {
.map(result => s"${result.endpoint}")
}
}
}
} ~ path("metrics") {
get {
complete {
(managerActor ? EndpointMetricsGet).mapTo[EndpointMetrics]
}
}
}
}

}
31 changes: 31 additions & 0 deletions src/main/scala/castalia/metrics/MetricsCollectorActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package castalia.metrics

import akka.actor.{Actor, ActorLogging, Props}
import castalia.model.Messages.{EndpointCalled, EndpointMetricsInit, EndpointMetricsGet}
import castalia.model.Model.EndpointMetrics
import MetricsCollectorActor._

object MetricsCollectorActor {

val metricNumberOfCalls = "calls"

def props: Props = Props(new MetricsCollectorActor)
}

/**
* Actor that aggregates various metrics specific to endpoints.
*/
class MetricsCollectorActor extends Actor with ActorLogging {

override def receive: Receive = active(new MetricsRegistry())

def active(metricsRegistry: MetricsRegistry): Receive = {

case EndpointMetricsInit(endpoint) => context.become(active(metricsRegistry.reset(endpoint, metricNumberOfCalls)))

case EndpointCalled(endpoint) => context.become(active(metricsRegistry.increment(endpoint, metricNumberOfCalls)))

case EndpointMetricsGet => sender ! EndpointMetrics(metricsRegistry.metricsByEndpoint)
}

}
22 changes: 22 additions & 0 deletions src/main/scala/castalia/metrics/MetricsRegistry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package castalia.metrics

import castalia._

class MetricsRegistry(val metricsByEndpoint: Map[Endpoint, Metrics]) {

def this() = this(Map())

def reset(endpoint: Endpoint, metric: String): MetricsRegistry = {
add(endpoint, metric, 0)
}

def increment(endpoint: Endpoint, metric: String): MetricsRegistry = {
val value = metricsByEndpoint.getOrElse(endpoint, Map()).getOrElse(metric, 0) + 1
add(endpoint, metric, value)
}

private def add(endpoint: Endpoint, metric: String, value: Int) = {
val metrics = metricsByEndpoint.getOrElse(endpoint, Map()) ++ Map(metric -> value)
new MetricsRegistry(metricsByEndpoint + (endpoint -> metrics))
}
}
5 changes: 5 additions & 0 deletions src/main/scala/castalia/model/Messages.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package castalia.model

import castalia.Endpoint
import castalia.model.Model.StubConfig

object Messages {
case class UpsertEndpoint(stubConfig: StubConfig)
case class Done(endpoint: String)

case class EndpointCalled(endpoint: Endpoint)
case class EndpointMetricsInit(endpoint: Endpoint)
case object EndpointMetricsGet
}
3 changes: 3 additions & 0 deletions src/main/scala/castalia/model/Model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ object Model extends DefaultJsonProtocol {
}
}

case class EndpointMetrics(metrics: Map[Endpoint, Metrics])

// Note: these implicits mus tbe declared in the correct order (first the leaves, then the composing classes)
implicit val castaliaConfigFormatter = jsonFormat3(CastaliaConfig.apply)
implicit val latencyConfigFormat = jsonFormat2(LatencyConfig)
implicit val responseProviderFormat = jsonFormat(ResponseProviderConfig, "class", "member")
implicit val defaultResponseConfigFormat = jsonFormat3(DefaultResponseConfig)
implicit val responseConfigFormat = jsonFormat4(ResponseConfig)
implicit val stubConfigFormat = jsonFormat4(StubConfig)
implicit val endpointMetricsFormat = jsonFormat1(EndpointMetrics)

}
Loading

0 comments on commit 674f934

Please sign in to comment.