Skip to content

Commit

Permalink
Merge pull request #49 from mahmoudhanafy/port-HappyPandas-to-Java
Browse files Browse the repository at this point in the history
Port HappyPandas to java
  • Loading branch information
holdenk committed May 23, 2016
2 parents 09b3d85 + d7de259 commit 8a395d4
Show file tree
Hide file tree
Showing 13 changed files with 789 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cache:
- $HOME/.sbt/launchers
scala:
- 2.11.6
jdk:
- oraclejdk8
apt:
sources:
- ubuntu-toolchain-r-test
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ scalaVersion := "2.11.6"

crossScalaVersions := Seq("2.11.6")

javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")

sparkVersion := "1.6.1"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package com.highperformancespark.examples.dataframe;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.hive.HiveContext;

import java.util.HashMap;
import java.util.Map;

import static org.apache.spark.sql.functions.*;

public class JavaHappyPandas {

/**
* Creates SQLContext with an existing SparkContext.
*/
public static SQLContext sqlContext(JavaSparkContext jsc) {
SQLContext sqlContext = new SQLContext(jsc);
return sqlContext;
}

/**
* Creates HiveContext with an existing SparkContext.
*/
public static HiveContext hiveContext(JavaSparkContext jsc) {
HiveContext hiveContext = new HiveContext(jsc);
return hiveContext;
}

/**
* Illustrate loading some JSON data.
*/
public static DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) {
DataFrame df1 = sqlContext.read().json(path);

DataFrame df2 = sqlContext.read().format("json").option("samplingRatio", "1.0").load(path);

JavaRDD<String> jsonRDD = jsc.textFile(path);
DataFrame df3 = sqlContext.read().json(jsonRDD);

return df1;
}

public static DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD<String> input) {
JavaRDD<String> rdd = input.filter(e -> e.contains("panda"));
DataFrame df = sqlContext.read().json(rdd);
return df;
}

// Here will be some examples on PandaInfo DataFrame

/**
* Gets the percentage of happy pandas per place.
*
* @param pandaInfo the input DataFrame
* @return Returns DataFrame of (place, percentage of happy pandas)
*/
public static DataFrame happyPandasPercentage(DataFrame pandaInfo) {
DataFrame happyPercentage = pandaInfo.select(pandaInfo.col("place"),
(pandaInfo.col("happyPandas").divide(pandaInfo.col("totalPandas"))).as("percentHappy"));
return happyPercentage;
}

/**
* Encodes pandaType to Integer values instead of String values.
*
* @param pandaInfo the input DataFrame
* @return Returns a DataFrame of pandaId and integer value for pandaType.
*/
public static DataFrame encodePandaType(DataFrame pandaInfo) {
DataFrame encodedDF = pandaInfo.select(pandaInfo.col("id"),
when(pandaInfo.col("pt").equalTo("giant"), 0).
when(pandaInfo.col("pt").equalTo("red"), 1).
otherwise(2).as("encodedType"));

return encodedDF;
}

/**
* Gets places with happy pandas more than minHappinessBound.
*/
public static DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) {
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(minHappyPandas));
}

/**
* Find pandas that are sad.
*/
public static DataFrame sadPandas(DataFrame pandaInfo) {
return pandaInfo.filter(pandaInfo.col("happy").notEqual(true));
}

/**
* Find pandas that are happy and fuzzier than squishy.
*/
public static DataFrame happyFuzzyPandas(DataFrame pandaInfo) {
DataFrame df = pandaInfo.filter(
pandaInfo.col("happy").and(pandaInfo.col("attributes").apply(0)).gt(pandaInfo.col("attributes").apply(1))
);

return df;
}

/**
* Gets places that contains happy pandas more than unhappy pandas.
*/
public static DataFrame happyPandasPlaces(DataFrame pandaInfo) {
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(pandaInfo.col("totalPandas").divide(2)));
}

/**
* Remove duplicate pandas by id.
*/
public static DataFrame removeDuplicates(DataFrame pandas) {
DataFrame df = pandas.dropDuplicates(new String[]{"id"});
return df;
}

public static DataFrame describePandas(DataFrame pandas) {
return pandas.describe();
}

public static DataFrame maxPandaSizePerZip(DataFrame pandas) {
return pandas.groupBy(pandas.col("zip")).max("pandaSize");
}

public static DataFrame minMaxPandaSizePerZip(DataFrame pandas) {
return pandas.groupBy(pandas.col("zip")).agg(min("pandaSize"), max("pandaSize"));
}

public static DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) {
Map<String, String> map = new HashMap<>();
map.put("pandaSize", "min");
map.put("age", "max");

DataFrame df = pandas.groupBy(pandas.col("zip")).agg(map);
return df;
}

public static DataFrame minMeanSizePerZip(DataFrame pandas) {
return pandas.groupBy(pandas.col("zip")).agg(min(pandas.col("pandaSize")), mean(pandas.col("pandaSize")));
}

public static DataFrame simpleSqlExample(DataFrame pandas) {
SQLContext sqlContext = pandas.sqlContext();
pandas.registerTempTable("pandas");

DataFrame miniPandas = sqlContext.sql("SELECT * FROM pandas WHERE pandaSize < 12");
return miniPandas;
}

/**
* Orders pandas by size ascending and by age descending.
* Pandas will be sorted by "size" first and if two pandas
* have the same "size" will be sorted by "age".
*/
public static DataFrame orderPandas(DataFrame pandas) {
return pandas.orderBy(pandas.col("pandaSize").asc(), pandas.col("age").desc());
}

