Commit 072c496

Port LoadSave to Java

Author: Mahmoud Hanafy (committed)
1 parent 6ce7baf commit 072c496

2 files changed: +151, -5 lines changed

JavaLoadSave.java (new file)

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
package com.highperformancespark.examples.dataframe;

import com.highperformancespark.examples.objects.JavaPandaPlace;
import com.highperformancespark.examples.objects.JavaRawPanda;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class JavaLoadSave {
  private SQLContext sqlContext;

  public JavaLoadSave(SQLContext sqlContext) {
    this.sqlContext = sqlContext;
  }

  //tag::createFromRDD[]
  public DataFrame createFromJavaBean(JavaRDD<JavaPandaPlace> input) {
    // Create DataFrame using Java Bean
    DataFrame df1 = sqlContext.createDataFrame(input, JavaPandaPlace.class);

    // Create DataFrame using JavaRDD<Row>
    JavaRDD<Row> rowRDD = input.map(pm -> RowFactory.create(pm.getName(),
      pm.getPandas().stream()
        .map(pi -> RowFactory.create(pi.getId(), pi.getZip(), pi.isHappy(), pi.getAttributes()))
        .collect(Collectors.toList())));

    ArrayType pandasType = DataTypes.createArrayType(new StructType(
      new StructField[]{
        new StructField("id", DataTypes.LongType, true, Metadata.empty()),
        new StructField("zip", DataTypes.StringType, true, Metadata.empty()),
        new StructField("happy", DataTypes.BooleanType, true, Metadata.empty()),
        new StructField("attributes", DataTypes.createArrayType(DataTypes.FloatType), true, Metadata.empty())
      }
    ));

    StructType schema = new StructType(new StructField[]{
      new StructField("name", DataTypes.StringType, true, Metadata.empty()),
      new StructField("pandas", pandasType, true, Metadata.empty())
    });

    DataFrame df2 = sqlContext.createDataFrame(rowRDD, schema);
    return df2;
  }
  //end::createFromRDD[]

  //tag::createFromLocal[]
  public DataFrame createFromLocal(List<JavaPandaPlace> input) {
    return sqlContext.createDataFrame(input, JavaPandaPlace.class);
  }
  //end::createFromLocal[]

  //tag::collectResults[]
  public Row[] collectDF(DataFrame df) {
    return df.collect();
  }
  //end::collectResults[]

  //tag::toRDD[]
  public JavaRDD<JavaRawPanda> toRDD(DataFrame input) {
    JavaRDD<JavaRawPanda> rdd = input.javaRDD().map(row -> new JavaRawPanda(row.getLong(0), row.getString(1),
      row.getString(2), row.getBoolean(3), row.getList(4)));
    return rdd;
  }
  //end::toRDD[]

  //tag::partitionedOutput[]
  public void writeOutByZip(DataFrame input) {
    input.write().partitionBy("zipcode").format("json").save("output/");
  }
  //end::partitionedOutput[]

  //tag::saveAppend[]
  public void writeAppend(DataFrame input) {
    input.write().mode(SaveMode.Append).save("output/");
  }
  //end::saveAppend[]

  public DataFrame createJDBC() {
    //tag::createJDBC[]
    DataFrame df1 = sqlContext.read().jdbc("jdbc:dialect:serverName;user=user;password=pass",
      "table", new Properties());

    DataFrame df2 = sqlContext.read().format("jdbc")
      .option("url", "jdbc:dialect:serverName")
      .option("dbtable", "table").load();

    return df2;
    //end::createJDBC[]
  }

  public void writeJDBC(DataFrame df) {
    //tag::writeJDBC[]
    df.write().jdbc("jdbc:dialect:serverName;user=user;password=pass",
      "table", new Properties());

    df.write().format("jdbc")
      .option("url", "jdbc:dialect:serverName")
      .option("user", "user")
      .option("password", "pass")
      .option("dbtable", "table").save();
    //end::writeJDBC[]
  }

  //tag::loadParquet[]
  public DataFrame loadParquet(String path) {
    // Configure Spark to read binary data as string, note: must be configured on SQLContext
    sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");

    // Load parquet data using merge schema (configured through option)
    DataFrame df = sqlContext.read()
      .option("mergeSchema", "true")
      .format("parquet")
      .load(path);

    return df;
  }
  //end::loadParquet[]

  //tag::writeParquet[]
  public void writeParquet(DataFrame df, String path) {
    df.write().format("parquet").save(path);
  }
  //end::writeParquet[]

  //tag::loadHiveTable[]
  public DataFrame loadHiveTable() {
    return sqlContext.read().table("pandas");
  }
  //end::loadHiveTable[]

  //tag::saveManagedTable[]
  public void saveManagedTable(DataFrame df) {
    df.write().saveAsTable("pandas");
  }
  //end::saveManagedTable[]

}
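
For readers of this commit, a minimal usage sketch of the new JavaLoadSave class (not part of the commit itself): it assumes a local Spark 1.x setup where the SQLContext is built from a JavaSparkContext, and the app name, package placement, and file paths below are placeholders chosen for illustration.

package com.highperformancespark.examples.dataframe;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Hypothetical driver showing how JavaLoadSave might be wired up;
// the paths are placeholders, not files from this repo.
public class JavaLoadSaveExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("JavaLoadSaveExample").setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);

    JavaLoadSave loadSave = new JavaLoadSave(sqlContext);

    // Load a Parquet file (placeholder path), inspect it, and write a copy.
    DataFrame df = loadSave.loadParquet("data/pandas.parquet");
    df.show();
    loadSave.writeParquet(df, "output/pandas-copy.parquet");

    jsc.stop();
  }
}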

src/main/scala/com/high-performance-spark-examples/dataframe/LoadSave.scala

Lines changed: 10 additions & 5 deletions
@@ -23,13 +23,15 @@ case class LoadSave(sqlContext: SQLContext) {
     val rowRDD = input.map(pm => Row(pm.name,
       pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes))))
 
+    val pandasType = ArrayType(StructType(List(
+      StructField("id", LongType, true),
+      StructField("zip", StringType, true),
+      StructField("happy", BooleanType, true),
+      StructField("attributes", ArrayType(FloatType), true))))
+
     // Create DataFrame explicitly with specified schema
     val schema = StructType(List(StructField("name", StringType, true),
-      StructField("pandas", ArrayType(StructType(List(
-        StructField("id", LongType, true),
-        StructField("zip", StringType, true),
-        StructField("happy", BooleanType, true),
-        StructField("attributes", ArrayType(FloatType), true)))))))
+      StructField("pandas", pandasType)))
 
     val df3 = sqlContext.createDataFrame(rowRDD, schema)
   }
@@ -72,6 +74,7 @@ case class LoadSave(sqlContext: SQLContext) {
     //tag::createJDBC[]
     sqlContext.read.jdbc("jdbc:dialect:serverName;user=user;password=pass",
       "table", new Properties)
+
     sqlContext.read.format("jdbc")
       .option("url", "jdbc:dialect:serverName")
       .option("dbtable", "table").load()
@@ -82,6 +85,7 @@ case class LoadSave(sqlContext: SQLContext) {
     //tag::writeJDBC[]
     df.write.jdbc("jdbc:dialect:serverName;user=user;password=pass",
       "table", new Properties)
+
     df.write.format("jdbc")
       .option("url", "jdbc:dialect:serverName")
       .option("user", "user")
@@ -94,6 +98,7 @@ case class LoadSave(sqlContext: SQLContext) {
   def loadParquet(path: String): DataFrame = {
     // Configure Spark to read binary data as string, note: must be configured on SQLContext
     sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
+
     // Load parquet data using merge schema (configured through option)
     sqlContext.read
       .option("mergeSchema", "true")