From c77a94516d48bcd9e9c64c83a5f7cffe72e6e435 Mon Sep 17 00:00:00 2001 From: Xavier GUIHOT Date: Sun, 24 Jun 2018 11:33:36 +0100 Subject: [PATCH] Pimp RDD with minBy/maxBy and key/value RDD with min/maxByKey/Value --- README.md | 3 + .../SparkHelper$$PairRDDExtensions.html | 76 +++++++++++++++++++ .../SparkHelper$$RDDExtensions.html | 38 ++++++++++ docs/com/spark_helper/SparkHelper$.html | 5 +- docs/com/spark_helper/package.html | 5 +- docs/index/index-m.html | 18 +++++ .../scala/com/spark_helper/SparkHelper.scala | 59 ++++++++++++++ .../com/spark_helper/SparkHelperTest.scala | 43 +++++++++++ 8 files changed, 245 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4c204b5..857f425 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,9 @@ rdd.toList // equivalent to rdd.collect.toList - alias: rdd.collectAsList rdd.toMap // RDD((1, "a"), (2, "b"), (2, "c")) => Map((1, "a"), (2, "c")) rdd.duplicates // RDD(1, 3, 2, 1, 7, 8, 8, 1, 2) => RDD(1, 2, 8) rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1)) +rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c") +rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a") +rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c") ``` diff --git a/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html b/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html index 41cb52e..cb8a426 100644 --- a/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html +++ b/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html @@ -347,6 +347,82 @@
New entries in the regenerated scaladoc for SparkHelper$$PairRDDExtensions:

def maxByKey()(implicit ord: Ordering[K]): (K, V)
  Returns the element of this RDD with the largest key as defined by the implicit Ordering[K].
  RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey // (4, "c")
  Returns: the element with the largest key

def maxByValue()(implicit ord: Ordering[V]): (K, V)
  Returns the element of this RDD with the largest value as defined by the implicit Ordering[V].
  RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByValue // (2, "c") or (4, "c")
  Returns: the element with the largest value

def minByKey()(implicit ord: Ordering[K]): (K, V)
  Returns the element of this RDD with the smallest key as defined by the implicit Ordering[K].
  RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minByKey // (1, "a")
  Returns: the element with the smallest key

def minByValue()(implicit ord: Ordering[V]): (K, V)
  Returns the element of this RDD with the smallest value as defined by the implicit Ordering[V].
  RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minByValue // (1, "a")
  Returns: the element with the smallest value
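For readers skimming the patch, here is a minimal usage sketch of these four pair-RDD helpers. It is illustrative only: the local SparkContext setup and the object name are not part of the patch, and it assumes spark-helper is on the classpath (the `import com.spark_helper.SparkHelper._` line mirrors the test suite).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.spark_helper.SparkHelper._

// Illustrative sketch: exercises the new pair-RDD helpers on a tiny local RDD.
object PairRDDHelpersSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("pair-rdd-helpers-sketch"))

    val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (4, "c")))

    println(rdd.maxByKey())   // (4, "c")
    println(rdd.minByKey())   // (1, "a")
    println(rdd.maxByValue()) // (2, "c") or (4, "c"), ties can resolve to either element
    println(rdd.minByValue()) // (1, "a")

    sc.stop()
  }
}
```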
diff --git a/docs/com/spark_helper/SparkHelper$$RDDExtensions.html b/docs/com/spark_helper/SparkHelper$$RDDExtensions.html index a206659..5248400 100644 --- a/docs/com/spark_helper/SparkHelper$$RDDExtensions.html +++ b/docs/com/spark_helper/SparkHelper$$RDDExtensions.html @@ -367,6 +367,44 @@
New entries in the regenerated scaladoc for SparkHelper$$RDDExtensions:

def maxBy[U](f: (T) ⇒ U)(implicit ord: Ordering[U]): T
  Returns the element of this RDD that maximizes the given function f, as defined by the implicit Ordering[U].
  RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxBy(_._2) // (2, "c") or (4, "c")
  Returns: the element of this RDD with the largest value of f

def minBy[U](f: (T) ⇒ U)(implicit ord: Ordering[U]): T
  Returns the element of this RDD that minimizes the given function f, as defined by the implicit Ordering[U].
  RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minBy(_._2) // (1, "a")
  Returns: the element of this RDD with the smallest value of f
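The source change further down implements both methods with a single pairwise reduce (`rdd.reduce { case (x, y) => if (ord.compare(f(x), f(y)) > 0) x else y }` and its mirror image). As a plain-Scala illustration of that combine step, not library code, the same fold on a local List looks like this; the helper names are made up for the example.

```scala
object ReduceCombineSketch extends App {

  // Keep whichever of two elements compares greater (resp. smaller) by f under ord.
  def maxByReduce[T, U](xs: List[T])(f: T => U)(implicit ord: Ordering[U]): T =
    xs.reduce((x, y) => if (ord.compare(f(x), f(y)) > 0) x else y)

  def minByReduce[T, U](xs: List[T])(f: T => U)(implicit ord: Ordering[U]): T =
    xs.reduce((x, y) => if (ord.compare(f(x), f(y)) > 0) y else x)

  val pairs = List((1, "a"), (2, "c"), (3, "b"), (4, "c"))

  println(maxByReduce(pairs)(_._2)) // (4,c) here; on an RDD the tie with (2,c) can go either way
  println(minByReduce(pairs)(_._2)) // (1,a)
}
```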
diff --git a/docs/com/spark_helper/SparkHelper$.html b/docs/com/spark_helper/SparkHelper$.html index 748b917..338143b 100644 --- a/docs/com/spark_helper/SparkHelper$.html +++ b/docs/com/spark_helper/SparkHelper$.html @@ -90,7 +90,10 @@
Regenerated scaladoc for the SparkHelper object: its usage example gains the same three lines as the README above:

rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")

Source: SparkHelper
Since: 2017-02
To do: sc.parallelize[T](elmts: T*) instead of sc.parallelize[T](elmts: Array[T])
diff --git a/docs/com/spark_helper/package.html b/docs/com/spark_helper/package.html index b376ec6..f08d516 100644 --- a/docs/com/spark_helper/package.html +++ b/docs/com/spark_helper/package.html @@ -344,7 +344,10 @@
Regenerated scaladoc for the com.spark_helper package: same usage-example update as above (maxBy, minBy, and the maxByKey / minByKey / maxByValue / minByValue helpers), with identical Source, Since and To do entries.
diff --git a/docs/index/index-m.html b/docs/index/index-m.html index aba778b..c58f225 100644 --- a/docs/index/index-m.html +++ b/docs/index/index-m.html @@ -13,6 +13,24 @@
Monitor
maxBy, maxByKey, maxByValue, minBy, minByKey, minByValue (new entries in the "M" index)
monitoring
    diff --git a/src/main/scala/com/spark_helper/SparkHelper.scala b/src/main/scala/com/spark_helper/SparkHelper.scala index e3444ff..d0a9a0a 100644 --- a/src/main/scala/com/spark_helper/SparkHelper.scala +++ b/src/main/scala/com/spark_helper/SparkHelper.scala @@ -66,6 +66,9 @@ import scala.util.Random * rdd.toMap // RDD((1, "a"), (2, "b"), (2, "c")) => Map((1, "a"), (2, "c")) * rdd.duplicates // RDD(1, 3, 2, 1, 7, 8, 8, 1, 2) => RDD(1, 2, 8) * rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1)) + * rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c") + * rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a") + * rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c") * }}} * * Source U)(implicit ord: Ordering[U]): T = + rdd.reduce { case (x, y) => if (ord.compare(f(x), f(y)) > 0) x else y } + + /** Returns the min of this RDD by the given predicate as defined by the + * implicit Ordering[T]. + * + * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minBy(_._2) // (1, "a") }}} + * + * @return the min of this RDD by the given predicate + * */ + def minBy[U](f: T => U)(implicit ord: Ordering[U]): T = + rdd.reduce { case (x, y) => if (ord.compare(f(x), f(y)) > 0) y else x } } implicit class StringRDDExtensions(rdd: RDD[String]) { @@ -423,6 +446,42 @@ object SparkHelper extends Serializable { * @return the collected Map version of the RDD on the driver */ def toMap: Map[K, V] = rdd.collect().toMap + + /** Returns the element of this RDD with the largest key as defined by the + * implicit Ordering[K]. + * + * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey // (4, "c") }}} + * + * @return the element with the largest key + */ + def maxByKey()(implicit ord: Ordering[K]): (K, V) = rdd.maxBy(_._1)(ord) + + /** Returns the element of this RDD with the smallest key as defined by the + * implicit Ordering[T]. + * + * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minByKey // (1, "a") }}} + * + * @return the element with the smallest key + */ + def minByKey()(implicit ord: Ordering[K]): (K, V) = rdd.minBy(_._1)(ord) + + /** Returns the element of this RDD with the largest value as defined by the + * implicit Ordering[V]. + * + * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByValue // (2, "c") or (4, "c") }}} + * + * @return the element with the largest value + */ + def maxByValue()(implicit ord: Ordering[V]): (K, V) = rdd.maxBy(_._2)(ord) + + /** Returns the element of this RDD with the smallest value as defined by + * the implicit Ordering[V]. 
+ * + * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minByValue // (1, "a") }}} + * + * @return the element with the smallest value + */ + def minByValue()(implicit ord: Ordering[V]): (K, V) = rdd.minBy(_._2)(ord) } implicit class StringPairRDDExtensions(rdd: RDD[(String, String)]) { diff --git a/src/test/scala/com/spark_helper/SparkHelperTest.scala b/src/test/scala/com/spark_helper/SparkHelperTest.scala index 0fd9da8..0437b9d 100644 --- a/src/test/scala/com/spark_helper/SparkHelperTest.scala +++ b/src/test/scala/com/spark_helper/SparkHelperTest.scala @@ -4,6 +4,8 @@ import com.spark_helper.SparkHelper._ import org.apache.hadoop.io.compress.GzipCodec +import scala.math.Ordering.{String => StringOrdering} + import com.holdenkarau.spark.testing.{SharedSparkContext, RDDComparisons} import org.scalatest.FunSuite @@ -447,6 +449,47 @@ class SparkHelperTest val out = sc.parallelize(Array(1, 2, 8)) assertRDDEquals(in.duplicates(), out) } + + test("Max by") { + // 1: + val in = sc.parallelize(Array((1, "a"), (2, "c"), (3, "b"), (4, "c"))) + assert(Set((2, "c"), (4, "c")).contains(in.maxBy(_._2))) + // 2: + assert(in.maxBy(_._2)(WeirdOrdering) === (3, "b")) + // 3: + val message = intercept[UnsupportedOperationException] { + sc.emptyRDD[(String, Int)].maxBy(_._2) + }.getMessage + assert(message === "empty collection") + } + + test("Min by") { + // 1: + val in = sc.parallelize(Array((1, "a"), (2, "c"), (3, "b"), (4, "c"))) + assert(in.minBy(_._2) === (1, "a")) + // 2: + assert(in.minBy(_._2)(WeirdOrdering) === (1, "a")) + // 3: + val message = intercept[UnsupportedOperationException] { + sc.emptyRDD[(String, Int)].minBy(_._2) + }.getMessage + assert(message === "empty collection") + } + + test("Min/max by key/value") { + val in = sc.parallelize(Array((1, "a"), (2, "c"), (3, "b"), (4, "c"))) + assert(in.maxByKey() === (4, "c")) + assert(in.minByKey() === (1, "a")) + assert(Set((2, "c"), (4, "c")).contains(in.maxByValue())) + assert(in.minByValue() === (1, "a")) + } } case class A(x: Int, y: String) + +object WeirdOrdering extends Ordering[String] { + def compare(a: String, b: String): Int = + if (a == "b") Int.MaxValue + else if (b == "b") -Int.MaxValue + else StringOrdering.compare(a, b) +}
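Two behaviours exercised by the new tests are worth spelling out for users of these helpers: the implicit Ordering can be overridden by passing one explicitly (that is what WeirdOrdering does above), and calling any of the min/max helpers on an empty RDD fails because the underlying reduce throws UnsupportedOperationException("empty collection"). A hedged usage sketch, assuming an existing SparkContext `sc` (for instance in spark-shell) and spark-helper on the classpath:

```scala
import scala.util.Try
import com.spark_helper.SparkHelper._

val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (4, "c")))

// Override the implicit String ordering explicitly: under the reversed ordering,
// the "largest" value is the one that normally compares smallest.
rdd.maxBy(_._2)(Ordering[String].reverse) // (1, "a")

// The helpers reduce the RDD, so an empty RDD throws
// UnsupportedOperationException("empty collection"); wrap in Try when that can happen.
val safeMax: Option[(String, Int)] =
  Try(sc.emptyRDD[(String, Int)].maxBy(_._2)).toOption // None
```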