Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TINKERPOP-3133] Allow customize the output partition #3026

Open
wants to merge 1 commit into
base: 3.7-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/reference/implementations-spark.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ This can save a significant amount of time and space resources. If the `InputRDD
`SparkGraphComputer` will partition the graph using a `org.apache.spark.HashPartitioner` with the number of partitions
being either the number of existing partitions in the input (i.e. input splits) or the user specified number of `GraphComputer.workers()`.

If the provider/user finds that `OutputRDD` generates many small HDFS files, the option `gremlin.spark.outputRepartition`
can be used to repartition the output into the specified number of partitions. The option is disabled by default.

===== Storage Levels

The `SparkGraphComputer` uses `MEMORY_ONLY` to cache the input graph and the output graph by default. Users should be aware of the impact of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private Constants() {
public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache"; // don't cache the loadedGraphRDD (ignores graphStorageLevel)
public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // don't delete the output if it is not empty
public static final String GREMLIN_SPARK_OUTPUT_REPARTITION = "gremlin.spark.outputRepartition"; // allows setting the number of partitions of the output RDD to reduce the number of small HDFS files
public static final String SPARK_SERIALIZER = "spark.serializer";
public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<O
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a <nullwritable,vertexwritable> stream for output
graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
JavaPairRDD<Object, VertexWritable> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD);
javaPairRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
.saveAsNewAPIHadoopFile(Constants.getGraphLocation(outputLocation),
NullWritable.class,
VertexWritable.class,
Expand All @@ -62,7 +63,8 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a Hadoop stream for output
memoryRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), memoryRDD);
javaPairRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
.saveAsNewAPIHadoopFile(Constants.getMemoryLocation(outputLocation, memoryKey),
ObjectWritable.class,
ObjectWritable.class,
Expand All @@ -75,4 +77,4 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
}
return Collections.emptyIterator();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,17 @@ public interface OutputRDD {
public default <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration configuration, final String memoryKey, final JavaPairRDD<K, V> memoryRDD) {
return Collections.emptyIterator();
}

/**
 * Repartitions the given RDD before it is written out, allowing users to control the number of
 * output partitions and thereby reduce the number of small HDFS files produced by an {@link OutputRDD}.
 *
 * @param repartitionString the value of {@code gremlin.spark.outputRepartition}; {@code null} or a
 *                          non-positive number disables repartitioning
 * @param graphRDD          the RDD to (possibly) repartition
 * @return the repartitioned RDD, or the original RDD unchanged when the option is unset or non-positive
 * @throws IllegalArgumentException if {@code repartitionString} is not parseable as an integer
 */
public default <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final String repartitionString, final JavaPairRDD<K, V> graphRDD) {
    if (null == repartitionString)
        return graphRDD;
    final int repartition;
    try {
        repartition = Integer.parseInt(repartitionString.trim());
    } catch (final NumberFormatException e) {
        // surface a descriptive error instead of a bare NumberFormatException from a misconfigured value
        throw new IllegalArgumentException(
                "The gremlin.spark.outputRepartition option must be an integer, but was: " + repartitionString, e);
    }
    // non-positive values mean the option is disabled — leave the RDD untouched
    return repartition > 0 ? graphRDD.repartition(repartition) : graphRDD;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<O
SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
// determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
final JavaPairRDD<Object, VertexWritable> javaPairRDD = repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD);
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
javaPairRDD.mapValues(vertex -> {
vertex.get().dropEdges(Direction.BOTH);
return vertex;
}).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
// call action to eager store rdd
.count();
else
graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
javaPairRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
// call action to eager store rdd
.count();
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
Expand All @@ -73,11 +74,12 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
Spark.removeRDD(memoryRDDName);
memoryRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")))
final JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), memoryRDD);
javaPairRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")))
// call action to eager store rdd
.count();
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
return IteratorUtils.map(memoryRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
return IteratorUtils.map(javaPairRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
}

@Override
Expand Down
Loading