-
Notifications
You must be signed in to change notification settings - Fork 234
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #49 from mahmoudhanafy/port-HappyPandas-to-Java
Port HappyPandas to java
- Loading branch information
Showing
13 changed files
with
789 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
210 changes: 210 additions & 0 deletions
210
src/main/java/com/highperformancespark/examples/dataframe/JavaHappyPandas.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
package com.highperformancespark.examples.dataframe; | ||
|
||
import org.apache.spark.api.java.JavaRDD; | ||
import org.apache.spark.api.java.JavaSparkContext; | ||
import org.apache.spark.sql.Column; | ||
import org.apache.spark.sql.DataFrame; | ||
import org.apache.spark.sql.SQLContext; | ||
import org.apache.spark.sql.expressions.Window; | ||
import org.apache.spark.sql.expressions.WindowSpec; | ||
import org.apache.spark.sql.hive.HiveContext; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import static org.apache.spark.sql.functions.*; | ||
|
||
public class JavaHappyPandas { | ||
|
||
/** | ||
* Creates SQLContext with an existing SparkContext. | ||
*/ | ||
public static SQLContext sqlContext(JavaSparkContext jsc) { | ||
SQLContext sqlContext = new SQLContext(jsc); | ||
return sqlContext; | ||
} | ||
|
||
/** | ||
* Creates HiveContext with an existing SparkContext. | ||
*/ | ||
public static HiveContext hiveContext(JavaSparkContext jsc) { | ||
HiveContext hiveContext = new HiveContext(jsc); | ||
return hiveContext; | ||
} | ||
|
||
/** | ||
* Illustrate loading some JSON data. | ||
*/ | ||
public static DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) { | ||
DataFrame df1 = sqlContext.read().json(path); | ||
|
||
DataFrame df2 = sqlContext.read().format("json").option("samplingRatio", "1.0").load(path); | ||
|
||
JavaRDD<String> jsonRDD = jsc.textFile(path); | ||
DataFrame df3 = sqlContext.read().json(jsonRDD); | ||
|
||
return df1; | ||
} | ||
|
||
public static DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD<String> input) { | ||
JavaRDD<String> rdd = input.filter(e -> e.contains("panda")); | ||
DataFrame df = sqlContext.read().json(rdd); | ||
return df; | ||
} | ||
|
||
// Here will be some examples on PandaInfo DataFrame | ||
|
||
/** | ||
* Gets the percentage of happy pandas per place. | ||
* | ||
* @param pandaInfo the input DataFrame | ||
* @return Returns DataFrame of (place, percentage of happy pandas) | ||
*/ | ||
public static DataFrame happyPandasPercentage(DataFrame pandaInfo) { | ||
DataFrame happyPercentage = pandaInfo.select(pandaInfo.col("place"), | ||
(pandaInfo.col("happyPandas").divide(pandaInfo.col("totalPandas"))).as("percentHappy")); | ||
return happyPercentage; | ||
} | ||
|
||
/** | ||
* Encodes pandaType to Integer values instead of String values. | ||
* | ||
* @param pandaInfo the input DataFrame | ||
* @return Returns a DataFrame of pandaId and integer value for pandaType. | ||
*/ | ||
public static DataFrame encodePandaType(DataFrame pandaInfo) { | ||
DataFrame encodedDF = pandaInfo.select(pandaInfo.col("id"), | ||
when(pandaInfo.col("pt").equalTo("giant"), 0). | ||
when(pandaInfo.col("pt").equalTo("red"), 1). | ||
otherwise(2).as("encodedType")); | ||
|
||
return encodedDF; | ||
} | ||
|
||
/** | ||
* Gets places with happy pandas more than minHappinessBound. | ||
*/ | ||
public static DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) { | ||
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(minHappyPandas)); | ||
} | ||
|
||
/** | ||
* Find pandas that are sad. | ||
*/ | ||
public static DataFrame sadPandas(DataFrame pandaInfo) { | ||
return pandaInfo.filter(pandaInfo.col("happy").notEqual(true)); | ||
} | ||
|
||
/** | ||
* Find pandas that are happy and fuzzier than squishy. | ||
*/ | ||
public static DataFrame happyFuzzyPandas(DataFrame pandaInfo) { | ||
DataFrame df = pandaInfo.filter( | ||
pandaInfo.col("happy").and(pandaInfo.col("attributes").apply(0)).gt(pandaInfo.col("attributes").apply(1)) | ||
); | ||
|
||
return df; | ||
} | ||
|
||
/** | ||
* Gets places that contains happy pandas more than unhappy pandas. | ||
*/ | ||
public static DataFrame happyPandasPlaces(DataFrame pandaInfo) { | ||
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(pandaInfo.col("totalPandas").divide(2))); | ||
} | ||
|
||
/** | ||
* Remove duplicate pandas by id. | ||
*/ | ||
public static DataFrame removeDuplicates(DataFrame pandas) { | ||
DataFrame df = pandas.dropDuplicates(new String[]{"id"}); | ||
return df; | ||
} | ||
|
||
public static DataFrame describePandas(DataFrame pandas) { | ||
return pandas.describe(); | ||
} | ||
|
||
public static DataFrame maxPandaSizePerZip(DataFrame pandas) { | ||
return pandas.groupBy(pandas.col("zip")).max("pandaSize"); | ||
} | ||
|
||
public static DataFrame minMaxPandaSizePerZip(DataFrame pandas) { | ||
return pandas.groupBy(pandas.col("zip")).agg(min("pandaSize"), max("pandaSize")); | ||
} | ||
|
||
public static DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) { | ||
Map<String, String> map = new HashMap<>(); | ||
map.put("pandaSize", "min"); | ||
map.put("age", "max"); | ||
|
||
DataFrame df = pandas.groupBy(pandas.col("zip")).agg(map); | ||
return df; | ||
} | ||
|
||
public static DataFrame minMeanSizePerZip(DataFrame pandas) { | ||
return pandas.groupBy(pandas.col("zip")).agg(min(pandas.col("pandaSize")), mean(pandas.col("pandaSize"))); | ||
} | ||
|
||
public static DataFrame simpleSqlExample(DataFrame pandas) { | ||
SQLContext sqlContext = pandas.sqlContext(); | ||
pandas.registerTempTable("pandas"); | ||
|
||
DataFrame miniPandas = sqlContext.sql("SELECT * FROM pandas WHERE pandaSize < 12"); | ||
return miniPandas; | ||
} | ||
|
||
/** | ||
* Orders pandas by size ascending and by age descending. | ||
* Pandas will be sorted by "size" first and if two pandas | ||
* have the same "size" will be sorted by "age". | ||
*/ | ||
public static DataFrame orderPandas(DataFrame pandas) { | ||
return pandas.orderBy(pandas.col("pandaSize").asc(), pandas.col("age").desc()); | ||
} | ||
|
||
public static DataFrame computeRelativePandaSizes(DataFrame pandas) { | ||
//tag::relativePandaSizesWindow[] | ||
WindowSpec windowSpec = Window | ||
.orderBy(pandas.col("age")) | ||
.partitionBy(pandas.col("zip")) | ||
.rowsBetween(-10, 10); // can use rangeBetween for range instead | ||
//end::relativePandaSizesWindow[] | ||
|
||
//tag::relativePandaSizesQuery[] | ||
Column pandaRelativeSizeCol = pandas.col("pandaSize").minus(avg(pandas.col("pandaSize")).over(windowSpec)); | ||
|
||
return pandas.select(pandas.col("name"), pandas.col("zip"), pandas.col("pandaSize"), | ||
pandas.col("age"), pandaRelativeSizeCol.as("panda_relative_size")); | ||
//end::relativePandaSizesQuery[] | ||
} | ||
|
||
public static void joins(DataFrame df1, DataFrame df2) { | ||
//tag::innerJoin[] | ||
// Inner join implicit | ||
df1.join(df2, df1.col("name").equalTo(df2.col("name"))); | ||
// Inner join explicit | ||
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "inner"); | ||
//end::innerJoin[] | ||
|
||
//tag::leftouterJoin[] | ||
// Left outer join explicit | ||
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "left_outer"); | ||
//end::leftouterJoin[] | ||
|
||
//tag::rightouterJoin[] | ||
// Right outer join explicit | ||
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "right_outer"); | ||
//end::rightouterJoin[] | ||
|
||
//tag::leftsemiJoin[] | ||
// Left semi join explicit | ||
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "leftsemi"); | ||
//end::leftsemiJoin[] | ||
} | ||
|
||
public static DataFrame selfJoin(DataFrame df) { | ||
return (df.as("a")).join(df.as("b")).where("a.name = b.name"); | ||
} | ||
|
||
} |
140 changes: 140 additions & 0 deletions
140
src/main/java/com/highperformancespark/examples/dataframe/JavaLoadSave.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package com.highperformancespark.examples.dataframe; | ||
|
||
import com.highperformancespark.examples.objects.JavaPandaPlace; | ||
import com.highperformancespark.examples.objects.JavaRawPanda; | ||
import org.apache.spark.api.java.JavaRDD; | ||
import org.apache.spark.sql.*; | ||
import org.apache.spark.sql.types.*; | ||
|
||
import java.util.List; | ||
import java.util.Properties; | ||
import java.util.stream.Collectors; | ||
|
||
public class JavaLoadSave { | ||
private SQLContext sqlContext; | ||
|
||
public JavaLoadSave(SQLContext sqlContext) { | ||
this.sqlContext = sqlContext; | ||
} | ||
|
||
//tag::createFromRDD[] | ||
public DataFrame createFromJavaBean(JavaRDD<JavaPandaPlace> input) { | ||
// Create DataFrame using Java Bean | ||
DataFrame df1 = sqlContext.createDataFrame(input, JavaPandaPlace.class); | ||
|
||
// Create DataFrame using JavaRDD<Row> | ||
JavaRDD<Row> rowRDD = input.map(pm -> RowFactory.create(pm.getName(), | ||
pm.getPandas().stream() | ||
.map(pi -> RowFactory.create(pi.getId(), pi.getZip(), pi.isHappy(), pi.getAttributes())) | ||
.collect(Collectors.toList()))); | ||
|
||
ArrayType pandasType = DataTypes.createArrayType(new StructType( | ||
new StructField[]{ | ||
new StructField("id", DataTypes.LongType, true, Metadata.empty()), | ||
new StructField("zip", DataTypes.StringType, true, Metadata.empty()), | ||
new StructField("happy", DataTypes.BooleanType, true, Metadata.empty()), | ||
new StructField("attributes", DataTypes.createArrayType(DataTypes.FloatType), true, Metadata.empty()) | ||
} | ||
)); | ||
|
||
StructType schema = new StructType(new StructField[]{ | ||
new StructField("name", DataTypes.StringType, true, Metadata.empty()), | ||
new StructField("pandas", pandasType, true, Metadata.empty()) | ||
}); | ||
|
||
DataFrame df2 = sqlContext.createDataFrame(rowRDD, schema); | ||
return df2; | ||
} | ||
//end::createFromRDD[] | ||
|
||
//tag::createFromLocal[] | ||
public DataFrame createFromLocal(List<PandaPlace> input) { | ||
return sqlContext.createDataFrame(input, PandaPlace.class); | ||
} | ||
//end::createFromLocal[] | ||
|
||
//tag::collectResults[] | ||
public Row[] collectDF(DataFrame df) { | ||
return df.collect(); | ||
} | ||
//end::collectResults[] | ||
|
||
//tag::toRDD[] | ||
public JavaRDD<JavaRawPanda> toRDD(DataFrame input) { | ||
JavaRDD<JavaRawPanda> rdd = input.javaRDD().map(row -> new JavaRawPanda(row.getLong(0), row.getString(1), | ||
row.getString(2), row.getBoolean(3), row.getList(4))); | ||
return rdd; | ||
} | ||
//end::toRDD[] | ||
|
||
//tag::partitionedOutput[] | ||
public void writeOutByZip(DataFrame input) { | ||
input.write().partitionBy("zipcode").format("json").save("output/"); | ||
} | ||
//end::partitionedOutput[] | ||
|
||
//tag::saveAppend[] | ||
public void writeAppend(DataFrame input) { | ||
input.write().mode(SaveMode.Append).save("output/"); | ||
} | ||
//end::saveAppend[] | ||
|
||
public DataFrame createJDBC() { | ||
//tag::createJDBC[] | ||
DataFrame df1 = sqlContext.read().jdbc("jdbc:dialect:serverName;user=user;password=pass", | ||
"table", new Properties()); | ||
|
||
DataFrame df2 = sqlContext.read().format("jdbc") | ||
.option("url", "jdbc:dialect:serverName") | ||
.option("dbtable", "table").load(); | ||
|
||
return df2; | ||
//end::createJDBC[] | ||
} | ||
|
||
public void writeJDBC(DataFrame df) { | ||
//tag::writeJDBC[] | ||
df.write().jdbc("jdbc:dialect:serverName;user=user;password=pass", | ||
"table", new Properties()); | ||
|
||
df.write().format("jdbc") | ||
.option("url", "jdbc:dialect:serverName") | ||
.option("user", "user") | ||
.option("password", "pass") | ||
.option("dbtable", "table").save(); | ||
//end::writeJDBC[] | ||
} | ||
|
||
//tag::loadParquet[] | ||
public DataFrame loadParquet(String path) { | ||
// Configure Spark to read binary data as string, note: must be configured on SQLContext | ||
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true"); | ||
|
||
// Load parquet data using merge schema (configured through option) | ||
DataFrame df = sqlContext.read() | ||
.option("mergeSchema", "true") | ||
.format("parquet") | ||
.load(path); | ||
|
||
return df; | ||
} | ||
//end::loadParquet[] | ||
|
||
//tag::writeParquet[] | ||
public void writeParquet(DataFrame df, String path) { | ||
df.write().format("parquet").save(path); | ||
} | ||
//end::writeParquet[] | ||
|
||
//tag::loadHiveTable[] | ||
public DataFrame loadHiveTable() { | ||
return sqlContext.read().table("pandas"); | ||
} | ||
//end::loadHiveTable[] | ||
|
||
//tag::saveManagedTable[] | ||
public void saveManagedTable(DataFrame df) { | ||
df.write().saveAsTable("pandas"); | ||
} | ||
//end::saveManagedTable[] | ||
} |
Oops, something went wrong.