public static DataFrame computeRelativePandaSizes(DataFrame pandas) {
//tag::relativePandaSizesWindow[]
WindowSpec windowSpec = Window
.orderBy(pandas.col("age"))
.partitionBy(pandas.col("zip"))
.rowsBetween(-10, 10); // can use rangeBetween for range instead
//end::relativePandaSizesWindow[]

//tag::relativePandaSizesQuery[]
Column pandaRelativeSizeCol = pandas.col("pandaSize").minus(avg(pandas.col("pandaSize")).over(windowSpec));

return pandas.select(pandas.col("name"), pandas.col("zip"), pandas.col("pandaSize"),
pandas.col("age"), pandaRelativeSizeCol.as("panda_relative_size"));
//end::relativePandaSizesQuery[]
}

public static void joins(DataFrame df1, DataFrame df2) {
//tag::innerJoin[]
// Inner join implicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")));
// Inner join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "inner");
//end::innerJoin[]

//tag::leftouterJoin[]
// Left outer join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "left_outer");
//end::leftouterJoin[]

//tag::rightouterJoin[]
// Right outer join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "right_outer");
//end::rightouterJoin[]

//tag::leftsemiJoin[]
// Left semi join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "leftsemi");
//end::leftsemiJoin[]
}

public static DataFrame selfJoin(DataFrame df) {
return (df.as("a")).join(df.as("b")).where("a.name = b.name");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package com.highperformancespark.examples.dataframe;

import com.highperformancespark.examples.objects.JavaPandaPlace;
import com.highperformancespark.examples.objects.JavaRawPanda;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class JavaLoadSave {
private SQLContext sqlContext;

public JavaLoadSave(SQLContext sqlContext) {
this.sqlContext = sqlContext;
}

//tag::createFromRDD[]
public DataFrame createFromJavaBean(JavaRDD<JavaPandaPlace> input) {
// Create DataFrame using Java Bean
DataFrame df1 = sqlContext.createDataFrame(input, JavaPandaPlace.class);

// Create DataFrame using JavaRDD<Row>
JavaRDD<Row> rowRDD = input.map(pm -> RowFactory.create(pm.getName(),
pm.getPandas().stream()
.map(pi -> RowFactory.create(pi.getId(), pi.getZip(), pi.isHappy(), pi.getAttributes()))
.collect(Collectors.toList())));

ArrayType pandasType = DataTypes.createArrayType(new StructType(
new StructField[]{
new StructField("id", DataTypes.LongType, true, Metadata.empty()),
new StructField("zip", DataTypes.StringType, true, Metadata.empty()),
new StructField("happy", DataTypes.BooleanType, true, Metadata.empty()),
new StructField("attributes", DataTypes.createArrayType(DataTypes.FloatType), true, Metadata.empty())
}
));

StructType schema = new StructType(new StructField[]{
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
new StructField("pandas", pandasType, true, Metadata.empty())
});

DataFrame df2 = sqlContext.createDataFrame(rowRDD, schema);
return df2;
}
//end::createFromRDD[]

//tag::createFromLocal[]
public DataFrame createFromLocal(List<PandaPlace> input) {
return sqlContext.createDataFrame(input, PandaPlace.class);
}
//end::createFromLocal[]

//tag::collectResults[]
public Row[] collectDF(DataFrame df) {
return df.collect();
}
//end::collectResults[]

//tag::toRDD[]
public JavaRDD<JavaRawPanda> toRDD(DataFrame input) {
JavaRDD<JavaRawPanda> rdd = input.javaRDD().map(row -> new JavaRawPanda(row.getLong(0), row.getString(1),
row.getString(2), row.getBoolean(3), row.getList(4)));
return rdd;
}
//end::toRDD[]

//tag::partitionedOutput[]
public void writeOutByZip(DataFrame input) {
input.write().partitionBy("zipcode").format("json").save("output/");
}
//end::partitionedOutput[]

//tag::saveAppend[]
public void writeAppend(DataFrame input) {
input.write().mode(SaveMode.Append).save("output/");
}
//end::saveAppend[]

public DataFrame createJDBC() {
//tag::createJDBC[]
DataFrame df1 = sqlContext.read().jdbc("jdbc:dialect:serverName;user=user;password=pass",
"table", new Properties());

DataFrame df2 = sqlContext.read().format("jdbc")
.option("url", "jdbc:dialect:serverName")
.option("dbtable", "table").load();

return df2;
//end::createJDBC[]
}

public void writeJDBC(DataFrame df) {
//tag::writeJDBC[]
df.write().jdbc("jdbc:dialect:serverName;user=user;password=pass",
"table", new Properties());

df.write().format("jdbc")
.option("url", "jdbc:dialect:serverName")
.option("user", "user")
.option("password", "pass")
.option("dbtable", "table").save();
//end::writeJDBC[]
}

//tag::loadParquet[]
public DataFrame loadParquet(String path) {
// Configure Spark to read binary data as string, note: must be configured on SQLContext
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");

// Load parquet data using merge schema (configured through option)
DataFrame df = sqlContext.read()
.option("mergeSchema", "true")
.format("parquet")
.load(path);

return df;
}
//end::loadParquet[]

//tag::writeParquet[]
public void writeParquet(DataFrame df, String path) {
df.write().format("parquet").save(path);
}
//end::writeParquet[]

//tag::loadHiveTable[]
public DataFrame loadHiveTable() {
return sqlContext.read().table("pandas");
}
//end::loadHiveTable[]

//tag::saveManagedTable[]
public void saveManagedTable(DataFrame df) {
df.write().saveAsTable("pandas");
}
//end::saveManagedTable[]
}
Loading

0 comments on commit 8a395d4

Please sign in to comment.