Skip to content

Commit

Permalink
Pimp RDD with a .withKey method
Browse files Browse the repository at this point in the history
  • Loading branch information
xavierguihot committed Jun 18, 2018
1 parent 405bf5c commit 49b2ad2
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 5 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ rdd.saveAsTextFileByKey("/my/output/folder/path")
// Concept mapper (the following example transforms RDD(1, 3, 2, 7, 8) into RDD(1, 3, 4, 7, 16)):
rdd.partialMap { case a if a % 2 == 0 => 2 * a }

// Concept mapper (the following example transforms RDD((1, "a"), (2, "b")) into RDD((1, (1, "a")), (2, (2, "b")))):
rdd.withKey(_._1)

// For when input files contain commas and textFile can't handle it:
sc.textFile(Seq("path/hello,world.txt", "path/hello_world.txt"))
```
Expand Down Expand Up @@ -298,7 +301,7 @@ With sbt:
```scala
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.0"
libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.1"
```

With maven:
Expand All @@ -314,7 +317,7 @@ With maven:
<dependency>
<groupId>com.github.xavierguihot</groupId>
<artifactId>spark_helper</artifactId>
<version>2.0.0</version>
<version>2.0.1</version>
</dependency>
```

Expand All @@ -328,7 +331,7 @@ allprojects {
}
dependencies {
compile 'com.github.xavierguihot:spark_helper:2.0.0'
compile 'com.github.xavierguihot:spark_helper:2.0.1'
}
```

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "spark_helper"

version := "2.0.0"
version := "2.0.1"

scalaVersion := "2.11.12"

Expand Down
19 changes: 19 additions & 0 deletions docs/com/spark_helper/SparkHelper$$RDDExtensions.html
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,25 @@ <h4 class="signature">
</span>)</span>

</dd></dl></div>
</li><li name="com.spark_helper.SparkHelper.RDDExtensions#withKey" visbl="pub" data-isabs="false" fullComment="yes" group="Ungrouped">
<a id="withKey[K](toKey:T=&gt;K):org.apache.spark.rdd.RDD[(K,T)]"></a>
<a id="withKey[K]((T)⇒K):RDD[(K,T)]"></a>
<h4 class="signature">
<span class="modifier_kind">
<span class="modifier"></span>
<span class="kind">def</span>
</span>
<span class="symbol">
<span class="name">withKey</span><span class="tparams">[<span name="K">K</span>]</span><span class="params">(<span name="toKey">toKey: (<span class="extype" name="com.spark_helper.SparkHelper.RDDExtensions.T">T</span>) ⇒ <span class="extype" name="com.spark_helper.SparkHelper.RDDExtensions.withKey.K">K</span></span>)</span><span class="result">: <span class="extype" name="org.apache.spark.rdd.RDD">RDD</span>[(<span class="extype" name="com.spark_helper.SparkHelper.RDDExtensions.withKey.K">K</span>, <span class="extype" name="com.spark_helper.SparkHelper.RDDExtensions.T">T</span>)]</span>
</span>
</h4><span class="permalink">
<a href="../../index.html#com.spark_helper.SparkHelper$$RDDExtensions@withKey[K](toKey:T=&gt;K):org.apache.spark.rdd.RDD[(K,T)]" title="Permalink" target="_top">
<img src="../../lib/permalink.png" alt="Permalink" />
</a>
</span>
<p class="shortcomment cmt">Map an RDD of A to an RDD of (B, A) where B is determined from A.</p><div class="fullcomment"><div class="comment cmt"><p>Map an RDD of A to an RDD of (B, A) where B is determined from A.</p><p>Replaces for example <code style="background-color:#eff0f1;padding:1px 5px;font-size:12px">rdd.map(x => (x.smthg, x))</code>
with <code style="background-color:#eff0f1;padding:1px 5px;font-size:12px">rdd.withKey(_.smthg)</code>.</p><pre>RDD((<span class="num">1</span>, <span class="lit">"a"</span>), (<span class="num">2</span>, <span class="lit">"b"</span>)).withKey(_._1) <span class="cmt">// RDD((1, (1, "a")), (2, (2, "b")))</span></pre></div><dl class="paramcmts block"><dt class="param">toKey</dt><dd class="cmt"><p>the function to apply to extract the key</p></dd><dt>returns</dt><dd class="cmt"><p>an rdd of (K, V) from an RDD of V where K is determined by
applying toKey on V.</p></dd></dl></div>
</li></ol>
</div>

Expand Down
3 changes: 3 additions & 0 deletions docs/index/index-w.html
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

</head>
<body><div class="entry">
<div class="name">withKey</div>
<div class="occurrences"><a href="../com/spark_helper/SparkHelper$$RDDExtensions.html" class="extype" name="com.spark_helper.SparkHelper.RDDExtensions">RDDExtensions</a> </div>
</div><div class="entry">
<div class="name">withPurge</div>
<div class="occurrences"><a href="../com/spark_helper/Monitor$.html" class="extype" name="com.spark_helper.Monitor">Monitor</a> </div>
</div><div class="entry">
Expand Down
13 changes: 13 additions & 0 deletions src/main/scala/com/spark_helper/SparkHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ object SparkHelper extends Serializable {
case x if pf.isDefinedAt(x) => pf(x)
case x => x
}

/** Keys an RDD: maps an RDD of A to an RDD of (B, A), where each B is
  * derived from its element A.
  *
  * Shorthand replacing
  * <code style="background-color:#eff0f1;padding:1px 5px;font-size:12px">rdd.map(x => (x.smthg, x))</code>
  * with
  * <code style="background-color:#eff0f1;padding:1px 5px;font-size:12px">rdd.withKey(_.smthg)</code>.
  *
  * {{{ RDD((1, "a"), (2, "b")).withKey(_._1) // RDD((1, (1, "a")), (2, (2, "b"))) }}}
  *
  * @param toKey the function applied to each element to extract its key
  * @return an RDD of (K, T) pairs: every element of the original RDD,
  *         tagged with the key obtained by applying toKey to it
  */
def withKey[K](toKey: T => K): RDD[(K, T)] = rdd.map(x => toKey(x) -> x)
}

implicit class StringRDDExtensions(val rdd: RDD[String]) extends AnyVal {
Expand Down
9 changes: 8 additions & 1 deletion src/test/scala/com/spark_helper/SparkHelperTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,17 @@ class SparkHelperTest
}

test("Partial map") {
  // Doubles even elements, leaves odd ones untouched.
  val input = sc.parallelize(Array(1, 3, 2, 7, 8))
  val expected = sc.parallelize(Array(1, 3, 4, 7, 16))
  assertRDDEquals(input.partialMap { case a if a % 2 == 0 => 2 * a }, expected)
}

test("With Key") {
  // Each record should be paired with the key extracted by withKey.
  val records = sc.parallelize(Array(A(1, "a"), A(2, "b")))
  val keyed = records.withKey(_.x)
  val expected = sc.parallelize(Array((1, A(1, "a")), (2, A(2, "b"))))
  assertRDDEquals(keyed, expected)
}
}

// Test fixture: a simple record whose `x` field is used as the key in the "With Key" test.
case class A(x: Int, y: String)

0 comments on commit 49b2ad2

Please sign in to comment.