From 2202e90ad3a03843ff53300cdb1c01988e8722f8 Mon Sep 17 00:00:00 2001 From: Mahmoud Hanafy Date: Sat, 21 May 2016 09:28:08 +0200 Subject: [PATCH] Port LoadSave to Java --- .../examples/dataframe/JavaLoadSave.java | 140 ++++++++++++++++++ .../dataframe/LoadSave.scala | 15 +- 2 files changed, 150 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/highperformancespark/examples/dataframe/JavaLoadSave.java diff --git a/src/main/java/com/highperformancespark/examples/dataframe/JavaLoadSave.java b/src/main/java/com/highperformancespark/examples/dataframe/JavaLoadSave.java new file mode 100644 index 0000000..9d36dd8 --- /dev/null +++ b/src/main/java/com/highperformancespark/examples/dataframe/JavaLoadSave.java @@ -0,0 +1,140 @@ +package com.highperformancespark.examples.dataframe; + +import com.highperformancespark.examples.objects.JavaPandaPlace; +import com.highperformancespark.examples.objects.JavaRawPanda; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.*; +import org.apache.spark.sql.types.*; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +public class JavaLoadSave { + private SQLContext sqlContext; + + public JavaLoadSave(SQLContext sqlContext) { + this.sqlContext = sqlContext; + } + + //tag::createFromRDD[] + public DataFrame createFromJavaBean(JavaRDD input) { + // Create DataFrame using Java Bean + DataFrame df1 = sqlContext.createDataFrame(input, JavaPandaPlace.class); + + // Create DataFrame using JavaRDD + JavaRDD rowRDD = input.map(pm -> RowFactory.create(pm.getName(), + pm.getPandas().stream() + .map(pi -> RowFactory.create(pi.getId(), pi.getZip(), pi.isHappy(), pi.getAttributes())) + .collect(Collectors.toList()))); + + ArrayType pandasType = DataTypes.createArrayType(new StructType( + new StructField[]{ + new StructField("id", DataTypes.LongType, true, Metadata.empty()), + new StructField("zip", DataTypes.StringType, true, Metadata.empty()), + new StructField("happy", DataTypes.BooleanType, true, Metadata.empty()), + new StructField("attributes", DataTypes.createArrayType(DataTypes.FloatType), true, Metadata.empty()) + } + )); + + StructType schema = new StructType(new StructField[]{ + new StructField("name", DataTypes.StringType, true, Metadata.empty()), + new StructField("pandas", pandasType, true, Metadata.empty()) + }); + + DataFrame df2 = sqlContext.createDataFrame(rowRDD, schema); + return df2; + } + //end::createFromRDD[] + + //tag::createFromLocal[] + public DataFrame createFromLocal(List input) { + return sqlContext.createDataFrame(input, PandaPlace.class); + } + //end::createFromLocal[] + + //tag::collectResults[] + public Row[] collectDF(DataFrame df) { + return df.collect(); + } + //end::collectResults[] + + //tag::toRDD[] + public JavaRDD toRDD(DataFrame input) { + JavaRDD rdd = input.javaRDD().map(row -> new JavaRawPanda(row.getLong(0), row.getString(1), + row.getString(2), row.getBoolean(3), row.getList(4))); + return rdd; + } + //end::toRDD[] + + //tag::partitionedOutput[] + public void writeOutByZip(DataFrame input) { + input.write().partitionBy("zipcode").format("json").save("output/"); + } + //end::partitionedOutput[] + + //tag::saveAppend[] + public void writeAppend(DataFrame input) { + input.write().mode(SaveMode.Append).save("output/"); + } + //end::saveAppend[] + + public DataFrame createJDBC() { + //tag::createJDBC[] + DataFrame df1 = sqlContext.read().jdbc("jdbc:dialect:serverName;user=user;password=pass", + "table", new Properties()); + + DataFrame df2 = sqlContext.read().format("jdbc") + .option("url", "jdbc:dialect:serverName") + .option("dbtable", "table").load(); + + return df2; + //end::createJDBC[] + } + + public void writeJDBC(DataFrame df) { + //tag::writeJDBC[] + df.write().jdbc("jdbc:dialect:serverName;user=user;password=pass", + "table", new Properties()); + + df.write().format("jdbc") + .option("url", "jdbc:dialect:serverName") + .option("user", "user") + .option("password", "pass") + .option("dbtable", "table").save(); + //end::writeJDBC[] + } + + //tag::loadParquet[] + public DataFrame loadParquet(String path) { + // Configure Spark to read binary data as string, note: must be configured on SQLContext + sqlContext.setConf("spark.sql.parquet.binaryAsString", "true"); + + // Load parquet data using merge schema (configured through option) + DataFrame df = sqlContext.read() + .option("mergeSchema", "true") + .format("parquet") + .load(path); + + return df; + } + //end::loadParquet[] + + //tag::writeParquet[] + public void writeParquet(DataFrame df, String path) { + df.write().format("parquet").save(path); + } + //end::writeParquet[] + + //tag::loadHiveTable[] + public DataFrame loadHiveTable() { + return sqlContext.read().table("pandas"); + } + //end::loadHiveTable[] + + //tag::saveManagedTable[] + public void saveManagedTable(DataFrame df) { + df.write().saveAsTable("pandas"); + } + //end::saveManagedTable[] +} diff --git a/src/main/scala/com/high-performance-spark-examples/dataframe/LoadSave.scala b/src/main/scala/com/high-performance-spark-examples/dataframe/LoadSave.scala index ec63e12..2c865f8 100644 --- a/src/main/scala/com/high-performance-spark-examples/dataframe/LoadSave.scala +++ b/src/main/scala/com/high-performance-spark-examples/dataframe/LoadSave.scala @@ -23,13 +23,15 @@ case class LoadSave(sqlContext: SQLContext) { val rowRDD = input.map(pm => Row(pm.name, pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes)))) + val pandasType = ArrayType(StructType(List( + StructField("id", LongType, true), + StructField("zip", StringType, true), + StructField("happy", BooleanType, true), + StructField("attributes", ArrayType(FloatType), true)))) + // Create DataFrame explicitly with specified schema val schema = StructType(List(StructField("name", StringType, true), - StructField("pandas", ArrayType(StructType(List( - StructField("id", LongType, true), - StructField("zip", StringType, true), - StructField("happy", BooleanType, true), - StructField("attributes", ArrayType(FloatType), true))))))) + StructField("pandas", pandasType))) val df3 = sqlContext.createDataFrame(rowRDD, schema) } @@ -72,6 +74,7 @@ case class LoadSave(sqlContext: SQLContext) { //tag::createJDBC[] sqlContext.read.jdbc("jdbc:dialect:serverName;user=user;password=pass", "table", new Properties) + sqlContext.read.format("jdbc") .option("url", "jdbc:dialect:serverName") .option("dbtable", "table").load() @@ -82,6 +85,7 @@ case class LoadSave(sqlContext: SQLContext) { //tag::writeJDBC[] df.write.jdbc("jdbc:dialect:serverName;user=user;password=pass", "table", new Properties) + df.write.format("jdbc") .option("url", "jdbc:dialect:serverName") .option("user", "user") @@ -94,6 +98,7 @@ case class LoadSave(sqlContext: SQLContext) { def loadParquet(path: String): DataFrame = { // Configure Spark to read binary data as string, note: must be configured on SQLContext sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") + // Load parquet data using merge schema (configured through option) sqlContext.read .option("mergeSchema", "true")