[TINKERPOP-3133] Allow customizing the output partition
Set the repartition number via, for example, 'gremlin.spark.outputRepartition=500'.
Only integer values larger than 0 are valid input; otherwise the
repartition is skipped silently.
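A minimal sketch (not from this commit) of where the option would sit, assuming the Commons Configuration2 API and the standard HadoopGraph property keys used by recent TinkerPop versions; the graph writer and output location are placeholder values:

    import org.apache.commons.configuration2.BaseConfiguration;
    import org.apache.commons.configuration2.Configuration;

    public class OutputRepartitionConfigSketch {
        public static void main(final String[] args) {
            final Configuration conf = new BaseConfiguration();
            // standard HadoopGraph/SparkGraphComputer settings (placeholder values)
            conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph");
            conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat");
            conf.setProperty("gremlin.hadoop.outputLocation", "output");
            // the option added by this commit: write the output RDD with 500 partitions,
            // so the graph lands in 500 part-files; values <= 0 skip the repartition
            conf.setProperty("gremlin.spark.outputRepartition", 500);
            System.out.println(conf.getString("gremlin.spark.outputRepartition")); // "500"
        }
    }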
Hongjiang Zhang committed Feb 14, 2025
1 parent bdba436 commit c28455a
Showing 5 changed files with 28 additions and 7 deletions.
3 changes: 3 additions & 0 deletions docs/src/reference/implementations-spark.asciidoc
@@ -97,6 +97,9 @@ This can save a significant amount of time and space resources. If the `InputRDD
`SparkGraphComputer` will partition the graph using a `org.apache.spark.HashPartitioner` with the number of partitions
being either the number of existing partitions in the input (i.e. input splits) or the user specified number of `GraphComputer.workers()`.
If the provider/user finds that many small HDFS files are generated by `OutputRDD`, the option `gremlin.spark.outputRepartition`
can help repartition the output into the specified number of partitions. The option is disabled by default.
===== Storage Levels
The `SparkGraphComputer` uses `MEMORY_ONLY` to cache the input graph and the output graph by default. Users should be aware of the impact of
@@ -74,6 +74,7 @@ private Constants() {
public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache"; // don't cache the loadedGraphRDD (ignores graphStorageLevel)
public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // don't delete the output if it is not empty
public static final String GREMLIN_SPARK_OUTPUT_REPARTITION = "gremlin.spark.outputRepartition"; // allows setting the number of partitions of the output RDD in order to reduce small HDFS files
public static final String SPARK_SERIALIZER = "spark.serializer";
public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
@@ -48,7 +48,8 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<O
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a <nullwritable,vertexwritable> stream for output
graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
JavaPairRDD<Object, VertexWritable> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD);
javaPairRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
.saveAsNewAPIHadoopFile(Constants.getGraphLocation(outputLocation),
NullWritable.class,
VertexWritable.class,
Expand All @@ -62,7 +63,8 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a Hadoop stream for output
memoryRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), memoryRDD);
javaPairRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
.saveAsNewAPIHadoopFile(Constants.getMemoryLocation(outputLocation, memoryKey),
ObjectWritable.class,
ObjectWritable.class,
Expand All @@ -75,4 +77,4 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
}
return Collections.emptyIterator();
}
}
}
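For intuition on why the repartition step reduces the small-file count: each partition of the RDD handed to `saveAsNewAPIHadoopFile` becomes one part-file, so collapsing many partitions into a few yields fewer, larger files. A standalone Spark sketch (not related to this commit; data and partition counts are made up):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class RepartitionDemo {
        public static void main(final String[] args) {
            final SparkConf sparkConf = new SparkConf().setAppName("repartition-demo").setMaster("local[4]");
            try (final JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
                final JavaPairRDD<Integer, String> pairs = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 100)
                        .mapToPair(i -> new Tuple2<>(i, "v" + i));
                System.out.println(pairs.getNumPartitions());                // 100 -> 100 mostly tiny output files
                System.out.println(pairs.repartition(4).getNumPartitions()); // 4   -> 4 larger output files
            }
        }
    }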
@@ -55,4 +55,17 @@ public interface OutputRDD {
public default <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration configuration, final String memoryKey, final JavaPairRDD<K, V> memoryRDD) {
return Collections.emptyIterator();
}

/**
* Allows users to customize the number of RDD partitions in order to reduce small HDFS files.
* A value less than or equal to zero (or a missing value) leaves the partitioning unchanged.
*/
public default <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final String repartitionString, JavaPairRDD<K, V> graphRDD) {
JavaPairRDD<K, V> javaPairRDD = graphRDD;
final int repartition = null == repartitionString ? -1 : Integer.parseInt(repartitionString);
if (repartition > 0) {
javaPairRDD = javaPairRDD.repartition(repartition);
}
return javaPairRDD;
}

}
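A hypothetical provider implementation (not in this commit; the class name, write location, and package imports are assumptions based on recent TinkerPop versions) showing how a custom `OutputRDD` can reuse the new default helper so its output also honors `gremlin.spark.outputRepartition`:

    import org.apache.commons.configuration2.Configuration;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.tinkerpop.gremlin.hadoop.Constants;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
    import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;

    public final class ProviderOutputRDD implements OutputRDD {
        @Override
        public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
            // apply the user-configured repartition (a no-op when the option is unset or <= 0)
            final JavaPairRDD<Object, VertexWritable> toWrite =
                    repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD);
            toWrite.saveAsObjectFile("provider-output"); // made-up location; any Spark write action could follow
        }
    }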
@@ -51,15 +51,16 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<O
SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
// determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
final JavaPairRDD<Object, VertexWritable> javaPairRDD = repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD);
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
javaPairRDD.mapValues(vertex -> {
vertex.get().dropEdges(Direction.BOTH);
return vertex;
}).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
// call action to eager store rdd
.count();
else
graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
javaPairRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
// call action to eager store rdd
.count();
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
Expand All @@ -73,11 +74,12 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
Spark.removeRDD(memoryRDDName);
memoryRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")))
final JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), memoryRDD);
javaPairRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")))
// call action to eager store rdd
.count();
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
return IteratorUtils.map(memoryRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
return IteratorUtils.map(javaPairRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
}

@Override
