[TINKERPOP-3133] Allow customize the output partition #3026

Open · wants to merge 1 commit into base: 3.7-dev
Conversation


@ministat ministat commented Feb 9, 2025

Set the repartition number via, for example, 'gremlin.spark.outputRepartition=500'. Only integer values larger than 0 are valid input; otherwise the repartition will be skipped silently.
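For illustration, a minimal usage sketch (not part of the PR itself) showing where such a property might be set alongside the standard HadoopGraph/SparkGraphComputer settings; all paths and values here are placeholders, and only gremlin.spark.outputRepartition is new.

import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class OutputRepartitionExample {
    public static void main(final String[] args) {
        // Standard HadoopGraph properties; only the last property is new in this PR.
        final BaseConfiguration conf = new BaseConfiguration();
        conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph");
        conf.setProperty("gremlin.hadoop.graphReader", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat");
        conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat");
        conf.setProperty("gremlin.hadoop.inputLocation", "data/tinkerpop-modern.kryo");
        conf.setProperty("gremlin.hadoop.outputLocation", "output");
        // New setting: write the output RDD in 500 partitions instead of however
        // many partitions the input produced, avoiding many small HDFS files.
        conf.setProperty("gremlin.spark.outputRepartition", 500);
        final Graph graph = GraphFactory.open(conf);
    }
}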

@codecov-commenter

codecov-commenter commented Feb 9, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 78.06%. Comparing base (cfd6889) to head (c28455a).
Report is 129 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3026      +/-   ##
============================================
+ Coverage     77.87%   78.06%   +0.19%     
- Complexity    13578    13926     +348     
============================================
  Files          1015     1019       +4     
  Lines         59308    59933     +625     
  Branches       6835     6950     +115     
============================================
+ Hits          46184    46787     +603     
+ Misses        10817    10814       -3     
- Partials       2307     2332      +25     

☔ View full report in Codecov by Sentry.

@@ -46,9 +46,15 @@ public final class OutputFormatRDD implements OutputRDD {
public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
Contributor
Nit: this int parsing can be moved inside the if block as it doesn't need to be done unless the outputLocation is non-null.

Author
Accept
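For reference, a rough sketch of what the accepted suggestion could look like in writeGraphRDD, assembled from the diff hunks in this review; the merged code may differ in details.

@Override
public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
    final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
    final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
    if (null != outputLocation) {
        // parse the repartition count only when there is somewhere to write to
        int repartition = -1;
        final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
        if (null != repartitionString) {
            try {
                repartition = Integer.parseInt(repartitionString);
            } catch (final NumberFormatException e) {
                // non-numeric values leave repartition <= 0, so the step is skipped silently
            }
        }
        JavaPairRDD<Object, VertexWritable> javaPairRDD = graphRDD;
        if (repartition > 0) {
            javaPairRDD = javaPairRDD.repartition(repartition);
        }
        // ... map javaPairRDD to <NullWritable, VertexWritable> pairs and save it via
        // the configured OutputFormat, as in the surrounding hunks ...
    }
}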

graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
JavaPairRDD<Object, VertexWritable> javaPairRDD = graphRDD;
if (repartition > 0) {
javaPairRDD = javaPairRDD.repartition(repartition);
Contributor
Should this configuration also apply to the writeMemoryRDD method? If not should the configuration name be more specific to exclude memory?

Author
Yes, it should be applied to both GraphRDD and MemoryRDD.
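For context, a rough sketch of how writeMemoryRDD might route its output through the same helper shown in the hunk below; signatures follow OutputFormatRDD in the 3.x line, and the merged code may differ.

@Override
public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration configuration, final String memoryKey, final JavaPairRDD<K, V> memoryRDD) {
    final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
    // apply the same output repartition setting to the memory RDD as to the graph RDD
    final JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration, memoryRDD);
    // ... write javaPairRDD via the configured OutputFormat and read the key/values
    // back for the memory iterator, as in the existing implementation ...
    return Collections.emptyIterator(); // placeholder for the existing read-back logic
}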

private <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final org.apache.hadoop.conf.Configuration hadoopConfiguration, JavaPairRDD<K, V> graphRDD) {
JavaPairRDD<K, V> javaPairRDD = graphRDD;
final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
Contributor
Does PersistedOutputRDD also need to be aware of this configuration?

Author
PersistedOutputRDD wants to persist the RDD, which is a little different from writing the RDD to HDFS and does not generate small files. But to keep things consistent, I applied the change there as well.
Here is the summary from ChatGPT:

In summary, persisting an RDD in Spark does not directly lead to the generation of small files in HDFS. The generation of small files in HDFS is more commonly associated with writing RDD data to HDFS using methods like saveAsTextFile, which can happen independently of persisting the RDD in memory or disk.

@andreachild
Contributor

VOTE +1

@Cole-Greer
Contributor

Cole-Greer commented Feb 13, 2025

What's the impact of this change if the users do not explicitly configure the spark output partitioning? Does this change impact the default behaviour in any meaningful way?

Also is this intended to be targeting the master branch or is it intended for 3.7-dev?

Could you also add a quick changelog entry and document the new configuration in https://github.com/apache/tinkerpop/blob/master/docs/src/reference/implementations-spark.asciidoc?

/**
* Allow users to customize the RDD partitions to reduce HDFS small files
*/
private static <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final Configuration configuration, JavaPairRDD<K, V> graphRDD) {
Contributor
Nit: you can pull this common code into the shared interface OutputRDD as a default method.

Author
ok
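For reference, a minimal sketch of what such a default method on OutputRDD could look like, based on the hunks above; the exact parameter types and shape of the merged code may differ.

public interface OutputRDD {

    // ... existing writeGraphRDD / writeMemoryRDD declarations ...

    /**
     * Repartitions the given RDD when gremlin.spark.outputRepartition is set to a
     * positive integer; otherwise returns the RDD unchanged.
     */
    default <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final org.apache.hadoop.conf.Configuration hadoopConfiguration,
                                                            final JavaPairRDD<K, V> graphRDD) {
        JavaPairRDD<K, V> javaPairRDD = graphRDD;
        final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
        if (null != repartitionString) {
            try {
                final int repartition = Integer.parseInt(repartitionString);
                if (repartition > 0) {
                    javaPairRDD = javaPairRDD.repartition(repartition);
                }
            } catch (final NumberFormatException e) {
                // invalid values are ignored and the repartition is skipped silently
            }
        }
        return javaPairRDD;
    }
}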

@ministat
Author

ministat commented Feb 14, 2025

What's the impact of this change if the users do not explicitly configure the spark output partitioning? Does this change impact the default behaviour in any meaningful way?

If this option is not explicitly configured, the number of output partitions is determined by the input dataset. If the input data contains many partitions under Spark's default partitioning policy, that causes the HDFS small-files problem, which is why I created this PR. But if users can tolerate the small-files problem, it is fine not to configure this option.

Also is this intended to be targeting the master branch or is it intended for 3.7-dev?

In my previous PR, I targeted 3.7-dev, so I followed that here. Shall I change the target to master?

Could you also add a quick changelog entry and document the new configuration in https://github.com/apache/tinkerpop/blob/master/docs/src/reference/implementations-spark.asciidoc?

Sure

@Cole-Greer
Contributor

Also is this intended to be targeting the master branch or is it intended for 3.7-dev?

In my previous PR, I targeted 3.7-dev, so I follow it here. Shall I change to target for master?

I'm not sure which previous PR you're referencing here. Our branching strategy is such that any change to an older development branch is merged up into newer dev branches so that it gets included in all upcoming releases. A PR which targets 3.7-dev will also get merged up into master, but a PR which targets master will only be merged there. In other words, a PR which targets 3.7-dev will be included in the upcoming 3.7.4 and 4.0.0 releases, whereas a PR targeting master will only be included in the 4.0.0 release.

This is a non-breaking change so it is ok to target 3.7-dev if you would like it included in 3.7.4.

VOTE +1 (pending confirmation of desired target branch)

@ministat
Author

Thanks for the explanation. I hope this change can be included in the upcoming release.

@Cole-Greer Cole-Greer changed the base branch from master to 3.7-dev February 18, 2025 19:22
@Cole-Greer
Contributor

I hope this change can be included in the upcoming release.

I've re-targeted this PR for 3.7-dev
