-
-
Notifications
You must be signed in to change notification settings - Fork 355
StreamingSuiteBase
Mahmoud Hanafy edited this page Feb 28, 2016
·
12 revisions
StreamingSuiteBase
simulates the way Spark Streaming works. It sends the input as batches, applies the given operation to every batch, and compares the output with the expected output.
Example(Unary operation):
/**
 * Example suite for a unary DStream operation.
 *
 * Each inner list of the input is one batch; `testOperation` is given the
 * input batches, the operation under test, and the expected output batches.
 */
class SampleStreamingTest extends StreamingSuiteBase {

  test("really simple transformation") {
    val batches = List(List("hi"), List("hi holden"), List("bye"))
    val expectedBatches = List(List("hi"), List("hi", "holden"), List("bye"))
    // ordered = false is passed through to testOperation unchanged.
    testOperation[String, String](batches, tokenize _, expectedBatches, ordered = false)
  }

  /**
   * The sample operation under test: splits every element of the stream on a
   * single space and flattens the resulting pieces into the output stream.
   *
   * @param f the input stream of strings
   * @return a stream containing the space-separated pieces of each element
   */
  def tokenize(f: DStream[String]): DStream[String] =
    f.flatMap(line => line.split(" "))
}
Example(Binary Operation):
/**
 * Example suite for a binary DStream operation.
 *
 * Two batched inputs are handed to `testOperation` together with a
 * two-stream operation and the expected output batches.
 */
class SampleStreamingTest extends StreamingSuiteBase {

  test("simple two stream streaming test") {
    val firstStream = List(List("hi", "pandas"), List("hi holden"), List("bye"))
    val secondStream = List(List("hi"), List("pandas"), List("byes"))
    val expectedBatches = List(List("pandas"), List("hi holden"), List("bye"))
    testOperation[String, String, String](
      firstStream,
      secondStream,
      subtract _,
      expectedBatches,
      ordered = false
    )
  }

  /**
   * Combines the two streams with `transformWith`, applying
   * `SampleStreamingTest.subtractRDDs` to their RDDs.
   *
   * @param d1 the stream whose elements are kept
   * @param d2 the stream whose elements are subtracted
   * @return the stream produced by the RDD-level subtraction
   */
  def subtract(d1: DStream[String], d2: DStream[String]): DStream[String] =
    d1.transformWith(d2, SampleStreamingTest.subtractRDDs _)
}
/**
 * Holds the RDD-level helper referenced as `SampleStreamingTest.subtractRDDs`
 * by the binary-operation example.
 */
object SampleStreamingTest {

  /**
   * Subtracts one RDD from another.
   *
   * @param r1 the RDD to subtract from
   * @param r2 the RDD whose elements are removed
   * @return the result of `r1.subtract(r2)`
   */
  def subtractRDDs(r1: RDD[String], r2: RDD[String]): RDD[String] =
    r1.subtract(r2)
}
You can also compare the output and the expected output with a custom equality method by providing an implicit custom equality object.
Example(Custom Equality):
/**
 * Example suite showing how an implicit `Equality` instance in scope changes
 * how `testOperation` compares actual and expected elements.
 */
class SampleStreamingTest extends StreamingSuiteBase {

  test("custom equality object (Integer)") {
    val input = List(List(-1), List(-2, 3, -4), List(5, -6))
    val expected = List(List(1), List(2, 3, 4), List(5, 6))

    // Two Ints are considered equal when their absolute values match; anything
    // that is not an Int is never equal. The explicit type annotation keeps
    // implicit resolution predictable (and is mandatory in Scala 3).
    implicit val integerCustomEquality: Equality[Int] =
      new Equality[Int] {
        override def areEqual(a: Int, b: Any): Boolean =
          b match {
            case n: Int => math.abs(a) == math.abs(n)
            case _ => false
          }
      }

    // Identity operation: the output stream is the input stream itself, so
    // the comparison succeeds only because of the custom equality above.
    def doNothing(ds: DStream[Int]): DStream[Int] = ds

    testOperation[Int, Int](input, doNothing _, expected, ordered = false)
    testOperation[Int, Int](input, doNothing _, expected, ordered = true)
  }
}