[TINKERPOP-3133] Allow customize the output partition #3026
base: 3.7-dev
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3026      +/-   ##
============================================
+ Coverage     77.87%   78.06%   +0.19%
- Complexity    13578    13926     +348
============================================
  Files          1015     1019       +4
  Lines         59308    59933     +625
  Branches       6835     6950     +115
============================================
+ Hits          46184    46787     +603
+ Misses        10817    10814       -3
- Partials       2307     2332      +25
@@ -46,9 +46,15 @@ public final class OutputFormatRDD implements OutputRDD {
    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
        final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
Nit: this int parsing can be moved inside the if block as it doesn't need to be done unless the outputLocation is non-null.
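The suggestion above can be sketched in plain Java. This is a minimal illustration only, not the PR's code: a `Map` stands in for the Hadoop configuration, the class name `DeferredParseSketch` and the method `plannedRepartition` are hypothetical, and the literal property names are assumptions (the PR reads them through `Constants`).

```java
import java.util.HashMap;
import java.util.Map;

final class DeferredParseSketch {
    // Assumed property names, for illustration only; the PR reads them via Constants.
    static final String OUTPUT_LOCATION = "gremlin.hadoop.outputLocation";
    static final String OUTPUT_REPARTITION = "gremlin.spark.outputRepartition";

    /**
     * Returns the partition count that would be used for the write, or -1 when
     * nothing is written or no repartition value is configured.
     */
    static int plannedRepartition(final Map<String, String> conf) {
        final String outputLocation = conf.get(OUTPUT_LOCATION);
        if (outputLocation == null) {
            return -1; // nothing to write, so the repartition value is never parsed
        }
        // Parsing happens only on the path that actually needs the number.
        final String raw = conf.get(OUTPUT_REPARTITION);
        return raw == null ? -1 : Integer.parseInt(raw);
    }

    public static void main(final String[] args) {
        final Map<String, String> conf = new HashMap<>();
        conf.put(OUTPUT_LOCATION, "output");
        conf.put(OUTPUT_REPARTITION, "500");
        System.out.println(plannedRepartition(conf));
    }
}
```

With this shape, a malformed repartition value has no effect when no output location is set, because the parse is never reached.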
Accept
graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
JavaPairRDD<Object, VertexWritable> javaPairRDD = graphRDD;
if (repartition > 0) {
    javaPairRDD = javaPairRDD.repartition(repartition);
Should this configuration also apply to the writeMemoryRDD method? If not, should the configuration name be more specific to exclude memory?
Yes, it should be applied to both GraphRDD and MemoryRDD.
3697f88 to e3ff08f
private <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final org.apache.hadoop.conf.Configuration hadoopConfiguration, JavaPairRDD<K, V> graphRDD) {
    JavaPairRDD<K, V> javaPairRDD = graphRDD;
    final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
Does PersistedOutputRDD also need to be aware of this configuration?
PersistedOutputRDD persists the RDD, which is a little different from writing an RDD to HDFS and does not generate small files. To stay consistent, though, I have applied the change there as well.
Here is the summary from ChatGPT:
In summary, persisting an RDD in Spark does not directly lead to the generation of small files in HDFS. The generation of small files in HDFS is more commonly associated with writing RDD data to HDFS using methods like saveAsTextFile, which can happen independently of persisting the RDD in memory or disk.
e3ff08f to 41edcc0
VOTE +1
What's the impact of this change if users do not explicitly configure the Spark output partitioning? Does it affect the default behaviour in any meaningful way? Also, is this intended to target the master branch or 3.7-dev? Could you also add a quick changelog entry and document the new configuration in https://github.com/apache/tinkerpop/blob/master/docs/src/reference/implementations-spark.asciidoc?
/**
 * Allow users to customize the RDD partitions to reduce HDFS small files
 */
private static <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final Configuration configuration, JavaPairRDD<K, V> graphRDD) {
Nit: you can pull this common code into the shared interface OutputRDD as a default method.
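A rough sketch of what that refactoring pattern looks like, with Spark left out so it runs on its own. All names here (`OutputSketch`, `targetPartitions`, `FormatOutputSketch`, `PersistedOutputSketch`) are hypothetical stand-ins, not the real TinkerPop types, and the helper works on plain partition counts rather than an RDD.

```java
// Stand-in for the shared interface; the default method holds the common logic.
interface OutputSketch {
    /** Shared helper: use the configured count when it is positive, otherwise keep the current count. */
    default int targetPartitions(final int configured, final int current) {
        return configured > 0 ? configured : current;
    }

    /** Returns the number of partitions the write would end up with. */
    int write(int configured, int currentPartitions);
}

// Two implementations reuse the helper instead of each keeping a private copy.
final class FormatOutputSketch implements OutputSketch {
    @Override
    public int write(final int configured, final int currentPartitions) {
        return targetPartitions(configured, currentPartitions);
    }
}

final class PersistedOutputSketch implements OutputSketch {
    @Override
    public int write(final int configured, final int currentPartitions) {
        return targetPartitions(configured, currentPartitions);
    }
}
```

The design point is only that a default method lets every implementation of the interface pick up the same behaviour without a separate utility class.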
ok
If this option is not explicitly configured, the number of output partitions is determined by the input dataset. If the input data contains many partitions under Spark's default partitioning policy, that causes the HDFS small-files problem, which is why I created this PR. If users can tolerate small files, it is fine to leave the option unconfigured.
In my previous PR I targeted 3.7-dev, so I followed that here. Shall I retarget this one to master?
Sure
Set the repartition number with, for example, 'gremlin.spark.outputRepartition=500'. Only integer values larger than 0 are valid input; otherwise the repartition is skipped silently.
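One way to read the rule in that documentation line is sketched below. It is an assumption-laden illustration, not the merged implementation: the class `RepartitionSettingSketch` and its `parse` method are hypothetical, and treating non-numeric text the same as a non-positive number (by catching `NumberFormatException`) is this sketch's interpretation of "skipped silently".

```java
import java.util.OptionalInt;

final class RepartitionSettingSketch {
    /**
     * Returns the partition count to apply, or an empty value when the
     * setting is absent, not an integer, or not larger than 0.
     */
    static OptionalInt parse(final String raw) {
        if (raw == null) {
            return OptionalInt.empty(); // option not configured
        }
        try {
            final int value = Integer.parseInt(raw.trim());
            return value > 0 ? OptionalInt.of(value) : OptionalInt.empty();
        } catch (final NumberFormatException e) {
            // Assumption: invalid text is ignored rather than reported.
            return OptionalInt.empty();
        }
    }
}
```

A caller would repartition only when the returned value is present and leave the data untouched otherwise.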
41edcc0 to c28455a
I'm not sure which previous PR you're referencing here. Our branching strategy is such that any change to an older development branch will be merged up into newer dev branches so that it gets included in all upcoming releases. A PR which targets 3.7-dev will also get merged up into master, but a PR which targets master will only be merged there. In other words, a PR which targets 3.7-dev will be included in the upcoming 3.7.4 and 4.0.0 releases, whereas a PR targeting master will only be included in the 4.0.0 release. This is a non-breaking change, so it is ok to target 3.7-dev if you would like it included in 3.7.4. VOTE +1 (pending confirmation of desired target branch)
Thanks for the explanation. I hope this change can be included in the upcoming release.
I've re-targeted this PR for 3.7-dev