[SPARK-25299][LIVE-DIFF] Use remote storage for persisting shuffle data #7

Open
wants to merge 18 commits into upstream-master from spark-25299-live-diff

Conversation

ifilonenko (Collaborator)

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-25299

How was this patch tested?

  • Compiles
  • Unit Tests
  • E2e Test

yifeih and others added 5 commits April 3, 2019 17:08
…ache-spark-on-k8s#498)

* add initial bypass merge sort shuffle writer benchmarks

* add unsafe shuffle writer benchmarks

* changes in bypassmergesort benchmarks

* cleanup

* add circle script

* add this branch for testing

* fix circle attempt 1

* checkout code

* add some caches?

* why is it not pull caches...

* save as artifact instead of publishing

* mkdir

* typo

* try uploading artifacts again

* try print per iteration to avoid circle erroring out on idle

* blah (apache-spark-on-k8s#495)

* make a PR comment

* actually delete files

* run benchmarks on test build branch

* oops forgot to enable upload

* add sort shuffle writer benchmarks

* add stdev

* cleanup sort a bit

* fix stdev text

* fix sort shuffle

* initial code for read side

* format

* use times and sample stdev

* add assert for at least one iteration

* cleanup shuffle write to use fewer mocks and single base interface

* shuffle read works with transport client... needs lots of cleaning

* test running in circle

* scalastyle

* don't publish results yet

* cleanup writer code

* get only git message

* fix command to get PR number

* add SortShuffleWriterBenchmark

* writer code

* cleanup

* fix benchmark script

* use ArgumentMatchers

* also in ShuffleWriterBenchmarkBase

* scalastyle

* add apache license

* fix some scale stuff

* fix up tests

* only copy benchmarks we care about

* increase size for reader again

* delete two writers and reader for PR

* SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)

* Revert "SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)"

This reverts commit 9d46fae.

* add -e to bash script

* blah

* enable upload as a PR comment and prevent running benchmarks on this branch

* Revert "enable upload as a PR comment and prevent running benchmarks on this branch"

This reverts commit 13703fa.

* try machine execution

* try uploading benchmarks (apache-spark-on-k8s#498)

* only upload results when merging into the feature branch

* lock down machine image

* don't write input data to disk

* run benchmark test

* stop creating file cleanup threads for every block manager

* use alphanumeric again

* use a new random every time

* close the writers -__________-

* delete branch and publish results as comment

* close in finally
…pache-spark-on-k8s#524)

Implements the shuffle writer API by writing shuffle files to local disk and using the index block resolver to commit data and write index files.

The logic in `BypassMergeSortShuffleWriter` has been refactored to use the base implementation of the plugin instead.

APIs have been slightly renamed to clarify semantics after considering nuances in how these are to be implemented by other developers.

Follow-up commits are to come for `SortShuffleWriter` and `UnsafeShuffleWriter`.

Ported from #6, credits to @ifilonenko.
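
To make the plugin shape concrete, here is a minimal sketch of the local-disk flavor described above. The names (`MapOutputWriter`, `PartitionWriter`, `LocalDiskMapOutputWriter`) are illustrative stand-ins, not the exact API added by this PR: each map task requests a writer per reduce partition, the local-disk implementation appends everything to a single data file, and the per-partition lengths are returned at commit time so the index block resolver can write the index file.

```java
// Sketch only: hypothetical names standing in for the pluggable writer API.
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

interface PartitionWriter {
  /** Stream for one reduce partition's bytes; the map output writer owns the real resource. */
  OutputStream openStream() throws IOException;
}

interface MapOutputWriter {
  PartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;
  /** Called after all partitions are written; the lengths feed the index file. */
  long[] commitAllPartitions() throws IOException;
}

/** Local-disk flavor: append every partition to one data file and record the lengths. */
final class LocalDiskMapOutputWriter implements MapOutputWriter {
  private final File dataFile;
  private final long[] partitionLengths;
  private FileOutputStream out;

  LocalDiskMapOutputWriter(File dataFile, int numPartitions) {
    this.dataFile = dataFile;
    this.partitionLengths = new long[numPartitions];
  }

  @Override
  public PartitionWriter getPartitionWriter(int reducePartitionId) throws IOException {
    if (out == null) {
      out = new FileOutputStream(dataFile, true);
    }
    return () -> new OutputStream() {
      @Override
      public void write(int b) throws IOException {
        out.write(b);
        partitionLengths[reducePartitionId]++;
      }
      // Closing the per-partition stream must not close the shared data file.
      @Override
      public void close() {}
    };
  }

  @Override
  public long[] commitAllPartitions() throws IOException {
    if (out != null) {
      out.close();
    }
    // A real implementation would hand these lengths to the index block resolver
    // so it can write the index file and commit the data file.
    return partitionLengths;
  }
}
```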
@ifilonenko force-pushed the spark-25299-live-diff branch from 95c0ef7 to 4dd5955 on April 4, 2019 at 00:11
mccheah and others added 13 commits April 7, 2019 10:14
apache-spark-on-k8s#532)

* [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter.

* Remove unused

* Handle empty partitions properly.

* Adjust formatting

* Don't close streams twice.

Because compressed output streams don't like it.

* Clarify comment
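
On the "don't close streams twice" point: once a compression stream has been closed, closing it again (directly or via a stream stacked on top of it) typically fails or corrupts the codec footer, so exactly one owner should perform the close. One common pattern for keeping that ownership clear, shown purely as an illustration rather than as the fix in this commit, is a close-shielding wrapper that lets upper layers call close() without propagating it to the shared stream underneath:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical helper: lets a wrapping compression stream be closed without
// also closing the shared destination stream underneath it.
final class CloseShieldOutputStream extends FilterOutputStream {
  CloseShieldOutputStream(OutputStream delegate) {
    super(delegate);
  }

  @Override
  public void close() throws IOException {
    // Flush what the caller wrote, but leave the underlying stream open so its
    // owner can close it exactly once, after all partitions are written.
    flush();
  }
}
```

The compression codec then closes its own stream normally, while the owner of the underlying stream remains responsible for the single real close().
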
Implements the shuffle locations API as part of SPARK-25299.

This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This module is optional and implementations of the pluggable shuffle writers and readers can ignore it accordingly.

This API is designed with the use case in mind of future plugin implementations desiring to have the driver store metadata about where shuffle blocks are stored.

There are a few caveats to this design:

- We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, doing this proves to be very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we concede to storing the block manager ID redundantly here. However, the overhead should be minimal: because we cache block manager ids and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap. Thus we add `O(M)` storage overhead on the driver, where for each map status we're storing an additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.

- `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` is now pluggable. We think, however, that Kryo was previously defaulting to Java serialization anyway, so we now explicitly tell Kryo to use `ExternalizableSerializer` to deal with these objects. There's a small hack in the serialization protocol that avoids serializing the same `BlockManagerId` twice when the map shuffle locations is a `DefaultMapShuffleLocations`.
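
A rough sketch of the shape being described, with assumed names and a simplified `BlockManagerId` stand-in (the real classes in this PR may differ): the default implementation caches one instance per block manager, so every `MapStatus` from the same executor shares a single on-heap object, and it serializes itself explicitly so Kryo can treat it as an `Externalizable` rather than registering its fields.

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Pluggable answer to "where is this map task's shuffle output stored?". */
interface MapShuffleLocations extends Serializable {}

/** Default flavor: output lives wherever the writing executor's block manager is. */
final class DefaultMapShuffleLocations implements MapShuffleLocations, Externalizable {
  // Cache one instance per block manager so every MapStatus for the same executor
  // points at the same heap object; the extra cost is O(M) pointers on the driver.
  private static final Map<String, DefaultMapShuffleLocations> CACHE = new ConcurrentHashMap<>();

  private String blockManagerHostPort;  // simplified stand-in for a full BlockManagerId

  public DefaultMapShuffleLocations() {}  // no-arg constructor required by Externalizable

  private DefaultMapShuffleLocations(String blockManagerHostPort) {
    this.blockManagerHostPort = blockManagerHostPort;
  }

  static DefaultMapShuffleLocations get(String blockManagerHostPort) {
    return CACHE.computeIfAbsent(blockManagerHostPort, DefaultMapShuffleLocations::new);
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    // Explicit serialization is what lets Kryo hand these objects to an
    // Externalizable-style serializer instead of relying on field reflection.
    out.writeUTF(blockManagerHostPort);
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    blockManagerHostPort = in.readUTF();
  }
}
```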
…tion ids (apache-spark-on-k8s#540)

We originally made the shuffle map output writer API behave like an iterator, fetching the "next" partition writer. However, the shuffle writer implementations tend to skip opening empty partitions, and an iterator-like API would tie us to opening a partition writer for every single partition, even the empty ones. Here, we go back to using specific partition identifiers, which gives us the freedom to avoid creating writers for empty partitions.
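
The contrast between the two API shapes, sketched with illustrative names rather than the exact interfaces in this change:

```java
import java.io.IOException;
import java.io.OutputStream;

interface PartitionBytesWriter {
  OutputStream openStream() throws IOException;
}

/** Iterator-like shape (dropped): callers step through partitions in order, so a
 *  writer gets opened even for partitions that contain no records. */
interface IteratorStyleMapOutputWriter {
  PartitionBytesWriter getNextPartitionWriter() throws IOException;
}

/** Partition-id shape (restored here): a writer for an empty partition is simply
 *  never requested. */
interface PartitionIdStyleMapOutputWriter {
  PartitionBytesWriter getPartitionWriter(int reducePartitionId) throws IOException;
}
```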
Fix the stubbing of the reader benchmark tests
…pache-spark-on-k8s#535)

* Propose a new NIO transfer API for partition writing.

This solves the consistency and resource leakage concerns with the first iteration of this API, where it
was not obvious that the streamable resources created by ShufflePartitionWriter needed to be closed via
ShufflePartitionWriter#close rather than by closing the resources directly.

This introduces the following adjustments:

- Channel-based writes are separated out to their own module, SupportsTransferTo. This allows the transfer-to
  APIs to be modified independently, and users that only provide output streams can ignore the NIO APIs entirely.
  This also allows us to mark the base ShufflePartitionWriter as a stable API eventually while keeping the NIO
  APIs marked as experimental or developer-api.

- We add APIs that explicitly encode the notion of transferring bytes from one source to another. The partition
  writer returns an instance of TransferrableWritableByteChannel, which has APIs for accepting a
  TransferrableReadableByteChannel and can tell the readable byte channel to transfer its bytes out to some
  destination sink.

- The resources returned by ShufflePartitionWriter are always closed. Internally, DefaultMapOutputWriter keeps
  resources open until commitAllPartitions() is called.

* Migrate unsafe shuffle writer to use new byte channel API.

* More sane implementation for unsafe

* Fix style

* Address comments

* Fix imports

* Fix build

* Fix more build problems

* Address comments.
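
To make the transfer-oriented API above concrete, here is a rough sketch of the pieces named in the commit message (`SupportsTransferTo`, `TransferrableReadableByteChannel`, `TransferrableWritableByteChannel`); the method signatures and the file-backed source are assumptions for illustration, not the exact code in this PR:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Base, stream-only writer; plugins that only provide OutputStreams stop here. */
interface StreamOnlyPartitionWriter {
  OutputStream openStream() throws IOException;
}

/** Source side of a transfer: knows how to push its own bytes to a destination. */
interface TransferrableReadableByteChannel extends AutoCloseable {
  void transferTo(WritableByteChannel destination) throws IOException;
}

/** Destination side: accepts a readable source and tells it to transfer itself out. */
interface TransferrableWritableByteChannel extends AutoCloseable {
  void acceptTransferFrom(TransferrableReadableByteChannel source) throws IOException;
}

/** Optional mixin, kept separate so the channel-based APIs can stay experimental
 *  while the stream-based writer is eventually marked stable. */
interface SupportsTransferTo extends StreamOnlyPartitionWriter {
  TransferrableWritableByteChannel openTransferrableChannel() throws IOException;
}

/** Example source backed by a region of a spill file on local disk. */
final class FileBackedReadableByteChannel implements TransferrableReadableByteChannel {
  private final FileChannel file;
  private final long offset;
  private final long length;

  FileBackedReadableByteChannel(FileChannel file, long offset, long length) {
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public void transferTo(WritableByteChannel destination) throws IOException {
    long transferred = 0;
    while (transferred < length) {
      // FileChannel.transferTo may move fewer bytes than requested, so loop.
      transferred += file.transferTo(offset + transferred, length - transferred, destination);
    }
  }

  @Override
  public void close() throws IOException {
    file.close();
  }
}
```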