-
Notifications
You must be signed in to change notification settings - Fork 794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize take/filter from multiple input arrays to a single large output array #6692
Comments
FYI
|
@alamb just to clarify your idea is to modify the existing |
If creating a new one helps, no reason not to do it. |
I think the idea is sound in principle, but needs a concrete API proposal. I'm not sure the proposed builder API makes sense, as the typing for nested types like ListBuilder and DictionaryBuilder is not what we want here, and they can't easily be type erased. We also ideally want to avoid overly bloating the arrow-array crate with kernel logic. This isn't even touching on the fact these kernels don't use the builders for performance reasons. I think we'd need to introduce a new type-erased MutableArray abstraction or something, potentially replacing the rather problematic MutableArrayData. The only remaining challenge concerns dictionaries, as the output dictionary needs to be computed up front. Simply not supporting dictonaries could potentially be a valid workaround though. |
Not sure about other type but for StringView, I can only think of iterating all the filtered row and |
'append_value' does a copy though. Wouldn't that effectively still be a large amount of copies? |
With this approach, we reduce the copying to a single step. Compared to the current approach, where copying happens in multiple stages (filtering, garbage collection, and coalescing), my proposal combines these steps into one. While benchmarks are needed to confirm any performance gains, this method should, at the very least, not perform worse than the existing one |
That makes sense. This would be a change downstream within Datafusion then correct? |
yes, you can work on it if you want to |
So it sounds like the consensus is to work out how this might look downstream in DataFusion (maybe starting with StringView as that is what is giving us the most trouble now) and then use some of that knowledge to propose something upstream in Arrow -- sounds like a good idea to me |
@jayzhan211 yes I think this is effectively what would happen -- however the actual iteration over filtered values is quite optimized in the The fact that |
This is reasonable -- though I could imagine adding type erased builders like
Is there some fundamental reason the builders can't made faster? If we could make the builders fast enough to use for filter that would seem to be valuable in its own right. But I am likely just dreaming here
A builder based approach could help (e.g. optimize for the case where the input batches had the same dictionary and handle the case where they didn't -- either via deferred computation or on the fly or something else) |
Does approach seems like that for filter?
And at least, we can avoid generate multiple small batches, and concat them(a ton of copies) when big enough. |
This sort of partially type-erased API seems like the worst of both worlds, you either want something that is completely type-erased (e.g. MutableArrayData), or fully typed (e.g. ListBuilder). I could see us adding some sort of
Not without changing their APIs 😅. For the primitive builders one could simply move the current kernel implementations into the builders, but this doesn't really achieve much IMO.
Yeah, it gets very complicated and fiddly. A similar challenge likely exists for StringView, although I'm not sure what level of sophistication we've reached w.r.t automatic GC.
That would be a very naive way to implement the filter kernel, I would encourage looking at what the selection kernels actually do. |
I agree with it seems a naive version for filter. Is it possible to public something like
|
arrow-rs/arrow-select/src/filter.rs Line 740 in e9bf8aa
This line of code extends buffer (byte copied) regardless of the filtered result, I think it is the reason why we need gc. If we do |
To be clear -- I think the copy is of a |
I would try to implement a builder like in datafusion |
Didn't see improvement on this approach apache/datafusion#13450 |
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Upstream in DataFusion, there is a common common pattern where we have multiple input
RecordBatch
es and want to produce an outputRecordBatch
with some subset of the rows from the input batches. This happens inFilterExec
-->CoalesceBatchesExec
when filteringRepartitionExec
-->CoalesceBatchesExec
The kernels used here are:
FilterExec
usesfilter
, takes a single inputArray
and produces a single outputArray
RepartitionExec
usestake
, which also takes a single inputArray
and produces a single outputArray``RepartitionExec
each take a single input batch and produce a single outputArray
CoalesceBatchesExec
callsconcat
which takes multple Arrays and produces a single Array as outputThe use of these kernels and patterns has two downsides:
filter
/take
immediately copies the data, which is copied again inCoalesceBatches
(see illustration below)RecordBatch
es with StringView may consume significant amounts of memory for mostly filtered rows, which requires us to run gc periodically which actually slows some things down (see Reduce copying inCoalesceBatchesExec
for StringViews datafusion#11628)Here is an ascii art picture (from apache/datafusion#7957) that shows the extra copy in action
Describe the solution you'd like
I would like to apply
filter
/take
to each incomingRecordBatch
as it arrives, copying the data to an in progress output array, in a way that is as fast as thefilter
andtake
operations. This would reduce the extra copy that is currently required.Note this is somewhat like the
interleave
kernel, except thatusize
batch index is not needed)Describe alternatives you've considered
One thing I have thought about is extending the builders so they can append more than one row at a time. For example:
Builder::append_filtered
Builder::append_take
So for example, to filter a stream of StringViewArrays I might do something like;
And also add an equivalent for
append_take
I think if we did this right, it wouldn't be a lot of new code, we could just refactor the existing filter/take implementations. For example, I would expect that the
filter
kernel would then devolve into something likeAdditional context
CoalesceBatchesExec
to improve performance datafusion#7957The text was updated successfully, but these errors were encountered: