diff --git a/README.md b/README.md index 90015ab..ef6e748 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,9 @@ rdd.saveAsTextFileByKey("/my/output/folder/path") // Concept mapper (the following example transforms RDD(1, 3, 2, 7, 8) into RDD(1, 3, 4, 7, 16)): rdd.partialMap { case a if a % 2 == 0 => 2 * a } +// Concept mapper (the following example transforms RDD((1, "a"), (2, "b")) into RDD(1, (1, "a")), (2, (2, "b"))): +rdd.withKey(_._1) + // For when input files contain commas and textFile can't handle it: sc.textFile(Seq("path/hello,world.txt", "path/hello_world.txt")) ``` @@ -298,7 +301,7 @@ With sbt: ```scala resolvers += "jitpack" at "https://jitpack.io" -libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.0" +libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.1" ``` With maven: @@ -314,7 +317,7 @@ With maven: com.github.xavierguihot spark_helper - 2.0.0 + 2.0.1 ``` @@ -328,7 +331,7 @@ allprojects { } dependencies { - compile 'com.github.xavierguihot:spark_helper:2.0.0' + compile 'com.github.xavierguihot:spark_helper:2.0.1' } ``` diff --git a/build.sbt b/build.sbt index 15d0838..030b883 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "spark_helper" -version := "2.0.0" +version := "2.0.1" scalaVersion := "2.11.12" diff --git a/docs/com/spark_helper/SparkHelper$$RDDExtensions.html b/docs/com/spark_helper/SparkHelper$$RDDExtensions.html index cfdc746..57a2229 100644 --- a/docs/com/spark_helper/SparkHelper$$RDDExtensions.html +++ b/docs/com/spark_helper/SparkHelper$$RDDExtensions.html @@ -512,6 +512,25 @@

) +
  • + + +

    + + + def + + + withKey[K](toKey: (T) ⇒ K): RDD[(K, T)] + +

    + + Permalink + + +

    Map an RDD of A to an RDD of (B, A) where B is determined from A.

    Map an RDD of A to an RDD of (B, A) where B is determined from A.

    Replaces for example rdd.map(x => (x.smthg, x)) +with rdd.withKey(_.smthg).

    RDD((1, "a"), (2, "b")).withKey(_._1) // RDD((1, (1, "a")), (2, (2, "b")))
    toKey

    the function to apply to extract the key

    returns

    an rdd of (K, V) from an RDD of V where K is determined by +applying toKey on V.

  • diff --git a/docs/index/index-w.html b/docs/index/index-w.html index f7f4a02..368df52 100644 --- a/docs/index/index-w.html +++ b/docs/index/index-w.html @@ -11,6 +11,9 @@
    +
    withKey
    + +
    withPurge
    diff --git a/src/main/scala/com/spark_helper/SparkHelper.scala b/src/main/scala/com/spark_helper/SparkHelper.scala index 9d05d96..97f718a 100644 --- a/src/main/scala/com/spark_helper/SparkHelper.scala +++ b/src/main/scala/com/spark_helper/SparkHelper.scala @@ -96,6 +96,19 @@ object SparkHelper extends Serializable { case x if pf.isDefinedAt(x) => pf(x) case x => x } + + /** Map an RDD of A to an RDD of (B, A) where B is determined from A. + * + * Replaces for example rdd.map(x => (x.smthg, x)) + * with rdd.withKey(_.smthg). + * + * {{{ RDD((1, "a"), (2, "b")).withKey(_._1) // RDD(1, (1, "a")), (2, (2, "b")) }}} + * + * @param toKey the function to apply to extract the key + * @return an rdd of (K, V) from an RDD of V where V is determined by + * applying toKey on V. + */ + def withKey[K](toKey: T => K): RDD[(K, T)] = rdd.map(x => (toKey(x), x)) } implicit class StringRDDExtensions(val rdd: RDD[String]) extends AnyVal { diff --git a/src/test/scala/com/spark_helper/SparkHelperTest.scala b/src/test/scala/com/spark_helper/SparkHelperTest.scala index 70d706d..835d559 100644 --- a/src/test/scala/com/spark_helper/SparkHelperTest.scala +++ b/src/test/scala/com/spark_helper/SparkHelperTest.scala @@ -399,10 +399,17 @@ class SparkHelperTest } test("Partial map") { - val in = sc.parallelize(Array(1, 3, 2, 7, 8)) val computedOut = in.partialMap { case a if a % 2 == 0 => 2 * a } val expectedOut = sc.parallelize(Array(1, 3, 4, 7, 16)) assertRDDEquals(computedOut, expectedOut) } + + test("With Key") { + val in = sc.parallelize(Array(A(1, "a"), A(2, "b"))) + val out = sc.parallelize(Array((1, A(1, "a")), (2, A(2, "b")))) + assertRDDEquals(in.withKey(_.x), out) + } } + +case class A(x: Int, y: String)