Skip to content
Mahmoud Hanafy edited this page Feb 28, 2016 · 12 revisions

StreamingSuiteBase simulates the way Spark Streaming works: it sends the input as batches, applies the given operation to every batch, and compares the output with the expected output.

Example (unary operation):

/** Example suite: checks a one-input (unary) DStream operation with `testOperation`. */
class SampleStreamingTest extends StreamingSuiteBase {

  /** Splits every element of the stream on single spaces and flattens the pieces into one stream. */
  def tokenize(f: DStream[String]): DStream[String] =
    f.flatMap(line => line.split(" "))

  test("really simple transformation") {
    // Each inner list is one batch fed to the operation under test.
    val batches = List(List("hi"), List("hi holden"), List("bye"))
    // Output expected for the corresponding input batch.
    val expectedBatches = List(List("hi"), List("hi", "holden"), List("bye"))

    testOperation[String, String](batches, tokenize _, expectedBatches, ordered = false)
  }

}

Example (binary operation):

/** Example suite: checks a two-input (binary) DStream operation with `testOperation`. */
class SampleStreamingTest extends StreamingSuiteBase {

  /** Combines the two streams batch by batch via `SampleStreamingTest.subtractRDDs`. */
  def subtract(d1: DStream[String], d2: DStream[String]): DStream[String] = {
    val subtractBatch: (RDD[String], RDD[String]) => RDD[String] =
      SampleStreamingTest.subtractRDDs _
    d1.transformWith(d2, subtractBatch)
  }

  test("simple two stream streaming test") {
    // Batches of the first stream and of the second stream, in the same order.
    val firstStream = List(List("hi", "pandas"), List("hi holden"), List("bye"))
    val secondStream = List(List("hi"), List("pandas"), List("byes"))
    // Expected result of the operation for each pair of batches.
    val expectedBatches = List(List("pandas"), List("hi holden"), List("bye"))

    testOperation[String, String, String](
      firstStream,
      secondStream,
      subtract _,
      expectedBatches,
      ordered = false
    )
  }

}

/**
 * Companion object holding the RDD-level helper used by the binary-operation example.
 *
 * NOTE(review): presumably the helper lives here rather than in the suite class so that the
 * function handed to `transformWith` does not capture the test suite instance -- confirm.
 */
object SampleStreamingTest {

  /**
   * Returns `r1.subtract(r2)`.
   *
   * @param r1 the RDD to subtract from
   * @param r2 the RDD whose elements are subtracted
   * @return the result of `r1.subtract(r2)`
   */
  def subtractRDDs(r1: RDD[String], r2: RDD[String]): RDD[String] = {
    r1.subtract(r2)
  }
}

You can also compare the output with the expected output using a custom equality method, by providing an implicit custom equality object.

Example (custom equality):

/** Example suite: runs `testOperation` with an implicit custom `Equality[Int]` in scope. */
class SampleStreamingTest extends StreamingSuiteBase {
  test("custom equality object (Integer)") {
    val input = List(List(-1), List(-2, 3, -4), List(5, -6))
    val expected = List(List(1), List(2, 3, 4), List(5, 6))

    // Treats two Ints as equal when their absolute values match; anything that is not an
    // Int is never equal. The type is annotated explicitly so implicit resolution does not
    // depend on the inferred type of the anonymous class.
    implicit val integerCustomEquality: Equality[Int] =
      new Equality[Int] {
        override def areEqual(a: Int, b: Any): Boolean =
          b match {
            case n: Int => Math.abs(a) == Math.abs(n)
            case _ => false
          }
      }

    // Identity operation: returns the stream it is given.
    def doNothing(ds: DStream[Int]): DStream[Int] = ds

    // Run the same check with ordered = false and with ordered = true.
    testOperation[Int, Int](input, doNothing _, expected, ordered = false)
    testOperation[Int, Int](input, doNothing _, expected, ordered = true)
  }
}
Clone this wiki locally