error decoding response body after upgrade to object store 0.10 #272
I think we would need a reproducer to action this, the linked issues aren't even clearly implicating object_store |
Please also print the source of the error via |
@thomasfrederikhoeck @k-ye can you guys provide additional details please |
@Xuanwo I would love to be of more help but I don't know how to do this in delta-rs (and in turn object_store). It didn't help to set the timeout to 300s. @ion-elgreco Can you point me in the direction of how I can provide better logs? |
Hi, if you can consistently reproduce this issue, please change the following places:
fn object_store_to_py(err: ObjectStoreError) -> PyErr {
match err {
ObjectStoreError::NotFound { .. } => PyFileNotFoundError::new_err(err.to_string()),
ObjectStoreError::Generic { source, .. }
if source.to_string().contains("AWS_S3_ALLOW_UNSAFE_RENAME") =>
{
DeltaProtocolError::new_err(source.to_string())
}
_ => PyIOError::new_err(err.to_string()),
}
} Don't use |
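For context, a minimal illustrative sketch (not the delta-rs code, just plain std::error::Error) of surfacing the full error chain so the underlying cause of "error decoding response body" is printed rather than only the top-level message:

use std::error::Error;

// Illustrative helper (not part of delta-rs): walk the source chain so the
// underlying reqwest/hyper cause is printed, not just the outer message.
fn format_error_chain(err: &dyn Error) -> String {
    let mut out = err.to_string();
    let mut source = err.source();
    while let Some(cause) = source {
        out.push_str(&format!("\n  caused by: {cause}"));
        source = cause.source();
    }
    out
}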
@Xuanwo Ah thanks!! I get the following consistently:
I also tried bumping the timeout to
|
I suspect there's an issue with the network connection between your environment and Azure. Could you provide more details about your setup?
|
@Xuanwo It might be network related, but I have some feeling that it is related to how
The benchmark took 1+ hours with no failure while the delta-rs call fails within a few minutes. |
@Xuanwo @tustvold I can concur this also happens to us in v0.18.1/2. I can see logs in LakeFS which say "context canceled". Somewhere in object store the connection is getting dropped constantly with large files. Can you guys give suggestions on how to debug? For me, I am connecting within a VNET in EU Amsterdam |
Are you mixing IO with CPU-bound work? I wonder if you are stalling out the tokio runtime |
Hmm, I am not sure. I started working on delta-rs a year ago and most of this FileSystem handling code was already there. We essentially create a DeltaFileSystemHandler which we expose to Python. In Python we create a DeltaStorageHandler which inherits the pyarrow FileSystemHandler methods, which we implement to call the Rust DeltaFileSystemHandler. I think pyarrow just calls read on an ObjectInputFile, which in Rust calls
Here |
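As a rough sketch of that read path, assuming the object_store 0.10 API (the function name read_range is illustrative, not the actual delta-rs code):

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

// Hypothetical sketch of the read path described above: each pyarrow
// ObjectInputFile.read(nbytes) ultimately becomes one ranged GET.
async fn read_range(
    store: &dyn ObjectStore,
    location: &Path,
    offset: usize,
    nbytes: usize,
) -> object_store::Result<Bytes> {
    // object_store 0.10 takes a Range<usize>; later releases may differ.
    store.get_range(location, offset..offset + nbytes).await
}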
That at least looks plausible, how big are the ranges we're fetching and how long are we fetching for? I wonder if we're running into some Azure limit, it sounds like they're hanging up for some reason |
What do you mean by "looks plausible"? :) @tustvold I'll put some print statements in the ranges, to see what is being requested! Will get back to you on that! |
I can't see anything obviously wrong, but also don't know much about pyarrow so can't say definitively if it is correct |
@tustvold It seems pyarrow fetches 4 files in parallel and then reads around 30MB each time: https://gist.github.com/ion-elgreco/e2339990843755b40475dbd6e72e4697 |
That's on the chonkier end of optimal, but not ludicrous. How long do the fetches take? |
Hmm what would you suggest is more optimal? Like 10MB? So now my VPN connection throughput is working fine, so it seems each fetch takes around 4-8 secs.
[python/src/filesystem.rs:501:17] elapsed.as_secs() = 6
[python/src/filesystem.rs:473:9] (&self.path, &nbytes) = (
    Path {
        raw: "product_line_code=DUMMY/100-f1cafe66-476f-4818-8199-5c5a4a6eb4ef-0.parquet",
    },
    Some(
        33231426,
    ),
) |
Oh... This is almost certainly what is causing this issue, very few VPNs will support large volume data transfer. It is almost certainly dropping the connections in the interest of preserving QoS for other users. Shuttling data through a VPN box is not only likely to be the cause of your issue, it is also likely very expensive. |
I see, that could explain it for me; however, my colleague saw timeouts on his Azure Compute instance, so that was an Azure <-> Azure connection within our VNET |
I'm afraid I don't have any other ideas, something outside of object_store is dropping the connection. This could be Azure itself, Azure blob storage definitely gives off the impression of being an MVP that somehow got shipped, but it is more likely to be some middleware network appliance, like a VPN, NAT gateway or similar. AWS has private gateway endpoints that must be configured for S3, I am not sure if Azure needs something similar. |
@tustvold The weird thing is that I can run some rather large data operations (taking 1+ hour) with
I can maybe add: Before this PR in polars we sometimes saw similar issues, but I'm very far from knowledgeable on networking. |
This is why I asked about starving the tokio threadpool; this does not appear to be the issue @ion-elgreco is running into, from what he has shared.
Azcopy will be using multipart uploads, which uses smaller requests that are therefore less susceptible to dropped connections |
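For comparison, a minimal sketch of a multipart upload with object_store 0.10's put_multipart and WriteMultipart helper (the in-memory store, path and part size here are made up):

use object_store::{memory::InMemory, path::Path, upload::WriteMultipart, ObjectStore};

// Many smaller part requests instead of one large transfer, so a single
// dropped connection only costs a part, not the whole upload.
async fn upload_in_parts(data: &[u8]) -> object_store::Result<()> {
    let store = InMemory::new(); // stand-in for an Azure or S3 store
    let path = Path::from("example/large-file.bin");

    let upload = store.put_multipart(&path).await?;
    let mut writer = WriteMultipart::new(upload);
    for chunk in data.chunks(8 * 1024 * 1024) {
        writer.write(chunk); // buffers and uploads parts in the background
    }
    writer.finish().await?;
    Ok(())
}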
I got a bit confused myself here, but @thomasfrederikhoeck you have issues during Optimize where the data is being read differently. @tustvold, here it seems we read a Parquet object within a tokio task; should this be a rayon threadpool instead?
let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
.flat_map(|(_, (partition, bins))| {
futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
})
.map(|(partition, files)| {
debug!(
"merging a group of {} files in partition {:?}",
files.len(),
partition,
);
for file in files.iter() {
debug!(" file {}", file.location);
}
let object_store_ref = log_store.object_store();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
async move {
let file_reader = ParquetObjectReader::new(object_store_ref, file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()
}
})
.try_flatten()
.boxed();
let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
partition,
files,
log_store.object_store().clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
})
.boxed(),
Later down in the code we read that stream from above one by one and cast each RecordBatch, which is CPU bound I guess? And then we write it as a parquet again:
while let Some(maybe_batch) = read_stream.next().await {
let mut batch = maybe_batch?;
batch = super::cast::cast_record_batch(
&batch,
task_parameters.file_schema.clone(),
false,
true,
)?;
partial_metrics.num_batches += 1;
writer.write(&batch).await.map_err(DeltaTableError::from)?;
} |
You should avoid doing any non-trivial CPU-bound work on the tokio threadpool that you use for IO. The way I've seen this done successfully is running DF in one tokio threadpool, and then spawning IO from it into a different one. There was some work in the past to make this easier, see apache/arrow-rs#4040, but I never got it over the line. I'll file a ticket. Edit: Filed apache/arrow-rs#6248 |
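A minimal sketch of that pattern, independent of the delta-rs internals (the helper name spawn_io and the thread counts are illustrative): CPU-bound work runs on one runtime, while IO futures are handed to a second runtime through its Handle.

use std::time::Duration;
use tokio::runtime::{Builder, Handle, Runtime};

// Dedicated runtime for IO so CPU-heavy work cannot starve in-flight requests.
fn io_runtime() -> Runtime {
    Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("io")
        .enable_all()
        .build()
        .expect("failed to build IO runtime")
}

// Run a future on the IO runtime and await its result from the caller's runtime.
async fn spawn_io<F, T>(io: &Handle, fut: F) -> T
where
    F: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    io.spawn(fut).await.expect("IO task panicked")
}

fn main() {
    let io_rt = io_runtime();
    let io_handle = io_rt.handle().clone();

    // The "CPU" runtime, where decoding or casting record batches would run.
    let cpu_rt = Builder::new_multi_thread().enable_all().build().unwrap();
    cpu_rt.block_on(async move {
        // Stand-in for a network fetch; a real integration would call the
        // object store here instead of sleeping.
        let bytes = spawn_io(&io_handle, async {
            tokio::time::sleep(Duration::from_millis(10)).await;
            vec![0u8; 1024]
        })
        .await;

        // CPU-bound work stays on this runtime and cannot stall the IO pool.
        println!("fetched {} bytes", bytes.len());
    });
}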
@tustvold thanks for the support! And insights 😄, I'm not an expert on Rust async yet, so I might need to look into how I could allocate or split up these pools. How fast do you think apache/arrow-rs#6248 could land? |
The issue pertains to how CPU-bound work is starving IO; this side channel will not be reflected in stack traces. Additionally, there is something in between that is connecting pyarrow to object_store; we don't provide such an integration. The delta-rs people will likely be best placed to comment on what this is. |
Let's leave this ticket open until we sort out the next steps (though I agree with @tustvold that I don't predict any code changes in arrow-rs) |
My question is "would you be willing to summarize this ticket / write up a blog post (perhaps on the DataFusion blog) explaining how to spawn IO related tasks on a different thread pool?" I am 🎣 for help as I would like to write this blog too (so we can distill down this ticket and others for future discussion) but I am struggling to find time |
I think there's something else going on that is independent of the tokio runtime issues. I was able to reproduce this locally with a somewhat broken
So I think we should extend the retry logic to capture this case. However this might be difficult in the streaming case (i.e. when the error occurs mid-stream), see
At least we should try to improve the error message. It seems that the |
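For reference, a caller-side sketch of the simple (non-streaming) case, which just re-issues the whole ranged GET on failure; this is not object_store's built-in retry, and the names and attempt count are illustrative:

use std::ops::Range;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

// Re-issue the entire ranged GET if it fails. Wasteful for large ranges,
// but enough to paper over transient connection drops.
async fn get_range_retrying(
    store: &dyn ObjectStore,
    location: &Path,
    range: Range<usize>,
    max_attempts: usize,
) -> object_store::Result<Bytes> {
    let mut last_err = None;
    for _ in 0..max_attempts {
        match store.get_range(location, range.clone()).await {
            Ok(bytes) => return Ok(bytes),
            Err(err) => last_err = Some(err),
        }
    }
    Err(last_err.expect("max_attempts must be > 0"))
}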
Retrying interrupted streaming requests is tracked by #53. I'm a bit wary of this ticket just becoming a general dumping ground for any networking-related issue, which is part of why I closed it... |
Error display improvements tracked by #48. |
Ok so here is my summary of this ticket, and the action items going forward.
Problem
The
Causes
There are two related causes of this:
Outcome
As for the follow on work:
Please let me know if I have missed anything, otherwise I will look to close this issue in favour of the linked issues in the next few days. I think this issue has been very helpful, and I'm grateful for everyone who has participated, but I am keen to put this on a more actionable footing. |
I could but not soon, I recently started a new job so that's keeping me quite busy |
What I still don't quite get is that we are still seeing errors, but only on the reading side through the DeltaFileSystemHandler which is exposed as a pyarrow filesystem; there should be zero CPU-bound tasks on that tokio runtime |
Perhaps it is related to just some network errors and a retry of streaming; #53 tracks retrying interrupted streaming requests |
@alamb delta-io/delta-rs#2595 (comment) I asked them to add a timeout increase and that resolved it. I guess it would already help a lot if the true error surfaced; it might be that all those folks have low network throughput |
I believe @itsjunetime may be able to take a look at improving the errors and retries. We'll keep the tickets updated |
I've opened a PR apache/arrow-rs#6519 that will retry on |
I've got the same error when using ParquetRecordBatchStream with ParquetObjectReader. I found that it may be related to file size. When I try to read a file bigger than 335MB, the read always fails; files up to 335MB do not fail. I used Ceph with the S3 API. |
@Tangeroooo I think we're open to contributions to fix that. The essence is described in apache/arrow-rs#6519 (comment) or, in my words: The |
@crepererum Thank you for your kind explanation! |
Since this keeps coming up, I filed a separate ticket to track |
I am convinced that this issue / error would be solved by retrying the stream on errors |
Describe the bug
We bumped the object store to 0.10 in delta-rs, and now we are already seeing a couple of reports of the following error:
error decoding response body
Happens on Azure and S3. See delta-io/delta-rs#2595 and delta-io/delta-rs#2592
To Reproduce
Seems to occur when reading tables or doing operations on them.
Expected behavior
Don't have an issue decoding the response body
Additional context
@thomasfrederikhoeck @k-ye