Skip to content

Commit

Permalink
Pimp RDD with fraction
Browse files Browse the repository at this point in the history
  • Loading branch information
xavierguihot committed Jun 24, 2018
1 parent c77a945 commit e68768e
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 6 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ rdd.reduceWithCount // RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), (
rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")
rdd.fraction(_ > 0) // RDD(1, 0, -2, -1, 7, -8, 8, 1, -2) => 0.4444

```

Expand Down Expand Up @@ -314,7 +315,7 @@ With sbt:
```scala
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.2"
libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.3"
```

With maven:
Expand All @@ -330,7 +331,7 @@ With maven:
<dependency>
<groupId>com.github.xavierguihot</groupId>
<artifactId>spark_helper</artifactId>
<version>2.0.2</version>
<version>2.0.3</version>
</dependency>
```

Expand All @@ -344,7 +345,7 @@ allprojects {
}
dependencies {
compile 'com.github.xavierguihot:spark_helper:2.0.2'
compile 'com.github.xavierguihot:spark_helper:2.0.3'
}
```

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "spark_helper"

version := "2.0.2"
version := "2.0.3"

scalaVersion := "2.11.12"

Expand Down
17 changes: 17 additions & 0 deletions docs/com/spark_helper/SparkHelper$$RDDExtensions.html
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,23 @@ <h4 class="signature">
</span>)</span>

</dd></dl></div>
</li><li name="com.spark_helper.SparkHelper.RDDExtensions#fraction" visbl="pub" data-isabs="false" fullComment="yes" group="Ungrouped">
<a id="fraction(p:T=&gt;Boolean):Double"></a>
<a id="fraction((T)⇒Boolean):Double"></a>
<h4 class="signature">
<span class="modifier_kind">
<span class="modifier"></span>
<span class="kind">def</span>
</span>
<span class="symbol">
<span class="name">fraction</span><span class="params">(<span name="p">p: (<span class="extype" name="com.spark_helper.SparkHelper.RDDExtensions.T">T</span>) ⇒ <span class="extype" name="scala.Boolean">Boolean</span></span>)</span><span class="result">: <span class="extype" name="scala.Double">Double</span></span>
</span>
</h4><span class="permalink">
<a href="../../index.html#com.spark_helper.SparkHelper$$RDDExtensions@fraction(p:T=&gt;Boolean):Double" title="Permalink" target="_top">
<img src="../../lib/permalink.png" alt="Permalink" />
</a>
</span>
<p class="shortcomment cmt">Determines what fraction of the RDD passes the predicate.</p><div class="fullcomment"><div class="comment cmt"><p>Determines what fraction of the RDD passes the predicate.</p><pre>RDD(<span class="num">1</span>, <span class="num">0</span>, -<span class="num">2</span>, -<span class="num">1</span>, <span class="num">7</span>, -<span class="num">8</span>, <span class="num">8</span>, <span class="num">1</span>, -<span class="num">2</span>).fraction(_ &gt; <span class="num">0</span>) <span class="cmt">// 0.4444</span></pre></div><dl class="paramcmts block"><dt>returns</dt><dd class="cmt"><p>the fraction of elements in the RDD which pass the given predicate</p></dd></dl></div>
</li><li name="scala.AnyRef#getClass" visbl="pub" data-isabs="false" fullComment="yes" group="Ungrouped">
<a id="getClass():Class[_]"></a>
<a id="getClass():Class[_]"></a>
Expand Down
3 changes: 2 additions & 1 deletion docs/com/spark_helper/SparkHelper$.html
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ <h4 id="signature" class="signature">
rdd.reduceWithCount <span class="cmt">// RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1))</span>
rdd.maxBy(_._2) <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")</span>
rdd.minBy(_._2) <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")</span>
rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")</span></pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")</span>
rdd.fraction(_ &gt; <span class="num">0</span>) <span class="cmt">// RDD(1, 0, -2, -1, 7, -8, 8, 1, -2) => 0.4444</span></pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
/main/scala/com/spark_helper/SparkHelper.scala">SparkHelper</a>
</p></div><dl class="attributes block"> <dt>Since</dt><dd><p>2017-02</p></dd><dt>To do</dt><dd><span class="cmt"><p>sc.parallelize[T](elmts: T*) instead of sc.parallelize[T](elmts: Array[T])</p></span></dd></dl><div class="toggleContainer block">
<span class="toggle">Linear Supertypes</span>
Expand Down
3 changes: 2 additions & 1 deletion docs/com/spark_helper/package.html
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ <h4 class="signature">
rdd.reduceWithCount <span class="cmt">// RDD("a", "b", "c", "a", "d", "a", "c") => RDD(("a", 3), ("b", 1), ("c", 2), ("d", 1))</span>
rdd.maxBy(_._2) <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")</span>
rdd.minBy(_._2) <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")</span>
rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")</span></pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... <span class="cmt">// RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")</span>
rdd.fraction(_ &gt; <span class="num">0</span>) <span class="cmt">// RDD(1, 0, -2, -1, 7, -8, 8, 1, -2) => 0.4444</span></pre><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
/main/scala/com/spark_helper/SparkHelper.scala">SparkHelper</a>
</p></div><dl class="attributes block"> <dt>Since</dt><dd><p>2017-02</p></dd><dt>To do</dt><dd><span class="cmt"><p>sc.parallelize[T](elmts: T*) instead of sc.parallelize[T](elmts: Array[T])</p></span></dd></dl></div>
</li><li name="com.spark_helper.monitoring" visbl="pub" data-isabs="false" fullComment="no" group="Ungrouped">
Expand Down
3 changes: 3 additions & 0 deletions docs/index/index-f.html
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@
</div><div class="entry">
<div class="name">folderModificationDateTime</div>
<div class="occurrences"><a href="../com/spark_helper/HdfsHelper$.html" class="extype" name="com.spark_helper.HdfsHelper">HdfsHelper</a> </div>
</div><div class="entry">
<div class="name">fraction</div>
<div class="occurrences"><a href="../com/spark_helper/SparkHelper$$RDDExtensions.html" class="extype" name="com.spark_helper.SparkHelper.RDDExtensions">RDDExtensions</a> </div>
</div></body>
</html>
14 changes: 14 additions & 0 deletions src/main/scala/com/spark_helper/SparkHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import scala.util.Random
* rdd.maxBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (2, "c") or (4, "c")
* rdd.minBy(_._2) // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")) => (1, "a")
* rdd.maxByKey; rdd.minByKey; rdd.maxByValue, ... // RDD((1, "a"), (2, "c"), (3, "b"), (4, "c")).maxByKey => (4, "c")
* rdd.fraction(_ > 0) // RDD(1, 0, -2, -1, 7, -8, 8, 1, -2) => 0.4444
* }}}
*
* Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
Expand Down Expand Up @@ -198,6 +199,19 @@ object SparkHelper extends Serializable {
* */
// Returns the element of the RDD whose image under f is smallest according
// to the implicit Ordering. Ties keep the left operand of the reduction.
def minBy[U](f: T => U)(implicit ord: Ordering[U]): T =
  rdd.reduce((a, b) => if (ord.lteq(f(a), f(b))) a else b)

/** Determines what fraction of the RDD passes the predicate.
  *
  * {{{ RDD(1, 0, -2, -1, 7, -8, 8, 1, -2).fraction(_ > 0) // 0.4444 }}}
  *
  * Computed in a single pass over the RDD with no shuffle (unlike a
  * reduceByKey-based count of true/false buckets).
  *
  * @param p the predicate elements are tested against
  * @return the fraction of elements in the RDD which pass the given
  * predicate (NaN for an empty RDD, since 0L.toDouble / 0L is NaN)
  * */
def fraction(p: T => Boolean): Double = {
  // Single aggregate producing (number of matching elements, total count):
  val (fit, total) = rdd.aggregate((0L, 0L))(
    (acc, x) => (acc._1 + (if (p(x)) 1L else 0L), acc._2 + 1L),
    (a, b) => (a._1 + b._1, a._2 + b._2)
  )
  // Preserves the historical behavior of returning NaN on an empty RDD.
  fit.toDouble / total
}
}

implicit class StringRDDExtensions(rdd: RDD[String]) {
Expand Down
10 changes: 10 additions & 0 deletions src/test/scala/com/spark_helper/SparkHelperTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,16 @@ class SparkHelperTest
assert(Set((2, "c"), (4, "c")).contains(in.maxByValue()))
assert(in.minByValue() === (1, "a"))
}

test("Fraction") {
  // Mixed case: 4 of the 9 elements are strictly positive.
  val mixed = sc.parallelize(Array(1, 0, -2, -1, 7, -8, 8, 1, -2))
  assert(mixed.fraction(_ > 0) === 4d / 9d)
  // Every element satisfies the predicate:
  val allPositive = sc.parallelize(Array(1, 7, 8, 1))
  assert(allPositive.fraction(_ > 0) === 1d)
  // No element satisfies the predicate:
  val nonePositive = sc.parallelize(Array(-1, -7, 0, -1))
  assert(nonePositive.fraction(_ > 0) === 0d)
}
}

case class A(x: Int, y: String)
Expand Down

0 comments on commit e68768e

Please sign in to comment.