Make the JavaHappyPandas example helpers static so they can be called
without instantiating the class.

Also included in this patch:
  * happyFuzzyPandas: group the comparison inside and(...), so the filter is
    happy AND (attributes[0] > attributes[1]) rather than
    (happy AND attributes[0]) > attributes[1].
  * selfJoin: compare a.name with b.name instead of comparing df.col("name")
    with itself.
  * minHappyPandas: Javadoc now names the real parameter and the >= bound.

Note: the post-image hash on the index line predates the three extra fixes.

diff --git a/src/main/java/com/highperformancespark/examples/dataframe/JavaHappyPandas.java b/src/main/java/com/highperformancespark/examples/dataframe/JavaHappyPandas.java
index 73636de..09b5e2f 100644
--- a/src/main/java/com/highperformancespark/examples/dataframe/JavaHappyPandas.java
+++ b/src/main/java/com/highperformancespark/examples/dataframe/JavaHappyPandas.java
@@ -19,7 +19,7 @@ public class JavaHappyPandas {
   /**
    * Creates SQLContext with an existing SparkContext.
    */
-  public SQLContext sqlContext(JavaSparkContext jsc) {
+  public static SQLContext sqlContext(JavaSparkContext jsc) {
     SQLContext sqlContext = new SQLContext(jsc);
     return sqlContext;
   }
@@ -27,7 +27,7 @@ public SQLContext sqlContext(JavaSparkContext jsc) {
   /**
    * Creates HiveContext with an existing SparkContext.
    */
-  public HiveContext hiveContext(JavaSparkContext jsc) {
+  public static HiveContext hiveContext(JavaSparkContext jsc) {
     HiveContext hiveContext = new HiveContext(jsc);
     return hiveContext;
   }
@@ -35,7 +35,7 @@ public HiveContext hiveContext(JavaSparkContext jsc) {
   /**
    * Illustrate loading some JSON data.
    */
-  public DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) {
+  public static DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) {
     DataFrame df1 = sqlContext.read().json(path);
 
     DataFrame df2 = sqlContext.read().format("json").option("samplingRatio", "1.0").load(path);
@@ -46,7 +46,7 @@ public DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, Str
     return df1;
   }
 
-  public DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD input) {
+  public static DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD input) {
     JavaRDD rdd = input.filter(e -> e.contains("panda"));
     DataFrame df = sqlContext.read().json(rdd);
     return df;
@@ -60,7 +60,7 @@ public DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD input) {
    * @param pandaInfo the input DataFrame
    * @return Returns DataFrame of (place, percentage of happy pandas)
    */
-  public DataFrame happyPandasPercentage(DataFrame pandaInfo) {
+  public static DataFrame happyPandasPercentage(DataFrame pandaInfo) {
     DataFrame happyPercentage = pandaInfo.select(pandaInfo.col("place"),
       pandaInfo.col("happyPandas").divide(pandaInfo.col("totalPandas")).as("percentHappy"));
     return happyPercentage;
@@ -72,7 +72,7 @@ public DataFrame happyPandasPercentage(DataFrame pandaInfo) {
    * @param pandaInfo the input DataFrame
    * @return Returns a DataFrame of pandaId and integer value for pandaType.
    */
-  public DataFrame encodePandaType(DataFrame pandaInfo) {
+  public static DataFrame encodePandaType(DataFrame pandaInfo) {
     DataFrame encodedDF = pandaInfo.select(pandaInfo.col("id"),
       when(pandaInfo.col("pt").equalTo("giant"), 0).
         when(pandaInfo.col("pt").equalTo("red"), 1).
@@ -84,21 +84,21 @@ public DataFrame encodePandaType(DataFrame pandaInfo) {
   /**
-   * Gets places with happy pandas more than minHappinessBound.
+   * Gets places with at least minHappyPandas happy pandas.
    */
-  public DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) {
+  public static DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) {
     return pandaInfo.filter(pandaInfo.col("happyPandas").geq(minHappyPandas));
   }
 
   /**
    * Find pandas that are sad.
    */
-  public DataFrame sadPandas(DataFrame pandaInfo) {
+  public static DataFrame sadPandas(DataFrame pandaInfo) {
     return pandaInfo.filter(pandaInfo.col("happy").notEqual(true));
   }
 
   /**
    * Find pandas that are happy and fuzzier than squishy.
    */
-  public DataFrame happyFuzzyPandas(DataFrame pandaInfo) {
+  public static DataFrame happyFuzzyPandas(DataFrame pandaInfo) {
     DataFrame df = pandaInfo.filter(
-      pandaInfo.col("happy").and(pandaInfo.col("attributes").apply(0)).gt(pandaInfo.col("attributes").apply(1))
+      pandaInfo.col("happy").and(pandaInfo.col("attributes").apply(0).gt(pandaInfo.col("attributes").apply(1)))
     );
@@ -109,31 +109,31 @@ public DataFrame happyFuzzyPandas(DataFrame pandaInfo) {
   /**
    * Gets places that contains happy pandas more than unhappy pandas.
    */
-  public DataFrame happyPandasPlaces(DataFrame pandaInfo) {
+  public static DataFrame happyPandasPlaces(DataFrame pandaInfo) {
     return pandaInfo.filter(pandaInfo.col("happyPandas").geq(pandaInfo.col("totalPandas").divide(2)));
   }
 
   /**
    * Remove duplicate pandas by id.
    */
-  public DataFrame removeDuplicates(DataFrame pandas) {
+  public static DataFrame removeDuplicates(DataFrame pandas) {
     DataFrame df = pandas.dropDuplicates(new String[]{"id"});
     return df;
   }
 
-  public DataFrame describePandas(DataFrame pandas) {
+  public static DataFrame describePandas(DataFrame pandas) {
     return pandas.describe();
   }
 
-  public DataFrame maxPandaSizePerZip(DataFrame pandas) {
+  public static DataFrame maxPandaSizePerZip(DataFrame pandas) {
     return pandas.groupBy(pandas.col("zip")).max("pandaSize");
   }
 
-  public DataFrame minMaxPandaSizePerZip(DataFrame pandas) {
+  public static DataFrame minMaxPandaSizePerZip(DataFrame pandas) {
     return pandas.groupBy(pandas.col("zip")).agg(min("pandaSize"), max("pandaSize"));
   }
 
-  public DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) {
+  public static DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) {
     Map map = new HashMap<>();
     map.put("pandaSize", "min");
     map.put("age", "max");
@@ -142,11 +142,11 @@ public DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) {
     return df;
   }
 
-  public DataFrame minMeanSizePerZip(DataFrame pandas) {
+  public static DataFrame minMeanSizePerZip(DataFrame pandas) {
     return pandas.groupBy(pandas.col("zip")).agg(min(pandas.col("pandaSize")), mean(pandas.col("pandaSize")));
   }
 
-  public DataFrame simpleSqlExample(DataFrame pandas) {
+  public static DataFrame simpleSqlExample(DataFrame pandas) {
     SQLContext sqlContext = pandas.sqlContext();
     pandas.registerTempTable("pandas");
 
@@ -159,11 +159,11 @@ public DataFrame simpleSqlExample(DataFrame pandas) {
    * Pandas will be sorted by "size" first and if two pandas
    * have the same "size" will be sorted by "age".
    */
-  public DataFrame orderPandas(DataFrame pandas) {
+  public static DataFrame orderPandas(DataFrame pandas) {
     return pandas.orderBy(pandas.col("pandaSize").asc(), pandas.col("age").desc());
   }
 
-  public DataFrame computeRelativePandaSizes(DataFrame pandas) {
+  public static DataFrame computeRelativePandaSizes(DataFrame pandas) {
     //tag::relativePandaSizesWindow[]
     WindowSpec windowSpec = Window
       .orderBy(pandas.col("age"))
@@ -179,7 +179,7 @@ public DataFrame computeRelativePandaSizes(DataFrame pandas) {
     //end::relativePandaSizesQuery[]
   }
 
-  public void joins(DataFrame df1, DataFrame df2) {
+  public static void joins(DataFrame df1, DataFrame df2) {
     //tag::innerJoin[]
     // Inner join implicit
     df1.join(df2, df1.col("name").equalTo(df2.col("name")));
@@ -203,6 +203,6 @@ public void joins(DataFrame df1, DataFrame df2) {
     //end::leftsemiJoin[]
   }
 
-  public DataFrame selfJoin(DataFrame df) {
-    return df.as("a").join(df.as("b")).where(df.col("name").equalTo(df.col("name")));
+  public static DataFrame selfJoin(DataFrame df) {
+    return df.as("a").join(df.as("b")).filter("a.name = b.name");
   }