Distributed execution #549
Comments
"Data is read and written in each container" does that mean data is stored in a local disk ? that can quickly be a bottleneck for downloading images (and even more for videos) |
Each component streams the data from and to either local disk or remote storage and passes a reference to the stored data to the next component. This "passing by reference" is different from the "passing by value" happening in Apache Spark or similar frameworks. This design is a trade-off, but allows for the following optimizations:
The approaches for distributed execution mentioned above would make this trade-off a bit "softer", as you could define a DAG within a certain component, which is then executed on a cluster which does pass data by value between the DAG steps. We already noticed that this trade-off plays a big part in deciding where to put component boundaries when designing a pipeline. Sometimes it's more efficient to bundle functionality in a single component, but it lowers reusability of the separate functionalities. Where do you see the bottleneck for downloading images and videos? I assume it's not in the downloading itself, but in combination with other functionality that needs the image / video data?
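To make the "passing by reference" idea concrete, here is a rough sketch assuming parquet data on remote storage; the paths, the component function, and the transformation are illustrative and not Fondant's actual API.

```python
import dask.dataframe as dd


def caption_component(input_ref: str, output_ref: str) -> str:
    """Read the referenced data, transform it, and write it back to storage.

    Only the reference (a storage URI) is handed to the next component;
    the data itself never travels between containers.
    """
    ddf = dd.read_parquet(input_ref)              # load by reference
    ddf["caption"] = ddf["caption"].str.lower()   # some per-row transformation
    ddf.to_parquet(output_ref)                    # write the new dataset
    return output_ref                             # pass the reference downstream


# The orchestrator only moves small strings (references) between containers:
ref = "s3://my-bucket/pipeline/step-1/"
ref = caption_component(ref, "s3://my-bucket/pipeline/step-2/")
```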
What we usually do in img2dataset and video2dataset is to do all intermediary processing in memory, read the input from remote storage with fsspec, and write the output to remote storage with fsspec. That allows scaling to any amount of data (for example many dozens of TBs). Writing to disk between each component means you introduce:
That said, I agree the properties you mention are nice. I wonder if introducing the possibility to have this intermediary space be in memory would mitigate some of the issues it introduces. Do you think that's possible? Do you already support reading and writing from/to remote storage (S3, GCS, HDFS, ...) through fsspec for the initial input and final output?
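For reference, the in-memory flow described above could look roughly like this with fsspec; the bucket paths and the process_shard function are placeholders, not the actual img2dataset / video2dataset code.

```python
import fsspec


def process_shard(raw_bytes: bytes) -> bytes:
    """Placeholder for decoding, resizing, transcoding, ... done fully in memory."""
    return raw_bytes


# Stream a shard from remote storage, process it in memory, write it back.
with fsspec.open("s3://input-bucket/shard-00000.tar", "rb") as src:
    data = src.read()

result = process_shard(data)

with fsspec.open("s3://output-bucket/shard-00000.tar", "wb") as dst:
    dst.write(result)
```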
Just to clarify what I'm saying: I think one very important point to think about is the granularity of the data.
I found that having the shard be the unit that components manipulate is convenient and makes it possible to reduce issues with temporary data, redundancy, etc. I'm wondering how that question (shards) plays with your concept of intermediary storage between components.
This is exactly what a single Fondant component does. It uses fsspec to read the input data from, and write the output data to, remote storage. I might have created some confusion by mentioning local disk in this regard, since the local runner can use local disk as "remote storage" as well. Within a component, the local disk is not explicitly used, although data might spill from memory to disk when running low on memory.
We indeed have shards, and we call them partitions as well. The data on the remote storage is partitioned, and the partitions can be read, processed, and written one by one. A component actually creates a (Dask) computation graph of its work, which can be executed on the partitions separately. The graph is currently executed inside the component, which means that it can only use the single machine the component is running on.
With this context in mind, the approach we would take to enable distributed execution is to enable distribution within a single component, by submitting the computation graph as a job to a kubernetes / dask / spark cluster instead. The component will still read its input data from, and write its output data to, remote storage.
If we wanted to implement img2dataset and video2dataset using Fondant, we should evaluate which parts need to be implemented in a single component, and which parts can be split into separate components.
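As an illustration of the two execution modes described here (not Fondant's actual internals): a component's work can be expressed as a lazy Dask graph over partitions, and the same graph could be submitted to a remote Dask cluster instead of the local machine. The scheduler address and paths are made up.

```python
import dask.dataframe as dd
from dask.distributed import Client

# Connecting to an existing Dask scheduler sends the graph to the cluster;
# without this line the graph runs on the single machine inside the component.
client = Client("tcp://dask-scheduler:8786")

# Build the computation graph lazily: read partitions, transform, write.
ddf = dd.read_parquet("s3://bucket/input/")    # one partition per file
ddf["caption"] = ddf["caption"].str.strip()    # per-partition transformation
ddf.to_parquet("s3://bucket/output/")          # executes the graph partition by partition
```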
Ok, I see. Doing distribution inside a component means one component = one job. I think that limits the scope of potential components.
I am interested in a different kind of "component" (currently called a subsampler in video2dataset), which looks like:
- downloader, resizer in img2dataset
- downloader, frame fps subsampling, transcription, ... (see the subsamplers folder) in video2dataset
I think these components should live inside the computation graph. This can be much more flexible than having dozens of jobs. That probably means it's best to do this outside of fondant, which is fine.
I understand your plans for fondant, I guess the next step is for you all to try implementing these jobs / components and see how well that'd work on some scaled-up examples.
Basically you could just have an img2dataset fondant component if you like. For the current implementation it probably makes sense.
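A sketch of the "subsamplers living inside one computation graph" idea, written here with dask.bag; the subsampler functions are hypothetical stand-ins for the real video2dataset subsamplers.

```python
import dask.bag as db


def download(url: str) -> dict:
    return {"url": url, "video": b"..."}   # placeholder downloader


def subsample_fps(sample: dict) -> dict:
    return sample                          # placeholder frame-rate subsampler


def transcribe(sample: dict) -> dict:
    sample["transcript"] = ""              # placeholder transcription step
    return sample


urls = db.from_sequence(["https://example.com/a.mp4", "https://example.com/b.mp4"])
# All steps live in one graph and run as a single job, not one job per step.
results = urls.map(download).map(subsample_fps).map(transcribe)
results.compute()
```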
I think there are benefits to having them within a single component or split across components.
Not all steps need access to all the data though. Some of the subsamplers only use the index (e.g. the frame subsampler), only audio data (e.g. the whisper subsampler), or only image data (e.g. the caption subsampler). Since fondant only reads the data that a component needs, this reduces the cost of writing and reading the data between components.
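As a small illustration of that column-level reading, a component that only needs captions would never load the heavy image or video bytes; the path and column names here are hypothetical.

```python
import dask.dataframe as dd

# Only the declared columns are read from storage; the large "image" column
# is never transferred to this component.
ddf = dd.read_parquet("s3://bucket/dataset/", columns=["id", "caption"])
```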
Fondant currently supports execution across cores on a single machine, which enables workflows using millions of images, but becomes a bottleneck when scaling to tens or hundreds of millions of images.
This issue aims to describe how we can move to execution distributed across multiple machines.
The orchestrators we currently support, or are looking to support in the near future, share the following defining features:
- Components are executed as containers.
- Execution of a component across a cluster of machines is not supported out of the box.
Based on this description, we should implement distributed execution as a component that launches a distributed job "somewhere else" and monitors its progress.
This "somewhere else" can be different places:
It seems like the managed orchestrators provide managed versions of these options as well:
To decide how to move forward, I think we need to answer the following questions (among others):
Due to the complexity of this topic, we probably need to create a PoC to understand the possibilities and constraints of the different orchestrators.
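As a minimal sketch of the "launch a distributed job somewhere else and monitor it" approach, assuming the "somewhere else" is an existing Dask cluster; the scheduler address and the work function are placeholders, not a proposed Fondant API.

```python
from dask.distributed import Client, progress


def heavy_work(partition_id: int) -> int:
    """Placeholder for the real per-partition work."""
    return partition_id


# The launcher component itself stays lightweight: it submits work to the cluster...
client = Client("tcp://dask-scheduler:8786")
futures = client.map(heavy_work, range(100))

# ...and monitors progress until the distributed job finishes.
progress(futures)
results = client.gather(futures)
```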