forked from apache-spark-on-k8s/spark
[SPARK-25299][LIVE-DIFF] Use remote storage for persisting shuffle data #7
Open: ifilonenko wants to merge 18 commits into upstream-master from spark-25299-live-diff
Conversation
…(apache-spark-on-k8s#498)
* add initial bypass merge sort shuffle writer benchmarks
* add unsafe shuffle writer benchmarks
* changes in bypassmergesort benchmarks
* cleanup
* add circle script
* add this branch for testing
* fix circle attempt 1
* checkout code
* add some caches?
* why is it not pulling caches...
* save as artifact instead of publishing
* mkdir
* typo
* try uploading artifacts again
* try printing per iteration to avoid circle erroring out on idle
* blah (apache-spark-on-k8s#495)
* make a PR comment
* actually delete files
* run benchmarks on test build branch
* oops, forgot to enable upload
* add sort shuffle writer benchmarks
* add stdev
* clean up sort a bit
* fix stdev text
* fix sort shuffle
* initial code for read side
* format
* use times and sample stdev (see the sketch after this list)
* add assert for at least one iteration
* clean up shuffle write to use fewer mocks and a single base interface
* shuffle read works with transport client... needs lots of cleaning
* test running in circle
* scalastyle
* don't publish results yet
* clean up writer code
* get only the git message
* fix command to get PR number
* add SortShuffleWriterBenchmark
* writer code
* cleanup
* fix benchmark script
* use ArgumentMatchers
* also in ShuffleWriterBenchmarkBase
* scalastyle
* add Apache license
* fix some scale stuff
* fix up tests
* only copy benchmarks we care about
* increase size for reader again
* delete two writers and reader for PR
* SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)
* Revert "SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)" (this reverts commit 9d46fae)
* add -e to bash script
* blah
* enable upload as a PR comment and prevent running benchmarks on this branch
* Revert "enable upload as a PR comment and prevent running benchmarks on this branch" (this reverts commit 13703fa)
* try machine execution
* try uploading benchmarks (apache-spark-on-k8s#498)
* only upload results when merging into the feature branch
* lock down machine image
* don't write input data to disk
* run benchmark test
* stop creating file cleanup threads for every block manager
* use alphanumeric again
* use a new Random every time
* close the writers -__________-
* delete branch and publish results as comment
* close in finally
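The measurement approach these commits converge on (timed iterations, a mean, a sample standard deviation, and an assertion that at least one iteration runs) can be sketched standalone. The harness below is a minimal illustration, not the benchmark code from this branch; `MicroBench` and its dummy task are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Minimal benchmark loop: run a task N times, report mean and sample stdev. */
public final class MicroBench {
  public static void main(String[] args) {
    double[] stats = run(10, () -> {
      // Stand-in workload; a real run would exercise a shuffle writer here.
      long acc = 0;
      for (int i = 0; i < 1_000_000; i++) acc += i;
      return acc;
    });
    System.out.printf("mean=%.2f ms, sample stdev=%.2f ms%n", stats[0], stats[1]);
  }

  static double[] run(int iterations, Supplier<?> task) {
    // Mirrors the "assert for at least one iteration" commit above.
    if (iterations < 1) throw new IllegalArgumentException("need at least one iteration");
    List<Double> timesMs = new ArrayList<>();
    for (int i = 0; i < iterations; i++) {
      long start = System.nanoTime();
      task.get();
      timesMs.add((System.nanoTime() - start) / 1e6);
    }
    double mean = timesMs.stream().mapToDouble(Double::doubleValue).average().orElse(0);
    double sumSq = 0;
    for (double t : timesMs) sumSq += (t - mean) * (t - mean);
    // Sample (n - 1) standard deviation, per the "use times and sample stdev" commit.
    double stdev = timesMs.size() > 1 ? Math.sqrt(sumSq / (timesMs.size() - 1)) : 0;
    return new double[] {mean, stdev};
  }
}
```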
…(apache-spark-on-k8s#520) Introduces the new Shuffle Writer API. Ported from #5.
…(apache-spark-on-k8s#524) Implements the shuffle writer API by writing shuffle files to local disk and using the index block resolver to commit data and write index files. The logic in `BypassMergeSortShuffleWriter` has been refactored to use the base implementation of the plugin instead. APIs have been slightly renamed to clarify semantics after considering nuances in how these are to be implemented by other developers. Follow-up commits are to come for `SortShuffleWriter` and `UnsafeShuffleWriter`. Ported from #6, credits to @ifilonenko.
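For orientation, here is a rough sketch of the writer-side plugin shape these commits describe. `ShufflePartitionWriter` is named in the PR and `commitAllPartitions()` appears in a later commit, but the exact signatures below are assumptions, not the API as merged.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hedged sketch of the pluggable writer surface; signatures are illustrative.
interface ShufflePartitionWriter extends AutoCloseable {
  /** Stream that receives one reduce partition's serialized bytes. */
  OutputStream openStream() throws IOException;

  /** Bytes written so far, reported back for the MapStatus partition sizes. */
  long getNumBytesWritten();

  @Override
  void close() throws IOException;
}

interface ShuffleMapOutputWriter {
  /** Commit every partition at once, e.g. by writing the local index file. */
  void commitAllPartitions() throws IOException;

  /** Roll back partial output when the map task fails. */
  void abort(Throwable cause) throws IOException;
}
```

A local-disk implementation, as described above, would route `commitAllPartitions()` through the index block resolver so the data and index files stay consistent.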
The branch was force-pushed from 95c0ef7 to 4dd5955.
(apache-spark-on-k8s#532)
* [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter.
* Remove unused
* Handle empty partitions properly.
* Adjust formatting
* Don't close streams twice, because compressed output streams don't like it (see the guard sketched below).
* Clarify comment
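On the double-close item: compression streams typically emit a footer or flush compressor state on close, so some implementations misbehave or throw when closed a second time. One illustrative guard, not taken from this PR, is an idempotent-close wrapper:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Illustrative guard for the double-close issue noted above: close() becomes
// idempotent, so wrapping code can safely close in a finally block even if
// the stream was already closed on the success path.
final class CloseOnceOutputStream extends FilterOutputStream {
  private boolean closed = false;

  CloseOnceOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void close() throws IOException {
    if (!closed) {
      closed = true;
      super.close();
    }
  }
}
```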
…(apache-spark-on-k8s#536) Ported from #9. Credits to @ifilonenko!
Implements the shuffle locations API as part of SPARK-25299. This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This module is optional, and implementations of the pluggable shuffle writers and readers can ignore it accordingly. The API is designed for future plugin implementations that want the driver to store metadata about where shuffle blocks are stored. There are a few caveats to this design:

- We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, doing so proved very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we concede to storing the block manager ID redundantly here. The overhead should be minimal, though: because we cache block manager IDs and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap (see the sketch below). Thus we add `O(M)` storage overhead on the driver, where for each map status we store an additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.
- `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` field is now pluggable. We believe Kryo was previously defaulting to Java serialization anyway, so we now explicitly tell Kryo to use `ExternalizableSerializer` to handle these objects. There is a small hack in the serialization protocol that avoids serializing the same `BlockManagerId` twice when the map shuffle locations is a `DefaultMapShuffleLocations`.
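The pointer-sharing argument in the first caveat relies on interning one location object per block manager. Below is a minimal sketch of that caching trick; a plain `String` stands in for `BlockManagerId` so the snippet is self-contained, and the class name marks it as a sketch rather than the PR's implementation.

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.concurrent.ConcurrentHashMap;

// Interning sketch: one instance per block manager id, so every MapStatus
// referencing the same executor points at the same heap object. Externalizable
// matches the Kryo caveat above: the object controls its own wire format.
public final class DefaultMapShuffleLocationsSketch implements Externalizable {
  private static final ConcurrentHashMap<String, DefaultMapShuffleLocationsSketch> CACHE =
      new ConcurrentHashMap<>();

  private String blockManagerId; // non-final so readExternal can set it

  public DefaultMapShuffleLocationsSketch() {} // required by Externalizable

  private DefaultMapShuffleLocationsSketch(String blockManagerId) {
    this.blockManagerId = blockManagerId;
  }

  /** Return the cached instance for this block manager, creating it at most once. */
  public static DefaultMapShuffleLocationsSketch get(String blockManagerId) {
    return CACHE.computeIfAbsent(blockManagerId, DefaultMapShuffleLocationsSketch::new);
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeUTF(blockManagerId);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    blockManagerId = in.readUTF();
  }
}
```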
…tion ids (apache-spark-on-k8s#540) We originally made the shuffle map output writer API behave like an iterator when fetching the "next" partition writer. However, the shuffle writer implementations tend to skip opening empty partitions, and an iterator-like API would tie us to opening a partition writer for every single partition, even empty ones. Here, we go back to using specific partition identifiers, which gives us the freedom to avoid creating writers for empty partitions.
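A before/after sketch of that change, with hypothetical method names on both sides:

```java
import java.io.IOException;

// Placeholder for the writer type sketched earlier in this thread.
interface ShufflePartitionWriter { /* see the earlier sketch */ }

// Before: iterator-like, so a writer must be opened for every partition
// in order, including partitions that hold no records.
interface IteratorLikeMapOutputWriter {
  ShufflePartitionWriter getNextPartitionWriter() throws IOException;
}

// After: id-based, so callers only ask for the partitions that actually
// contain data and empty partitions never get a writer at all.
interface PartitionIdMapOutputWriter {
  ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;
}
```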
Fix the stubbing of the reader benchmark tests
Introduce driver shuffle lifecycle APIs
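The commit above only names the feature, so the following is just one plausible shape for driver-side lifecycle hooks in a shuffle plugin; every method here is an assumption, not the API this commit adds.

```java
// Hypothetical sketch of driver-side lifecycle hooks for a shuffle plugin.
interface DriverShuffleLifecycleSketch {
  /** Called once at application start, e.g. to provision remote shuffle storage. */
  void initializeApplication();

  /** Called once at application shutdown to release plugin-held resources. */
  void cleanupApplication();

  /** Called when a shuffle is no longer referenced and its data can be deleted. */
  void removeShuffle(int shuffleId, boolean blocking);
}
```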
…(apache-spark-on-k8s#535)
* Propose a new NIO transfer API for partition writing. This solves the consistency and resource-leakage concerns with the first iteration of this API, where it was not obvious that the streamable resources created by `ShufflePartitionWriter` needed to be closed via `ShufflePartitionWriter#close` rather than by closing the resources directly. This introduces the following adjustments:
  - Channel-based writes are separated out into their own module, `SupportsTransferTo` (sketched below). This allows the transfer-to APIs to be modified independently, and users that only provide output streams can ignore the NIO APIs entirely. It also lets us eventually mark the base `ShufflePartitionWriter` as a stable API while keeping the NIO APIs marked as experimental or developer-api.
  - We add APIs that explicitly encode the notion of transferring bytes from one source to another. The partition writer returns an instance of `TransferrableWritableByteChannel`, which has APIs for accepting a `TransferrableReadableByteChannel` and can tell the readable byte channel to transfer its bytes out to some destination sink.
  - The resources returned by `ShufflePartitionWriter` are always closed. Internally, `DefaultMapOutputWriter` keeps resources open until `commitAllPartitions()` is called.
* Migrate the unsafe shuffle writer to use the new byte channel API.
* More sane implementation for unsafe
* Fix style
* Address comments
* Fix imports
* Fix build
* Fix more build problems
* Address comments
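A rough sketch of the channel-based module from the first bullet. The three interface names come from the commit message; the method signatures are assumptions.

```java
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

// Opt-in NIO extension; stream-only writers simply don't implement this,
// and callers fall back to the OutputStream path.
interface SupportsTransferTo {
  TransferrableWritableByteChannel openTransferrableChannel() throws IOException;
}

// Destination side: accepts a readable channel and lets it transfer its
// bytes in, which keeps the copy inside the kernel when both ends allow
// (e.g. FileChannel.transferTo underneath).
interface TransferrableWritableByteChannel extends AutoCloseable {
  void acceptTransferableChannel(TransferrableReadableByteChannel source) throws IOException;

  @Override
  void close() throws IOException;
}

// Source side: pushes all remaining bytes out to the given destination sink.
interface TransferrableReadableByteChannel {
  void transferAllTo(WritableByteChannel sink) throws IOException;
}
```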
What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-25299
How was this patch tested?