diff --git a/README.md b/README.md
index 4c204b5..857f425 100644
--- a/README.md
+++ b/README.md
@@ -133,6 +133,9 @@ rdd.toList // equivalent to rdd.collect.toList - alias: rdd.collectAsList
rdd.toMap // RDD((1, "a"), (2, "b"), (2, "c")) => Map((1, "a"), (2, "c"))
rdd.duplicates // RDD(1, 3, 2, 1, 7, 8, 8, 1, 2) => RDD(1, 2, 8)
rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1))
+rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
+rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
+rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")
```
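For orientation, a minimal, self-contained sketch of the new extensions against a local SparkContext; the `MaxMinByDemo` object name and the local-master config are illustrative, not part of the library:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.spark_helper.SparkHelper._

// Illustrative driver exercising the new min/max extensions.
object MaxMinByDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("demo").setMaster("local[2]"))

    val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (4, "c")))

    rdd.maxBy(_._2)  // (2, "c") or (4, "c"): ties are broken by reduce order
    rdd.minBy(_._2)  // (1, "a")
    rdd.maxByKey()   // (4, "c")
    rdd.minByValue() // (1, "a")

    sc.stop()
  }
}
```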
diff --git a/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html b/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html
index 41cb52e..cb8a426 100644
--- a/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html
+++ b/docs/com/spark_helper/SparkHelper$$PairRDDExtensions.html
@@ -347,6 +347,82 @@
+def maxByKey()(implicit ord: Ordering[K]): (K, V)
+def maxByValue()(implicit ord: Ordering[V]): (K, V)
+def minByKey()(implicit ord: Ordering[K]): (K, V)
+def minByValue()(implicit ord: Ordering[V]): (K, V)
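As the Scala source further down confirms, each of these four pair-RDD helpers is thin sugar that fixes the projection and delegates to the generic maxBy/minBy:

```scala
// Equivalences between the pair-RDD helpers and the generic methods:
rdd.maxByKey()   // same as rdd.maxBy(_._1)
rdd.minByKey()   // same as rdd.minBy(_._1)
rdd.maxByValue() // same as rdd.maxBy(_._2)
rdd.minByValue() // same as rdd.minBy(_._2)
```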
diff --git a/docs/com/spark_helper/SparkHelper$$RDDExtensions.html b/docs/com/spark_helper/SparkHelper$$RDDExtensions.html
index a206659..5248400 100644
--- a/docs/com/spark_helper/SparkHelper$$RDDExtensions.html
+++ b/docs/com/spark_helper/SparkHelper$$RDDExtensions.html
@@ -367,6 +367,44 @@
+def maxBy[U](f: (T) ⇒ U)(implicit ord: Ordering[U]): T
+def minBy[U](f: (T) ⇒ U)(implicit ord: Ordering[U]): T
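Both signatures take their Ordering implicitly, so a caller can swap in a custom comparison without changing the projection; a small sketch (the `byLength` ordering is only an example):

```scala
// Order string values by length instead of lexicographically.
val byLength: Ordering[String] = Ordering.by(_.length)

rdd.maxBy(_._2)(byLength)  // element with the longest value
rdd.maxByValue()(byLength) // same idea through the pair-RDD helper
```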
diff --git a/docs/com/spark_helper/SparkHelper$.html b/docs/com/spark_helper/SparkHelper$.html
index 748b917..338143b 100644
--- a/docs/com/spark_helper/SparkHelper$.html
+++ b/docs/com/spark_helper/SparkHelper$.html
@@ -90,7 +90,10 @@
rdd.toList // equivalent to rdd.collect.toList - alias: rdd.collectAsList
rdd.toMap // RDD((1, "a"), (2, "b"), (2, "c")) => Map((1, "a"), (2, "c"))
rdd.duplicates // RDD(1, 3, 2, 1, 7, 8, 8, 1, 2) => RDD(1, 2, 8)
rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1))
+rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
+rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
+rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")
Since: 2017-02
To do: sc.parallelize[T](elmts: T*) instead of sc.parallelize[T](elmts: Array[T])
Linear Supertypes
diff --git a/docs/com/spark_helper/package.html b/docs/com/spark_helper/package.html
index b376ec6..f08d516 100644
--- a/docs/com/spark_helper/package.html
+++ b/docs/com/spark_helper/package.html
@@ -344,7 +344,10 @@
rdd.toList // equivalent to rdd.collect.toList - alias: rdd.collectAsList
rdd.toMap // RDD((1, "a"), (2, "b"), (2, "c")) => Map((1, "a"), (2, "c"))
rdd.duplicates // RDD(1, 3, 2, 1, 7, 8, 8, 1, 2) => RDD(1, 2, 8)
rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1))
+rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
+rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
+rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")
Since: 2017-02
To do: sc.parallelize[T](elmts: T*) instead of sc.parallelize[T](elmts: Array[T])
diff --git a/docs/index/index-m.html b/docs/index/index-m.html
index aba778b..c58f225 100644
--- a/docs/index/index-m.html
+++ b/docs/index/index-m.html
@@ -13,6 +13,24 @@
+maxBy
+maxByKey
+maxByValue
+minBy
+minByKey
+minByValue
monitoring
diff --git a/src/main/scala/com/spark_helper/SparkHelper.scala b/src/main/scala/com/spark_helper/SparkHelper.scala
index e3444ff..d0a9a0a 100644
--- a/src/main/scala/com/spark_helper/SparkHelper.scala
+++ b/src/main/scala/com/spark_helper/SparkHelper.scala
@@ -66,6 +66,9 @@ import scala.util.Random
* rdd.toMap // RDD((1, "a"), (2, "b"), (2, "c")) => Map((1, "a"), (2, "c"))
* rdd.duplicates // RDD(1, 3, 2, 1, 7, 8, 8, 1, 2) => RDD(1, 2, 8)
* rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1))
+ * rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
+ * rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
+ * rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")
* }}}
*
* Source
+ /** Returns the max of this RDD by the given function as defined by the
+ * implicit Ordering[U].
+ *
+ * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxBy(_._2) // (2, "c") or (4, "c") }}}
+ *
+ * @return the max of this RDD by the given function
+ */
+ def maxBy[U](f: T => U)(implicit ord: Ordering[U]): T =
+ rdd.reduce { case (x, y) => if (ord.compare(f(x), f(y)) > 0) x else y }
+
+ /** Returns the min of this RDD by the given function as defined by the
+ * implicit Ordering[U].
+ *
+ * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minBy(_._2) // (1, "a") }}}
+ *
+ * @return the min of this RDD by the given function
+ */
+ def minBy[U](f: T => U)(implicit ord: Ordering[U]): T =
+ rdd.reduce { case (x, y) => if (ord.compare(f(x), f(y)) > 0) y else x }
}
implicit class StringRDDExtensions(rdd: RDD[String]) {
@@ -423,6 +446,42 @@ object SparkHelper extends Serializable {
* @return the collected Map version of the RDD on the driver
*/
def toMap: Map[K, V] = rdd.collect().toMap
+
+ /** Returns the element of this RDD with the largest key as defined by the
+ * implicit Ordering[K].
+ *
+ * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey // (4, "c") }}}
+ *
+ * @return the element with the largest key
+ */
+ def maxByKey()(implicit ord: Ordering[K]): (K, V) = rdd.maxBy(_._1)(ord)
+
+ /** Returns the element of this RDD with the smallest key as defined by the
+ * implicit Ordering[K].
+ *
+ * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minByKey // (1, "a") }}}
+ *
+ * @return the element with the smallest key
+ */
+ def minByKey()(implicit ord: Ordering[K]): (K, V) = rdd.minBy(_._1)(ord)
+
+ /** Returns the element of this RDD with the largest value as defined by the
+ * implicit Ordering[V].
+ *
+ * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByValue // (2, "c") or (4, "c") }}}
+ *
+ * @return the element with the largest value
+ */
+ def maxByValue()(implicit ord: Ordering[V]): (K, V) = rdd.maxBy(_._2)(ord)
+
+ /** Returns the element of this RDD with the smallest value as defined by
+ * the implicit Ordering[V].
+ *
+ * {{{ RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).minByValue // (1, "a") }}}
+ *
+ * @return the element with the smallest value
+ */
+ def minByValue()(implicit ord: Ordering[V]): (K, V) = rdd.minBy(_._2)(ord)
}
implicit class StringPairRDDExtensions(rdd: RDD[(String, String)]) {
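One note on the implementation above: maxBy/minBy are a single reduce pass, so among tied elements the winner depends on the order in which partitions are combined. A local model of the same comparison logic (the `maxByLocal` name is illustrative):

```scala
// Local model of the reduce-based maxBy: when ord.compare returns 0 the else
// branch keeps the right operand, so ties are resolved by combination order,
// which is non-deterministic across RDD partitions.
def maxByLocal[T, U](xs: Seq[T])(f: T => U)(implicit ord: Ordering[U]): T =
  xs.reduce((x, y) => if (ord.compare(f(x), f(y)) > 0) x else y)

maxByLocal(Seq((1, "a"), (2, "c"), (3, "b"), (4, "c")))(_._2) // (4, "c") here
```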
diff --git a/src/test/scala/com/spark_helper/SparkHelperTest.scala b/src/test/scala/com/spark_helper/SparkHelperTest.scala
index 0fd9da8..0437b9d 100644
--- a/src/test/scala/com/spark_helper/SparkHelperTest.scala
+++ b/src/test/scala/com/spark_helper/SparkHelperTest.scala
@@ -4,6 +4,8 @@ import com.spark_helper.SparkHelper._
import org.apache.hadoop.io.compress.GzipCodec
+import scala.math.Ordering.{String => StringOrdering}
+
import com.holdenkarau.spark.testing.{SharedSparkContext, RDDComparisons}
import org.scalatest.FunSuite
@@ -447,6 +449,47 @@ class SparkHelperTest
val out = sc.parallelize(Array(1, 2, 8))
assertRDDEquals(in.duplicates(), out)
}
+
+ test("Max by") {
+ // 1:
+ val in = sc.parallelize(Array((1, "a"), (2, "c"), (3, "b"), (4, "c")))
+ assert(Set((2, "c"), (4, "c")).contains(in.maxBy(_._2)))
+ // 2:
+ assert(in.maxBy(_._2)(WeirdOrdering) === (3, "b"))
+ // 3:
+ val message = intercept[UnsupportedOperationException] {
+ sc.emptyRDD[(String, Int)].maxBy(_._2)
+ }.getMessage
+ assert(message === "empty collection")
+ }
+
+ test("Min by") {
+ // 1:
+ val in = sc.parallelize(Array((1, "a"), (2, "c"), (3, "b"), (4, "c")))
+ assert(in.minBy(_._2) === (1, "a"))
+ // 2:
+ assert(in.minBy(_._2)(WeirdOrdering) === (1, "a"))
+ // 3:
+ val message = intercept[UnsupportedOperationException] {
+ sc.emptyRDD[(String, Int)].minBy(_._2)
+ }.getMessage
+ assert(message === "empty collection")
+ }
+
+ test("Min/max by key/value") {
+ val in = sc.parallelize(Array((1, "a"), (2, "c"), (3, "b"), (4, "c")))
+ assert(in.maxByKey() === (4, "c"))
+ assert(in.minByKey() === (1, "a"))
+ assert(Set((2, "c"), (4, "c")).contains(in.maxByValue()))
+ assert(in.minByValue() === (1, "a"))
+ }
}
case class A(x: Int, y: String)
+
+// An ordering that ranks "b" above every other string; used to check that an
+// explicitly passed Ordering overrides the implicit default one.
+object WeirdOrdering extends Ordering[String] {
+ def compare(a: String, b: String): Int =
+ if (a == "b" && b == "b") 0
+ else if (a == "b") Int.MaxValue
+ else if (b == "b") -Int.MaxValue
+ else StringOrdering.compare(a, b)
+}
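Finally, as the tests show, calling these methods on an empty RDD surfaces reduce's `UnsupportedOperationException("empty collection")`. Callers who prefer a total API can guard the call themselves; a hedged sketch mirroring the library's reduce logic (the `maxByOption` helper is hypothetical, not part of spark-helper):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical safe variant: None on an empty RDD instead of an exception.
def maxByOption[T, U](rdd: RDD[T])(f: T => U)(implicit ord: Ordering[U]): Option[T] =
  if (rdd.isEmpty()) None
  else Some(rdd.reduce((x, y) => if (ord.compare(f(x), f(y)) > 0) x else y))
```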