Skip to content

Commit

Permalink
akka#104 Use mock scheduler when testing Pulse stage
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Oct 24, 2017
1 parent 8cf384c commit a54f916
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ package akka.stream.contrib

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

trait BaseStreamSpec extends WordSpec with Matchers with BeforeAndAfterAll {

protected implicit val system = {
def config = ConfigFactory.parseString(s"akka.stream.materializer.auto-fusing=$autoFusing")
def systemConfig = ConfigFactory.parseString(s"akka.stream.materializer.auto-fusing=$autoFusing")
.withFallback(config)
.withFallback(ConfigFactory.load())
ActorSystem("default", config)
ActorSystem("default", systemConfig)
}

protected implicit val mat = ActorMaterializer()
Expand All @@ -26,4 +28,5 @@ trait BaseStreamSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}

protected def autoFusing: Boolean
protected def config: Config = ConfigFactory.empty()
}
83 changes: 46 additions & 37 deletions contrib/src/test/scala/akka/stream/contrib/TimeWindowSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,65 +3,74 @@
*/
package akka.stream.contrib

import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.TestDuration
import org.scalatest.concurrent.ScalaFutures
import java.util.concurrent.ThreadFactory

import akka.event.LoggingAdapter
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import com.miguno.akka.testing.{ MockScheduler, VirtualTime }
import com.typesafe.config.{ Config, ConfigFactory }

import scala.concurrent.duration._

/*
conflateWithSeed seems not to work as expected because of buffering of each stage when auto-fusing=off
class TimeWindowSpecAutoFusingOff extends { val autoFusing = false } with TimeWindowSpec
*/
class TimeWindowSpecAutoFusingOn extends { val autoFusing = true } with TimeWindowSpec

trait TimeWindowSpec extends BaseStreamSpec with ScalaFutures {
private val timeWindow = 100.milliseconds
class AkkaMockScheduler extends {
val time = new VirtualTime
} with MockScheduler(time) {
def this(config: Config, adapter: LoggingAdapter, tf: ThreadFactory) = this()
}

trait TimeWindowSpec extends BaseStreamSpec {

override def config = ConfigFactory.parseString(
s"""
|akka.scheduler.implementation = ${classOf[AkkaMockScheduler].getName}
""".stripMargin
)

private val timeWindow = 100.millis
private val epsilonTime = 10.millis

private val scheduler = system.scheduler.asInstanceOf[AkkaMockScheduler]

"TimeWindow flow" should {
"aggregate data for predefined amount of time" in {
val summingWindow = TimeWindow(timeWindow.dilated, eager = false)(identity[Int])(_ + _)
val summingWindow = TimeWindow(timeWindow, eager = false)(identity[Int])(_ + _)

val (pub, sub) = TestSource.probe[Int]
val sub = Source.repeat(1)
.via(summingWindow)
.toMat(TestSink.probe)(Keep.both)
.run
.runWith(TestSink.probe)

sub.request(2)

pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
sub.expectNext(timeWindow * 2, 5)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
sub.expectNext(timeWindow * 2, 5)
sub.expectNoMsg(timeWindow + epsilonTime)
scheduler.time.advance(timeWindow + epsilonTime)
scheduler.tick()
sub.expectNext()

sub.expectNoMsg(timeWindow + epsilonTime)
scheduler.time.advance(timeWindow + epsilonTime)
scheduler.tick()
sub.expectNext()
}

"emit the first seed if eager" in {
val summingWindow = TimeWindow(timeWindow.dilated, eager = true)(identity[Int])(_ + _)
val summingWindow = TimeWindow(timeWindow, eager = true)(identity[Int])(_ + _)

val (pub, sub) = TestSource.probe[Int]
val sub = Source.repeat(1)
.via(summingWindow)
.toMat(TestSink.probe)(Keep.both)
.run
.runWith(TestSink.probe)

sub.request(2)

pub.sendNext(1)
sub.expectNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
pub.sendNext(1)
sub.expectNext(5)
sub.expectNext()

sub.expectNoMsg(timeWindow + epsilonTime)
scheduler.time.advance(timeWindow + epsilonTime)
scheduler.tick()
sub.expectNext()
}
}
}
5 changes: 3 additions & 2 deletions project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ object Common extends AutoPlugin {

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % "test",
"org.scalatest" %% "scalatest" % "3.0.0" % "test" // ApacheV2
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
"org.scalatest" %% "scalatest" % "3.0.0" % Test, // ApacheV2
"com.miguno.akka" %% "akka-mock-scheduler" % "0.5.1" % Test // ApacheV2
),

headers := headers.value ++ Map(
Expand Down

0 comments on commit a54f916

Please sign in to comment.