-
-
Notifications
You must be signed in to change notification settings - Fork 355
StreamingSuiteBase
StreamingSuiteBase
`StreamingSuiteBase` simulates the way Spark Streaming works: it feeds the input in as a sequence of batches, applies the given operation to every batch, and compares the output with the expected output. The comparison can either respect the order of elements (`ordered = true`) or ignore it (`ordered = false`).
Example(Unary operation):
/** Unary example: checks a transformation of a single DStream, batch by batch. */
class SampleStreamingTest extends StreamingSuiteBase {

  // The operation under test: split every line in a batch into space-separated words.
  def tokenize(f: DStream[String]): DStream[String] =
    f.flatMap(line => line.split(" "))

  test("really simple transformation") {
    // Each inner List is one batch of the input stream.
    val input = List(
      List("hi"),
      List("hi holden"),
      List("bye"))
    // Each inner List is the output expected for the corresponding input batch.
    val expected = List(
      List("hi"),
      List("hi", "holden"),
      List("bye"))
    testOperation[String, String](input, tokenize _, expected, ordered = false)
  }
}
Example(Binary Operation):
/** Binary example: checks an operation that combines two input DStreams. */
class SampleStreamingTest extends StreamingSuiteBase {
  test("simple two stream streaming test") {
    val input = List(List("hi", "pandas"), List("hi holden"), List("bye"))
    val input2 = List(List("hi"), List("pandas"), List("byes"))
    // Batch i of the output holds the elements of input batch i that do not
    // appear in input2 batch i.
    val expected = List(List("pandas"), List("hi holden"), List("bye"))
    testOperation[String, String, String](input, input2, subtract _, expected, ordered = false)
  }

  // The operation under test: batch-wise "d1 minus d2" via transformWith.
  def subtract(d1: DStream[String], d2: DStream[String]): DStream[String] =
    d1.transformWith(d2, SampleStreamingTest.subtractRDDs _)
}

object SampleStreamingTest {
  /** Returns the elements of `r1` that are not present in `r2`. */
  def subtractRDDs(r1: RDD[String], r2: RDD[String]): RDD[String] =
    r1.subtract(r2)
} // this closing brace was missing, leaving the companion object unterminated
Example(Window Operation):
/** Window example: counts elements over a sliding window. */
class SampleStreamingTest extends StreamingSuiteBase {
  test("CountByWindow with windowDuration 3s and slideDuration=2s") {
    // NOTE: assumes a 1-second batch interval (confirm against the suite's batch
    // duration). Under that assumption the window slides every 2 batches and spans up
    // to the last 3 batches, so there are 2 windows:
    //   {batch2, batch1}         -> 3 + 2     = 5 elements
    //   {batch4, batch3, batch2} -> 1 + 3 + 3 = 7 elements
    val batch1 = List("a", "b")
    val batch2 = List("d", "f", "a")
    val batch3 = List("f", "g", "h") // was " h" (stray leading space); count is unaffected
    val batch4 = List("a")
    val input = List(batch1, batch2, batch3, batch4)
    val expected = List(List(5L), List(7L))

    def countByWindow(ds: DStream[String]): DStream[Long] =
      ds.countByWindow(windowDuration = Seconds(3), slideDuration = Seconds(2))

    testOperation[String, Long](input, countByWindow _, expected, ordered = true)
  }
}
Each input batch is given as a List of values; to simulate an empty batch, pass null in place of that List.
Example(Empty Batch):
/** Empty-batch example: a null entry in the input stands in for an empty batch. */
class SampleStreamingTest extends StreamingSuiteBase {
  test("empty batch by using null") {
    // The operation under test: triple every element.
    def multiply(stream1: DStream[Int]): DStream[Int] = stream1.map(n => n * 3)

    // Three input entries, the middle one null; the expected output lists
    // results for the two non-null batches only.
    val input1 = List(List(1), null, List(10))
    val output = List(List(3), List(30))

    testOperation[Int, Int](input1, multiply _, output, ordered = false)
  }
}
You can also compare the output with the expected output using a custom notion of equality, by bringing an implicit Equality object into scope.
Example(Custom Equality):
/** Custom-equality example: output elements are compared by absolute value. */
class SampleStreamingTest extends StreamingSuiteBase {
  test("custom equality object (Integer)") {
    val input = List(List(-1), List(-2, 3, -4), List(5, -6))
    val expected = List(List(1), List(2, 3, 4), List(5, 6))

    // Implicit Equality used when comparing output elements: two Ints are equal
    // when their absolute values match; anything that is not an Int never matches.
    // The type is declared explicitly, as is recommended for implicit definitions.
    implicit val integerCustomEquality: Equality[Int] =
      new Equality[Int] {
        override def areEqual(a: Int, b: Any): Boolean =
          b match {
            case n: Int => math.abs(a) == math.abs(n)
            case _      => false
          }
      }

    // Identity operation: the input reaches the comparison unchanged, so only the
    // custom equality makes the (sign-flipped) expected values match.
    def doNothing(ds: DStream[Int]): DStream[Int] = ds

    testOperation[Int, Int](input, doNothing _, expected, ordered = false)
    testOperation[Int, Int](input, doNothing _, expected, ordered = true)
  }
}
The simulation times out after 10 seconds by default. To allow a longer run, override the maxWaitTimeMillis value.
Example(Longer timeout duration):
/** Timeout example: raises the simulation timeout for a 1000-batch input. */
class SampleStreamingTest extends StreamingSuiteBase {
  // Raise the simulation timeout from the default 10 seconds to 20 seconds.
  override def maxWaitTimeMillis: Int = 20000

  test("increase duration more than 10 seconds") {
    // 1000 batches, each holding a single number from 1 to 1000.
    val input = (1 to 1000).toList.map(List(_))
    // The same batches with every element doubled.
    val expectedOutput = input.map(_.map(_ * 2))

    def multiply(ds: DStream[Int]) = ds.map(_ * 2)

    testOperation[Int, Int](input, multiply _, expectedOutput, ordered = true)
  }
}