
Question on reliability #11

Open · fhalde opened this issue Mar 12, 2023 · 14 comments


fhalde commented Mar 12, 2023

Can someone explain what really happens to the shuffle files on S3 when an executor is killed? Do they get migrated to a different block manager, i.e. does the map status on the driver get updated to point at a new block manager? Or do they get recomputed?

We were evaluating the S3 shuffle service from AWS, which is not working the way we intended, and we want to try out this plugin.

Our expectation is that once the shuffle files are stored in S3, they remain reusable even if the executor dies, so that stages don't fail with fetch-failed errors.


fhalde commented Mar 13, 2023

@ibm-open-source-bot

pspoerri commented

It depends. If a shuffle block cannot be found, then the whole stage is recomputed. In our case we store all shuffle files on S3, so the files should all be reachable.

It could be that Spark thinks the shuffle blocks are unreachable. In that case you can try setting spark.shuffle.s3.useBlockManager=false; the shuffle blocks are then computed by listing the shuffle dir: https://github.com/IBM/spark-s3-shuffle/blob/main/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala#L141-L166
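
For illustration, a minimal sketch of how this flag could be set when building a SparkConf. Only the flag discussed in this thread is shown; the plugin's other required settings (shuffle manager class, S3 root directory) are assumed to be configured as described in the repository README:

import org.apache.spark.SparkConf

// Hedged sketch: only the option from this thread is shown here.
val conf = new SparkConf()
  .set("spark.shuffle.s3.useBlockManager", "false") // resolve shuffle blocks by listing the shuffle dir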


fhalde commented Mar 14, 2023

@pspoerri

Indeed, spark.shuffle.s3.useBlockManager=false helped alleviate the recomputation problem. But that flag comes with a warning that it can result in invalid outputs for certain workloads. Do we know what kind of workload this might be and what invalid outputs we are referring to here?

I understand why this defaults to true (because of potential safety issues), but isn't the biggest benefit of this plugin only realized when it is set to false? Wasn't this meant to avoid recomputation?

Terrific plugin, by the way. This has the potential to make our jobs quite reliable.

Today, in one of our tests, we saw a lot of our threads BLOCKED: some form of lock contention on S3AInputStream.

"Executor task launch worker for task 1054.0 in stage 8.0 (TID 81190)" #53 daemon prio=5 os_prio=0 tid=0x00007f9f250ce000 nid=0x1c53 waiting for monitor entry [0x00007f9ff4f74000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.hadoop.fs.s3a.S3AInputStream.readFully(S3AInputStream.java:779)
	- waiting to lock <0x00007fa948e096d0> (a org.apache.hadoop.fs.s3a.S3AInputStream)
	at org.apache.hadoop.fs.FSDataInputStream.readFully(FSDataInputStream.java:117)
	at org.apache.spark.storage.S3ShuffleBlockStream.liftedTree3$1(S3ShuffleBlockStream.scala:94)
	at org.apache.spark.storage.S3ShuffleBlockStream.read(S3ShuffleBlockStream.scala:93)
	- locked <0x00007fa949b291d8> (a org.apache.spark.storage.S3ShuffleBlockStream)
	at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:271)
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:192)
	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:159)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	- locked <0x00007fa949cdc740> (a java.io.BufferedInputStream)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)

We think this is due to multiple threads referring to the same S3AInputStream. But at shuffle time, won't multiple threads end up reading from the same S3 file, albeit from different offsets?


pspoerri commented Mar 14, 2023

Do we know what kind of workload this might be and what invalid outputs we are referring to here?

I don't remember the exact case. However, I'm currently trying to better understand this issue with a fork of this plugin. In this fork I observed that bytes read and records read do not match for some queries when I run TPC-DS (with spark.shuffle.s3.useBlockManager=false).

Today, in one of our tests, we saw a lot of our threads BLOCKED: some form of lock contention on S3AInputStream.

Huh, that's interesting! You can work around this issue by setting spark.shuffle.s3.supportsUnbuffer=false. This should open a separate stream for each block.

Terrific plugin, by the way. This has the potential to make our jobs quite reliable.

Thank you! Let me know if this plugin works for you!

You might also want to try this pull-request I just pushed: https://github.com/IBM/spark-s3-shuffle/pull/13/files (some configuration changes required).
This should help improve performance when writing data to S3. It also fixes the issue with TeraSort.


fhalde commented Mar 14, 2023

In this fork I observed that bytes read and records read do not match for some queries when I run TPC-DS (with spark.shuffle.s3.useBlockManager=false).

Is this being tracked somewhere publicly where I can take a look?

@pspoerri


fhalde commented Mar 14, 2023

In this fork I observed that bytes read and records read do not match for some queries when I run TPC-DS

If interested, I could help. It would be worth checking whether it is just a bad metric being reported or whether there are indeed fewer records being processed.


pspoerri commented Mar 15, 2023

Unfortunately, I don't have the tools readily available for sharing right now.

In this fork I observed that bytes read and records read do not match for some queries when I run TPC-DS (with spark.shuffle.s3.useBlockManager=false).

Is this being tracked somewhere publicly where I can take a look?

One issue we might observe is the following: objects might not immediately become visible for file listing after writing. This issue might be exacerbated because spark-s3-shuffle writes a large number of small files. The Hadoop-AWS filesystem implementation mentions this in the "Warning" section here.


fhalde commented Mar 15, 2023

That's right. I suspected that, so I spoke to AWS about their strong consistency guarantees, and they said the listing API is strongly consistent as long as you make sure the listing happens after your writes have completed. And in Spark we have that ordering guarantee, since stages only run after their parents have been computed.

Ideally, the number of files created would be the sum of the number of partitions per parent stage, right? Would it be possible to obtain this info and do a naive checksum?

We could add some debug logs, for example (see the sketch after this list):

  1. Log each time a new shuffle file is created
  2. Log the number of files the list API returns
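
A minimal sketch of what such a check could look like, using the Hadoop FileSystem API directly; the shuffle root path and the expected count below are placeholders, not the plugin's actual layout:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch: compare the number of shuffle files the list API returns with the
// number we expect (sum of map partitions over the parent stages).
val shuffleRoot = new Path("s3a://my-bucket/shuffle/")          // placeholder location
val fs = FileSystem.get(shuffleRoot.toUri, new Configuration())

val expectedFiles = 71000L                                      // e.g. sum of parent-stage map tasks
val listedFiles   = fs.listStatus(shuffleRoot).count(_.isFile)

if (listedFiles != expectedFiles)
  println(s"WARN: listing returned $listedFiles shuffle files, expected $expectedFiles")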


fhalde commented Mar 15, 2023

Also, worth confirming if https://github.com/IBM/spark-s3-shuffle/blob/main/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala#L158 is a bug

Shouldn't the range be half-open, i.e. [startMapIndex, endMapIndex)? (I was reading the Spark MapOutputTracker.)

block.mapId >= startMapIndex && block.mapId < endMapIndex

instead of

block.mapId >= startMapIndex && block.mapId <= endMapIndex

?

#15
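
For illustration, a tiny self-contained comparison of the two predicates, assuming endMapIndex is exclusive as described above:

// Hedged illustration of the off-by-one with startMapIndex = 0 and endMapIndex = 2.
val mapIds = Seq(0L, 1L, 2L, 3L)
val (startMapIndex, endMapIndex) = (0L, 2L)

val halfOpen = mapIds.filter(id => id >= startMapIndex && id < endMapIndex)  // List(0, 1): the expected maps
val closed   = mapIds.filter(id => id >= startMapIndex && id <= endMapIndex) // List(0, 1, 2): one map too many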

pspoerri commented

Yes, that's correct.


fhalde commented Mar 22, 2023

@pspoerri
Any way to parallelize the calls to S3 in the iterator? It seems sequential today.

What do you think of using an object pool for cached blocks? Or randomizing the block IDs so that at any point in time there is less chance that multiple tasks in the same executor read the same block? I think the object pool makes more sense.

To give you context: some of our jobs have stages where stage 1 has 71k tasks and the shuffle partition count is 17k, i.e. the next stage has 17k tasks.

This results in 71k files on S3, and each task in the next stage has to query 71k files. Luckily, most of the blocks a task wants to read are empty and the driver filters them out; we see roughly 75% of our blocks being empty. Each task then makes about 71k * 0.25 ≈ 17k GET calls (depending on fadvise it can be a bit more), so the total would be about 71k * 0.25 * 17k ≈ 300M GET calls.
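
Spelled out, the arithmetic is roughly (numbers from this comment, rounded):

// Rough, illustrative arithmetic for the scenario above.
val mapFiles      = 71000L                               // shuffle files written by stage 1
val reduceTasks   = 17000L                               // tasks in the next stage
val nonEmptyRatio = 0.25                                 // ~75% of blocks are empty and filtered by the driver
val getsPerTask   = (mapFiles * nonEmptyRatio).toLong    // ≈ 17,750 GET requests per reduce task
val totalGets     = getsPerTask * reduceTasks            // ≈ 3.0e8 GET requests in total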

Anyway, object pooling can help more threads create S3AInputStreams instead of being locked onto one input stream. Lastly, parallelism can help improve throughput. All of this should be configurable.

pspoerri commented

You can try this pull-request here: https://github.com/IBM/spark-s3-shuffle/pull/19/files - The idea is that a BufferedInputStream is pre-filled up to a prefetch size, and then read by an input stream.

Note: I haven't had time to properly test this yet. Let me know if this helps.
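
As a rough, hedged illustration of the prefetch idea (not the pull-request's actual code): a small thread pool reads each block fully into memory ahead of the consumer, which then reads from an in-memory stream instead of blocking on S3:

import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Hedged sketch, not the PR implementation.
object PrefetchSketch {
  private val pool = Executors.newFixedThreadPool(4)                  // prefetch thread pool size
  implicit private val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  // Reads the whole block eagerly on the pool and hands back an in-memory stream.
  def prefetch(openBlock: () => InputStream): Future[InputStream] = Future {
    val in = openBlock()
    try new ByteArrayInputStream(in.readAllBytes())                   // requires Java 9+
    finally in.close()
  }
}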

pspoerri commented

I'm not sure if commit e8a1537 is helpful when reading from S3.

pspoerri commented

@fhalde there are now three configuration options which might be of value to you:

  • spark.shuffle.s3.prefetchBatchSize - controls the number of async calls that prefetch shuffle blocks.
  • spark.shuffle.s3.prefetchThreadPoolSize - controls the size of the thread pool.
  • spark.shuffle.s3.folderPrefixes - controls the number of prefixes, which is useful when you want to avoid request-rate issues with AWS S3 (see https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html). The default of 10 prefixes allows 55,000 read requests per second.

They are all available in version 0.8 and 0.8-spark3.1.

Let me know if this helps you.
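
For reference, a minimal sketch of setting these options on a SparkConf; the values here are purely illustrative, not recommendations:

import org.apache.spark.SparkConf

// Option names from the comment above; values are examples only.
val conf = new SparkConf()
  .set("spark.shuffle.s3.prefetchBatchSize", "25")       // async prefetch calls in flight
  .set("spark.shuffle.s3.prefetchThreadPoolSize", "40")  // prefetch thread pool size
  .set("spark.shuffle.s3.folderPrefixes", "10")          // default: 10 prefixes ≈ 55,000 read requests/s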
