Question on reliability #11
Comments
It depends. If a shuffle block cannot be found, then the whole stage is recomputed. In our case we store all shuffle files on S3, so the files should all be reachable. It could be that Spark thinks that the Shuffle blocks are unreachable. In this case you can try to set |
Indeed, I understand why this defaults to true (because of potential safety issues), but isn't the biggest benefit of this plugin only realized when it is set to false? Wasn't this meant to avoid recomputation? Terrific plugin, by the way; it has the potential to make our jobs considerably more reliable.
Today, in one of our tests, we saw a lot of our threads BLOCKED with some form of lock contention on S3AInputStream, which we think is due to multiple threads referring to the same S3AInputStream. But at shuffle time, won't multiple threads end up reading from the same S3 file, albeit from different offsets? |
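(Not from this plugin, just to illustrate the contention pattern being described: a toy Scala sketch contrasting many threads sharing one Hadoop input stream, whose reads are synchronized inside S3AInputStream, with each reader opening its own stream. The path, thread count, and read sizes are made up.)

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StreamSharingSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder shuffle object, purely for illustration.
    val path = new Path("s3a://some-bucket/shuffle/shuffle_0_0_0.data")
    val fs   = FileSystem.get(path.toUri, new Configuration())

    // Variant A: one shared stream. S3AInputStream's read methods take the
    // stream's lock, so the eight positioned reads below serialize on it
    // (this is what shows up as BLOCKED reader threads).
    val shared = fs.open(path)
    val sharing = (0 until 8).map { i =>
      new Thread(() => {
        val buf = new Array[Byte](4096)
        shared.readFully(i * 4096L, buf)
      })
    }

    // Variant B: one stream per reader. More opens, but no cross-thread locking.
    val owning = (0 until 8).map { i =>
      new Thread(() => {
        val in  = fs.open(path)
        val buf = new Array[Byte](4096)
        try in.readFully(i * 4096L, buf) finally in.close()
      })
    }

    (sharing ++ owning).foreach(_.start())
    (sharing ++ owning).foreach(_.join())
    shared.close()
  }
}
```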
I don't remember the exact case. However, I'm currently trying to better understand this issue with a fork of this plugin. In this fork I observed that
Huh, that's interesting! You can work around this issue by setting
Thank you! Let me know if this plugin works for you! You might also want to try this pull-request I just pushed: https://github.com/IBM/spark-s3-shuffle/pull/13/files (some configuration changes required). |
Is this being tracked somewhere publicly where I can take a look? |
If interested, I could help. It would be worth checking whether it is just a bad metric being reported or whether there are indeed fewer records being processed. |
Unfortunately, I don't have the tools readily available for sharing right now.
One issue we might observe is the following: objects might not immediately become visible for file listing after writing. This issue might be exacerbated since spark-s3-shuffle writes a large number of small files. The Hadoop-AWS filesystem implementation mentions this in the "Warning" section here. |
That's right. I suspected that, so I spoke to AWS about their strong consistency guarantees, and they said the listing API is strongly consistent as long as you make sure the listing happens after your writes have completed. And in Spark we have that ordering guarantee, since stages only run after their parents have computed.
Ideally, the number of files created would be the sum of the number of partitions per parent stage, right? Would it be possible to obtain this info and do a naive checksum? We could set some debug log
|
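(A rough sketch of the kind of sanity check suggested above: count the shuffle files that a listing actually returns and compare against the expected number of map outputs. This is not code from the plugin; the root path, file-name pattern, and expected count are placeholders and would need to match the plugin's actual layout.)

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ShuffleFileCountCheck {
  /** Compare listed shuffle files against the expected number of map outputs. */
  def check(rootDir: String, shuffleId: Int, expectedMapOutputs: Int): Unit = {
    val root = new Path(rootDir)
    val fs   = FileSystem.get(root.toUri, new Configuration())

    // Assumes one ".data" file per map task under a per-shuffle prefix;
    // adjust the pattern to whatever the plugin actually writes.
    val listed = fs.listStatus(root)
      .map(_.getPath.getName)
      .count(n => n.startsWith(s"shuffle_${shuffleId}_") && n.endsWith(".data"))

    if (listed != expectedMapOutputs) {
      // A mismatch here would point at listing visibility rather than a metrics bug.
      System.err.println(
        s"shuffle $shuffleId: expected $expectedMapOutputs data files, listing returned $listed")
    }
  }
}
```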
Also, worth confirming whether https://github.com/IBM/spark-s3-shuffle/blob/main/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala#L158 is a bug: shouldn't the range be half-open rather than inclusive at the end? |
Yes, that's correct. |
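(For reference, a toy illustration of the distinction being discussed; the variable names are invented, not taken from S3ShuffleReader.)

```scala
val startIndex = 3
val endIndex   = 7

// Closed (inclusive) range: includes endIndex, so two adjacent readers whose
// ranges meet at endIndex would both cover element 7.
val closed = startIndex to endIndex        // 3, 4, 5, 6, 7

// Half-open range: excludes endIndex, giving non-overlapping adjacent ranges.
val halfOpen = startIndex until endIndex   // 3, 4, 5, 6
```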
@pspoerri what do you think of using an object pool for cached blocks? Or randomizing the block ids, so that at any point in time there is less chance that multiple tasks in the same executor will read the same block? I think an object pool makes more sense.
To give you context: some of our jobs have stages where stage 1 has 71k tasks and the shuffle partition count is 17k, i.e. the next stage has 17k tasks. This results in 71k files on S3, and each task in the next stage has to query 71k files. Luckily, most of the blocks that a task wants to read are empty, and the driver filters them out; we see roughly 75% of our blocks being empty. Each task then makes about 71k * 0.25 ≈ 17k GET calls (depending on fadvise it can be a bit more), so across 17k tasks the total would be roughly 71k * 0.25 * 17k ≈ 300 million GET calls.
Anyway, object pooling can help more threads create S3AInputStreams instead of being locked onto one input stream. Lastly, parallelism can help improve throughput. All of this should be configurable. |
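(A minimal sketch of the object-pool idea from the previous comment, assuming a pool keyed per S3 object so that readers borrow a stream instead of all synchronizing on one; none of these names come from the plugin.)

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

/** Borrow/return pool of open input streams, one small pool per S3 object. */
class InputStreamPool(fs: FileSystem, maxPerPath: Int = 4) {
  private val pools = new ConcurrentHashMap[Path, ArrayBlockingQueue[FSDataInputStream]]()

  private def poolFor(path: Path) =
    pools.computeIfAbsent(path, _ => new ArrayBlockingQueue[FSDataInputStream](maxPerPath))

  /** Reuse an idle stream if one is available, otherwise open a new one. */
  def borrow(path: Path): FSDataInputStream = {
    val pooled = poolFor(path).poll()
    if (pooled != null) pooled else fs.open(path)
  }

  /** Return a stream to the pool; close it if the pool is already full. */
  def giveBack(path: Path, in: FSDataInputStream): Unit = {
    if (!poolFor(path).offer(in)) in.close()
  }
}
```

A borrowed stream keeps whatever offset it last read from, so the caller would still need to seek (or use positioned reads) before reuse; the pool only removes the single-stream lock contention.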
You can try this pull-request here: https://github.com/IBM/spark-s3-shuffle/pull/19/files - The idea is that a BufferedInputStream is pre-filled up to a prefetch size, and then read by an input stream. Note: I haven't had time to properly test this yet. Let me know if this helps. |
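(The pull request above is the authoritative version; purely to illustrate the prefetching idea, here is a toy sketch that eagerly reads the first `prefetchSize` bytes into memory and serves the remainder lazily. The names and sizes are invented and this is not the PR's implementation.)

```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}

object PrefetchSketch {
  /** Eagerly read the first `prefetchSize` bytes into memory, stream the rest lazily. */
  def prefetched(in: InputStream, prefetchSize: Int): InputStream = {
    val head   = new Array[Byte](prefetchSize)
    var filled = 0
    var eof    = false
    while (!eof && filled < prefetchSize) {
      val n = in.read(head, filled, prefetchSize - filled)
      if (n == -1) eof = true else filled += n
    }
    // Serve the prefetched bytes first, then fall through to the underlying stream.
    new SequenceInputStream(new ByteArrayInputStream(head, 0, filled), in)
  }
}
```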
I'm not sure if commit e8a1537 is helpful when reading from S3. |
@fhalde there are now three configuration options which might be of value to you:
They are all available in version 0.8 and 0.8-spark3.1. Let me know if this helps you. |
Can someone explain to me what really happens to the shuffle files on S3 when an executor is killed? Do they get migrated to a different block manager, more specifically, does the map status on the driver get updated to point at a new block manager? Or does the output get recomputed?
We were evaluating the S3 shuffle service from AWS, which is not really working the way we intended, and we want to try out this plugin.
We expect that once the shuffle files are stored in S3, then, if the executor dies, the shuffle files that were written can still be reused and stages don't fail with fetch-failed errors.
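(For anyone landing here who also wants to try the plugin: a minimal setup sketch. The option names below are my reading of the project README, not verified against a specific release, and the bucket path is a placeholder; double-check them against the README for your version.)

```scala
import org.apache.spark.sql.SparkSession

object S3ShuffleSetupSketch {
  def main(args: Array[String]): Unit = {
    // Option names assumed from the spark-s3-shuffle README; verify for your release.
    // The shuffle root is a placeholder bucket/prefix.
    val spark = SparkSession.builder()
      .appName("s3-shuffle-smoke-test")
      .config("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")
      .config("spark.shuffle.s3.rootDir", "s3a://some-bucket/spark-shuffle")
      .getOrCreate()

    // Force a shuffle so data actually lands under the shuffle root.
    val counts = spark.range(0, 1000000).selectExpr("id % 100 as key")
      .groupBy("key").count()
    counts.collect()
    spark.stop()
  }
}
```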