Skip to content

Commit

Permalink
Remove squishPandaFromPace example
Browse files Browse the repository at this point in the history
  • Loading branch information
mahmoudhanafy authored and Mahmoud Hanafy committed May 19, 2016
1 parent d276733 commit c40b69d
Showing 1 changed file with 2 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
package com.highperformancespark.examples.dataframe;

import com.highperformancespark.examples.objects.JavaRawPanda;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2;
import scala.collection.JavaConversions;
import scala.collection.mutable.Buffer;
import scala.reflect.api.TypeTags;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.spark.sql.functions.*;

Expand Down Expand Up @@ -97,37 +89,6 @@ public DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) {
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(minHappyPandas));
}

/**
 * Extracts the panda info from panda places and computes the squishiness of each panda.
 *
 * <p>The "pandas" column of {@code pandaPlace} is exploded: every nested row is read
 * positionally (0 = id, 1 = zip, 2 = pt, 3 = happy, 4 = attributes) into a
 * {@link JavaRawPanda}. The result selects {@code attributes[0] / attributes[1]}
 * under the alias "squishyness".
 *
 * <p>NOTE(review): the method name says "Pace" (presumably "Place"); it is kept as-is so
 * existing callers continue to compile.
 *
 * @param pandaPlace DataFrame containing a "pandas" column of nested panda rows
 * @return a single-column DataFrame named "squishyness"
 */
public DataFrame squishPandaFromPace(DataFrame pandaPlace) {
    Buffer<Column> inputCols = JavaConversions.asScalaBuffer(Arrays.asList(pandaPlace.col("pandas")));

    // NOTE(review): explode() requires a Scala TypeTag for the produced element type and no
    // Java-side way of building one is known here, so null is still passed. This presumably
    // fails once explode() actually uses the tag -- TODO confirm / replace.
    TypeTags.TypeTag tag = null; // TODO don't know how to create Type Tag in java ??

    DataFrame pandaInfo = pandaPlace.explode(inputCols.toList(), r -> {
        List<Row> pandas = r.getList(0);
        List<JavaRawPanda> rawPandasList = pandas
            .stream()
            .map(a -> {
                long id = a.getLong(0);
                String zip = a.getString(1);
                String pt = a.getString(2);
                boolean happy = a.getBoolean(3);
                List<Double> attrs = a.getList(4);
                return new JavaRawPanda(id, zip, pt, happy, attrs);
            }).collect(Collectors.toList());

        return JavaConversions.asScalaBuffer(rawPandasList);
    }, tag);

    // Divide attributes[0] by attributes[1]. The divisor must be indexed *inside* divide(...);
    // previously the whole "attributes" array column was used as the divisor and apply(1)
    // was applied to the quotient instead.
    Column attributes = pandaInfo.col("attributes");
    return pandaInfo.select(
        attributes.apply(0).divide(attributes.apply(1)).as("squishyness"));
}

/**
* Find pandas that are sad.
*/
Expand Down Expand Up @@ -201,8 +162,8 @@ public void startJDBCServer(HiveContext sqlContext) {

/**
* Orders pandas by size ascending and by age descending.
* Pandas will be sorted by "size" first and if two pandas have the same "size"
* will be sorted by "age".
* Pandas will be sorted by "size" first and if two pandas
* have the same "size" will be sorted by "age".
*/
public DataFrame orderPandas(DataFrame pandas) {
return pandas.orderBy(pandas.col("pandaSize").asc(), pandas.col("age").desc());
Expand Down

0 comments on commit c40b69d

Please sign in to comment.