
Distributed execution #549

Open
1 task
RobbeSneyders opened this issue Oct 24, 2023 · 7 comments
Labels: Infrastructure (Infrastructure and deployment)

Comments

RobbeSneyders (Member) commented Oct 24, 2023

Fondant currently supports execution across cores on a single machine, which enables workflows using millions of images, but becomes a bottleneck when scaling to tens or hundreds of millions of images.

This issue aims to describe how we can move to execution distributed across multiple machines.


The orchestrators we currently support, or are looking to support in the near future, share the following defining features:

  • They orchestrate docker containers. Data is read and written in each container, and only data locations are passed between
    containers.
  • Each container is executed on a single machine. The machine to execute on can be chosen per component, but distributed
    execution of a component across a cluster of machines is not supported out of the box.

Based on this description, we should implement distributed execution as a component that launches a distributed job "somewhere else" and monitors its progress.

This "somewhere else" can be different places:

  • On the Kubernetes cluster where the orchestrator is running, by starting multiple Kubernetes jobs from the component. (Eg. see these KfP components to start distributed training jobs with different frameworks.)
  • On a remote cluster (eg. Dask, Spark) by connecting to an http endpoint and submitting a distributed job, as sketched below.
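
As a rough illustration of the second option, a component could connect to an existing Dask scheduler and run its computation there instead of on its own machine. This is only a sketch under assumptions; the function, paths, and scheduler address below are illustrative and not existing Fondant APIs.

```python
# Minimal sketch (not Fondant code): a component entrypoint that submits its
# work to a remote Dask cluster instead of computing on the local machine.
import dask.dataframe as dd
from dask.distributed import Client

def run_on_remote_cluster(input_path: str, output_path: str, scheduler_address: str) -> None:
    # Connect to an existing Dask scheduler, e.g. "tcp://dask-scheduler:8786"
    client = Client(scheduler_address)
    try:
        ddf = dd.read_parquet(input_path)     # lazy: builds a graph, nothing runs yet
        ddf = ddf[ddf["image_width"] >= 256]  # hypothetical filter step
        ddf.to_parquet(output_path)           # triggers execution on the cluster workers
    finally:
        client.close()
```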

It seems like the managed orchestrators provide managed versions of these options as well:


To decide how to move forward, I think we need to answer the following questions (among others):


Due to the complexity of this topic, we probably need to create a PoC to understand the possibilities and constraints of the different orchestrators.

Tasks

  1. 0 of 12 · Core Infrastructure enhancement · GeorgesLorre
rom1504 commented Oct 24, 2023

"Data is read and written in each container" does that mean data is stored in a local disk ? that can quickly be a bottleneck for downloading images (and even more for videos)

RobbeSneyders moved this from Backlog to Breakdown in Fondant development on Oct 25, 2023
RobbeSneyders (Member, Author) commented

Each component streams the data from and to either local disk or remote storage and passes a reference to the stored data to the next component. This "passing by reference" is different from the "passing by value" happening in Apache Spark or similar frameworks.

This design is a trade-off, but allows for the following optimizations:

  • Components only load the data that they use, and only write the data they change (see the sketch after this list)
  • The index of the data is tracked separately, which makes filtering very cheap
  • Data lineage is built-in, as we have a snapshot of the data between each pipeline step
  • These snapshots are minimized by referring to unchanged parts of previous snapshots instead of copying them
  • Pipelines can easily be resumed from any step based on the snapshot
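
The first point is easy to illustrate with a columnar format like Parquet. The column name and path below are made up for illustration, not actual Fondant output:

```python
# Sketch: a component that only needs captions reads just that column,
# even though the snapshot also contains (much larger) image data.
import dask.dataframe as dd

ddf = dd.read_parquet(
    "gs://my-bucket/pipeline-run/previous-component/",  # hypothetical snapshot location
    columns=["caption"],                                 # image bytes are never read
)
```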

The approaches for distributed execution mentioned above would make this trade-off a bit "softer", as you could define a DAG within a certain component, which is then executed on a cluster that does pass data by value between the DAG steps.

We already noticed that this trade-off plays a big part in deciding where to put component boundaries when designing a pipeline. Sometimes it's more efficient to bundle functionality in a single component, but it lowers reusability of the separate functionalities.

Where do you see the bottleneck for downloading images and videos? I assume it's not in the downloading itself, but in combination with other functionality that needs the image / video data?

rom1504 commented Oct 25, 2023

What we usually do in img2dataset and video2dataset is to do all intermediary processing in memory, read the input from remote storage with fsspec, and write the output to remote storage with fsspec. That allows scaling to any amount of data (for example, many dozens of TBs).
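
Roughly, that pattern looks like this (the paths and the processing step are purely illustrative, not img2dataset's actual code):

```python
# Sketch of the img2dataset / video2dataset pattern: stream a shard from remote
# storage, process it entirely in memory, and write the result back out.
import io
import fsspec
import pandas as pd

def process_shard(input_url: str, output_url: str) -> None:
    # e.g. input_url = "s3://bucket/shards/00042.parquet"
    with fsspec.open(input_url, "rb") as f:
        shard = pd.read_parquet(io.BytesIO(f.read()))
    shard = shard[shard["height"] >= 256]  # illustrative in-memory processing
    with fsspec.open(output_url, "wb") as f:
        shard.to_parquet(f)
    # Nothing is written to local disk; only memory and the remote store are used.
```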

Writing to disk between each component means you introduce:

  • a bottleneck on local disk space: it would only work if this intermediary storage is temporary and holds N simultaneous shards, such that the size of N shards fits on local disk
  • a bottleneck on local disk speed: P components reading/writing to their respective intermediary storage would share the local disk bandwidth. It probably requires SSDs for most usage

That said I agree the properties you mention are nice.

I wonder if introducing the possibility to have this intermediary space be in memory would mitigate some of the issues it introduces. Do you think that's possible?

Do you already support reading and writing from/to remote storage (S3, GCS, HDFS, ...) through fsspec for the initial input and final output?

rom1504 commented Oct 25, 2023

Just to clarify what I'm saying: I think one very important point to think about is the granularity of the data.

  • one sample: one image, one minute of video, one short text, one minute of audio
  • one shard: around 10,000 samples. Between 256MB and 4GB is usually good. This makes it possible to reduce the total number of units to manipulate
  • whole dataset: N shards. N is usually between 100 and 1M, which are manageable numbers
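
To put numbers on the scale mentioned in this issue: a dataset of 100 million images at roughly 10,000 samples per shard is about 10,000 shards, comfortably within that manageable range.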

I've found that having the shard be the unit that components manipulate is convenient, and it reduces issues with temporary data, redundancy, etc.
Spark usually calls this partitioning. Other distributed frameworks have similar concepts.
I've been assuming you have that too, but is that actually the case: are you using shards, or are you working at the level of the whole dataset?

I'm wondering how that question (shards) plays with your concept of intermediary storage between components.

RobbeSneyders (Member, Author) commented

What we usually do in img2dataset and video2dataset is to do all intermediary processing in memory, read the input from remote storage with fsspec, and write the output to remote storage with fsspec. That allows scaling to any amount of data (for example, many dozens of TBs).

This is exactly what a single Fondant component does. It uses fsspec to read the input data from, and write the output data to remote storage.

I might have created some confusion by mentioning local disk in this regard, since the local runner can use local disk as "remote storage" as well. Within a component, the local disk is not explicitly used, although data might spill from memory to disk when running low on memory.

I've been assuming you have that too, but is that actually the case: are you using shards, or are you working at the level of the whole dataset?

We indeed have shards, and we call them partitions as well. The data on the remote storage is partitioned, and the partitions can be read, processed, and written one by one.

A component actually creates a (Dask) computation graph of its work, which can be executed on the partitions separately. The graph is currently executed inside the component, which means that it can only use the single machine the component is running on.
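
As a simplified sketch (not the actual executor code; column names and paths are illustrative), that graph is essentially a lazy per-partition transform over the partitioned snapshot:

```python
# Simplified sketch of a component's computation graph: a lazy per-partition
# transform over the partitioned dataset stored remotely.
import dask.dataframe as dd
import pandas as pd

def transform(partition: pd.DataFrame) -> pd.DataFrame:
    # Runs independently on each partition (shard)
    partition["caption_length"] = partition["caption"].str.len()
    return partition

ddf = dd.read_parquet("s3://bucket/run-id/input/")  # one or more files per partition
result = ddf.map_partitions(transform)              # only builds the graph
result.to_parquet("s3://bucket/run-id/output/")     # executes it; today this runs on
                                                    # the component's single machine
```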


With this context in mind, the approach we would take to enable distributed execution is to enable distribution within a single component, by submitting the computation graph as a job to a Kubernetes / Dask / Spark cluster instead. The component would still read its input data from and write its output data to remote storage.

If we wanted to implement img2dataset and video2dataset using Fondant, we should evaluate which parts need to be implemented in a single component, and which parts can be split into separate components.

rom1504 commented Oct 30, 2023 via email

RobbeSneyders (Member, Author) commented

I think there are benefits both to having them within a single component and to splitting them across components.

  • If you combine them in a component, data only needs to be read & written once, but if you want to reprocess a step, you need to reprocess all steps.
  • If you split them across steps, you need to write the data and read it again, but you can reprocess only the steps you want to reprocess.

Not all steps need access to all the data though. Some of the subsamplers only use the index (e.g. the frame subsampler), only audio data (e.g. the whisper subsampler), or only image data (e.g. the caption subsampler). Since Fondant only reads the data that a component needs, this reduces the cost of writing & reading the data between components.

RobbeSneyders moved this from Breakdown to Backlog in Fondant development on Nov 6, 2023
RobbeSneyders added the Infrastructure (Infrastructure and deployment) label on Dec 18, 2023