datatrove fails to handle tasks >1k with slurm job arrays #238

Open
stas00 opened this issue Jul 5, 2024 · 25 comments

@stas00

stas00 commented Jul 5, 2024

If I have more than 1k tasks, datatrove splits them into multiple job arrays of 1k each.

The first job array of 1k runs fine; the subsequent ones all fail:

0: 2024-07-04 23:59:34.496 | ERROR    | datatrove.executor.base:_run_for_rank:108 - list index out of range
0: Traceback (most recent call last):
0: 
0:   File "/env/lib/conda/ctx-shared/bin/launch_pickled_pipeline", line 8, in <module>
0:     sys.exit(main())
0:     │   │    └ <function main at 0x7f4b5f9fd6c0>
0:     │   └ <built-in function exit>
0:     └ <module 'sys' (built-in)>
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/tools/launch_pickled_pipeline.py", line 18, in main
0:     executor.run()
0:     │        └ <function SlurmPipelineExecutor.run at 0x7f4b5ccee4d0>
0:     └ <datatrove.executor.slurm.SlurmPipelineExecutor object at 0x7f4b5ccb7c10>
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/executor/slurm.py", line 180, in run
0:     self._run_for_rank(rank)
0:     │    │             └ 7113
0:     │    └ <function PipelineExecutor._run_for_rank at 0x7f4b5ccedb40>
0:     └ <datatrove.executor.slurm.SlurmPipelineExecutor object at 0x7f4b5ccb7c10>
0: > File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/executor/base.py", line 96, in _run_for_rank
0:     deque(pipelined_data, maxlen=0)
0:     │     └ <generator object DiskWriter.run at 0x7f4a9570eab0>
0:     └ <class 'collections.deque'>
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/writers/disk_base.py", line 176, in run
0:     for document in data:
0:                     └ <generator object BaseFilter.run at 0x7f4a9570dcb0>
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/filters/base_filter.py", line 47, in run
0:     for doc in data:
0:                └ <generator object HuggingFaceDatasetReader.run at 0x7f4a9ad4edc0>
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/readers/huggingface.py", line 96, in run
0:     shard = self._get_dataset_shard(ds, rank, world_size)
0:             │    │                  │   │     └ 100000
0:             │    │                  │   └ 7113
0:             │    │                  └ IterableDataset({
0:             │    │                        features: ['text', 'id', 'dump', 'url', 'date', 'file_path', 'language', 'language_score', 'token_count...
0:             │    └ <function HuggingFaceDatasetReader._get_dataset_shard at 0x7f4b5ccef5b0>
0:             └ 📖 - READER: 🤗 HuggingFace
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/readers/huggingface.py", line 69, in _get_dataset_shard
0:     ex_iterable = dst._ex_iterable.shard_data_sources(rank, world_size)
0:                   │   │            │                  │     └ 100000
0:                   │   │            │                  └ 7113
0:                   │   │            └ <function ArrowExamplesIterable.shard_data_sources at 0x7f48750bcaf0>
0:                   │   └ <datasets.iterable_dataset.ArrowExamplesIterable object at 0x7f47a012ea10>
0:                   └ IterableDataset({
0:                         features: ['text', 'id', 'dump', 'url', 'date', 'file_path', 'language', 'language_score', 'token_count...
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datasets/iterable_dataset.py", line 298, in shard_data_sources
0:     requested_gen_kwargs = _merge_gen_kwargs([gen_kwargs_list[i] for i in shard_indices])
0:                            │                  │                           └ []
0:                            │                  └ [{'files': [<datasets.download.streaming_download_manager.FilesIterable object at 0x7f4b4dd99fc0>]}, {'files': [<datasets.dow...
0:                            └ <function _merge_gen_kwargs at 0x7f487508f880>
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datasets/utils/sharding.py", line 76, in _merge_gen_kwargs
0:     for key in gen_kwargs_list[0]
0:                └ []
0:
0: IndexError: list index out of range
0: Traceback (most recent call last):
0:   File "/env/lib/conda/ctx-shared/bin/launch_pickled_pipeline", line 8, in <module>
0:     sys.exit(main())
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/tools/launch_pickled_pipeline.py", line 18, in main
0:     executor.run()
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/executor/slurm.py", line 180, in run
0:     self._run_for_rank(rank)
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/executor/base.py", line 109, in _run_for_rank
0:     raise e
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/executor/base.py", line 96, in _run_for_rank
0:     deque(pipelined_data, maxlen=0)
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/writers/disk_base.py", line 176, in run
0:     for document in data:
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/filters/base_filter.py", line 47, in run
0:     for doc in data:
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/readers/huggingface.py", line 96, in run
0:     shard = self._get_dataset_shard(ds, rank, world_size)
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datatrove/pipeline/readers/huggingface.py", line 69, in _get_dataset_shard
0:     ex_iterable = dst._ex_iterable.shard_data_sources(rank, world_size)
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datasets/iterable_dataset.py", line 298, in shard_data_sources
0:     requested_gen_kwargs = _merge_gen_kwargs([gen_kwargs_list[i] for i in shard_indices])
0:   File "/env/lib/conda/ctx-shared/lib/python3.10/site-packages/datasets/utils/sharding.py", line 76, in _merge_gen_kwargs
0:     for key in gen_kwargs_list[0]
0: IndexError: list index out of range
srun: error: dojo-a3-ghpc-41: task 0: Exited with exit code 1

This failing behavior is consistent.

This is with datatrove@main - I can't use the official release as it doesn't support datasets streaming.

This is just doing a slightly modified FineWeb filter from the example.

@guipenedo
Collaborator

I think this is due to the number of available dataset shards/files (possibly lower than 7113, the rank in the logs). Can you share how you instantiated HuggingFaceDatasetReader? Was it something like this:

    HuggingFaceDatasetReader("HuggingFaceFW/fineweb", streaming=True, dataset_options={"split": "train"})

(it's fineweb, right?)

@stas00
Author

stas00 commented Jul 7, 2024

I did:

        HuggingFaceDatasetReader(
            dataset="HuggingFaceFW/fineweb",
            dataset_options=dict(name="CC-MAIN-2024-10", split="train"),
            streaming=True,  # to avoid the huge overhead of memory and time
            limit=10000,
            batch_size=1,
            doc_progress=True,
            text_key="text",
        ),

I suspect the problem comes from me trying to use 10k tasks: when this gets submitted to slurm it generates 10 job arrays with 1k jobs each. Is it possible that datatrove isn't aware of that and relies only on the job id to calculate the index, not realizing it also has to include the job array id in its indexing logic?

So with 10k tasks what we have here is SLURM with job ids as following:

11111_1
11111_2
...
11111_999
11112_1
11112_2
...
11112_999
11113_1
11113_2
...
11113_999
...

so if it just looks at m/_\d+/ of the job id, then only the first job array works.

I could be wrong of course and the cause of the issue is something else
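
To illustrate what I mean (purely a sketch - I don't know how datatrove actually derives the rank, and the offset env var name here is made up):

    import os

    # SLURM only exposes the within-array index (0..999), so to get a global rank
    # the launcher would have to add an offset identifying which 1k-array this is.
    array_offset = int(os.environ.get("DATATROVE_ARRAY_OFFSET", 0))  # hypothetical: 0, 1000, 2000, ...
    task_id = int(os.environ["SLURM_ARRAY_TASK_ID"])                 # 0..999 within the array
    rank = array_offset + task_id
    print(f"global rank: {rank}")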

@stas00
Author

stas00 commented Jul 7, 2024

Is there a way to pass tasks=-1 and have datatrove figure out how many tasks it needs? Assuming the dataset has __len__ - but then we're using it in streaming mode, where I don't think it knows the length.

@guipenedo
Collaborator

It should assign the correct rank; you can check the first few lines of the log files, they should say "rank=number" and it should be >1000 for the other job arrays.
The reason datatrove splits each array at 1001 is that default slurm configs have a maximum job array size of 1001 (the "MaxArraySize" option). If this is not the case for your cluster, you can pass a different value for max_array_size= on the slurm executor.
As to the issue itself, I'll look at it more closely tomorrow, but I think we only parallelize at the file level for the HF reader, so each task streams a specific file. 10k is probably bigger than the total number of files for that dump of FineWeb, but obviously it should just skip the extra tasks instead of crashing them, so I'll look into that bug.
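
For example, something along these lines (just a sketch - the paths, partition and writer are placeholders, adapt them to your actual pipeline):

    from datatrove.executor.slurm import SlurmPipelineExecutor
    from datatrove.pipeline.readers import HuggingFaceDatasetReader
    from datatrove.pipeline.writers import JsonlWriter

    executor = SlurmPipelineExecutor(
        pipeline=[
            HuggingFaceDatasetReader(
                dataset="HuggingFaceFW/fineweb",
                dataset_options=dict(name="CC-MAIN-2024-10", split="train"),
                streaming=True,
            ),
            JsonlWriter("/data/output"),               # placeholder output path
        ],
        tasks=10_000,
        workers=128,
        time="24:00:00",
        partition="a3mixed",                           # placeholder partition name
        logging_dir="/data/logs/slurm_processing",     # placeholder logging dir
        max_array_size=2000,                           # raise this if your cluster's MaxArraySize allows it
    )
    executor.run()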

@guipenedo
Collaborator

Is there a way to pass tasks=-1 and have datatrove figure out how many tasks it needs? Assuming the dataset has __len__ - but then we're using it in streaming mode, where I don't think it knows the length.

Not currently, as you can have multiple readers stacked one after the other, each with different file sizes, or some pipelines without readers at all. Your approach with 10k should work (extra tasks should just not do anything). For now, until we fix this bug, I recommend using the ParquetReader to stream and process FineWeb data - there's an example snippet for this on the FineWeb dataset page.
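
(If I remember the dataset page snippet correctly, it's roughly this - treat it as a sketch:)

    from datatrove.pipeline.readers import ParquetReader

    # stream one FineWeb dump straight from the Hub; limit is optional, useful for a quick test
    data_reader = ParquetReader("hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10", limit=1000)
    for document in data_reader():
        print(document.text[:200])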

@guipenedo
Collaborator

This should fix the bug/crash: e01bd0a
For the particular config you are using, since there are only 250 files available, tasks>250 will not do anything.

@stas00
Author

stas00 commented Jul 8, 2024

Thank you for working with me on this, @guipenedo - I'm currently testing your fix

The other weird thing about this splitting into 1k job arrays is that it won't start the next job array until it finishes the first one. I guess this is normal because of the job dependency. Just a peculiarity I noticed - if even one job is lagging behind, it won't scale back up to the full number of workers until that job is done.

So if one or more of the jobs crashes - how does it recover?

@guipenedo
Collaborator

The dependencies for these arrays are "afterany:" and not "afterok:", so if one crashes the next job array should still launch. If you want them to run concurrently you can set max_array_launch_parallel=True and then they won't wait for each other

@stas00
Author

stas00 commented Jul 8, 2024

for this particular config you are using, since there are only 250 files available, tasks>250 will not do anything

I'm not sure what you mean by only 250 files available. I'm passing:

        HuggingFaceDatasetReader(..., limit=1000)

so it should take a lot more tasks than 250.

Unless I don't understand the purpose of limit and it simply drops the rest of the data beyond limit?

I thought limit lets the user set how many samples to feed to each slurm job.

Otherwise, if you can't control that, how would you deal with time-limited slurm jobs? Say your job can't be longer than 24h and it'd take more than 24h to process one file - it'd then get cancelled by slurm and it'd be impossible to finish the data processing.

@stas00
Author

stas00 commented Jul 8, 2024

The dependencies for these arrays are "afterany:" and not "afterok:", so if one crashes the next job array should still launch. If you want them to run concurrently you can set max_array_launch_parallel=True and then they won't wait for each other

oh! that's very cool! Thank you! Shouldn't that be the default setting? It makes sense that the limitation imposed by slurm of only 1k jobs per job array should ideally be transparent to the user: if resources are available, they should all be utilized at all times.

@guipenedo
Collaborator

guipenedo commented Jul 8, 2024

for this particular config you are using, since there are only 250 files available, tasks>250 will not do anything

I'm not sure what you mean by only 250 files available. I'm passing:

        HuggingFaceDatasetReader(..., limit=1000)

so it should take a lot more tasks than 250.

Unless I don't understand the purpose of limit and it simply drops the rest of the data beyond limit?

I thought limit lets the user set how many samples to feed to each slurm job.

Otherwise, if you can't control that, how would you deal with time-limited slurm jobs? Say your job can't be longer than 24h and it'd take more than 24h to process one file - it'd then get cancelled by slurm and it'd be impossible to finish the data processing.

Yes, it drops the rest; we use it mostly for testing/debugging (a small limit just to check everything runs fine before scaling up).
I suppose in-file parallelism would fix this issue with the time limit, but we currently don't support it. Internally we try to have a lot of (small) files so that we can run many tasks without risking them crashing/timing out and wasting a lot of work.

For the HuggingFaceDatasetReader in particular you can actually get in-file parallelism to work if you don't use streaming (but then obviously it would take up some storage), as you can scale the total number of tasks as much as you want and it will split everything. For JsonlReader, ParquetReader, etc. it's still just a planned feature.
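
(Presumably the non-streaming case boils down to example-level sharding in 🤗 datasets - a rough sketch; note that it downloads the whole dump to the local cache:)

    from datasets import load_dataset

    # non-streaming: the dataset is materialized on disk, so it can be split by example index
    # rather than by file, which is why tasks > number_of_files can still all get work
    ds = load_dataset("HuggingFaceFW/fineweb", name="CC-MAIN-2024-10", split="train")
    shard = ds.shard(num_shards=10_000, index=7113, contiguous=True)
    print(len(shard))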

@guipenedo
Collaborator

The dependencies for these arrays are "afterany:" and not "afterok:", so if one crashes the next job array should still launch. If you want them to run concurrently you can set max_array_launch_parallel=True and then they won't wait for each other

oh! that's very cool! Thank you! Shouldn't that be the default setting? It makes sense that the limitation imposed by slurm of only 1k jobs per job array should ideally be transparent to the user: if resources are available, they should all be utilized at all times.

That's a good point, I might change the default

@stas00
Author

stas00 commented Jul 8, 2024

Re: examples of datatrove+fineweb - it'd be very useful to have more of those.

Incidentally, any idea why datatrove wasn't used for fineweb-edu and HF Trainer was used instead? https://github.com/huggingface/cosmopedia/tree/main/classification It appears that they somehow managed to run the whole thing in 24h: https://github.com/huggingface/cosmopedia/blob/main/classification/train_edu_bert.slurm#L11

@guipenedo
Collaborator

Re: examples of datatrove+fineweb - it'd be very useful to have more of those.

Incidentally, any idea why datatrove wasn't used for fineweb-edu and HF Trainer was used instead? https://github.com/huggingface/cosmopedia/tree/main/classification It appears that they somehow managed to run the whole thing in 24h: https://github.com/huggingface/cosmopedia/blob/main/classification/train_edu_bert.slurm#L11

I think the slurm file you linked is just to train the classifier, which only takes a relatively small number of annotated samples. Actually classifying all of FineWeb took quite a lot longer.

Datatrove could have been used, but this was originally the work of a separate team that wasn't too familiar with it.

@stas00
Author

stas00 commented Jul 8, 2024

oh, and the other thing I noticed: it creates one job array too many, whose dependency never gets satisfied

           JOBID PARTITION NAME                                    STATE       TIME TIME_LIM  NODES           START_TIME NODELIST(REASON)
   98217_[0%128] a3mixed   classifier_filter             PENDING       0:00 23:40:00      1                  N/A (DependencyNeverSatisfied)

so with 2k items it created 98215_[0-999%128], 98216_[0-999%128], 98217_[0%128]

To validate, I repeated with 1k items and again got one job array too many:

           JOBID PARTITION NAME                                                                                STATE       TIME TIME_LIM  NODES           START_TIME NODELIST(REASON)
100314_[128-999% a3mixed   classifier_filter                                                         PENDING       0:00 23:40:00      1                  N/A (JobArrayTaskLimit)
  100315_[0%128] a3mixed   classifier_filter                                                         PENDING       0:00 23:40:00      1                  N/A (Dependency)

and later it'll become DependencyNeverSatisfied and I will need to manually cancel it.

@guipenedo
Collaborator

oh, and the other thing I noticed: it creates one job array too many, whose dependency never gets satisfied

           JOBID PARTITION NAME                                    STATE       TIME TIME_LIM  NODES           START_TIME NODELIST(REASON)
   98217_[0%128] a3mixed   classifier_filter             PENDING       0:00 23:40:00      1                  N/A (DependencyNeverSatisfied)

so with 2k items it created 98215_[0-999%128], 98216_[0-999%128], 98217_[0%128]

To validate, I repeated with 1k items and again got one job array too many:

           JOBID PARTITION NAME                                                                                STATE       TIME TIME_LIM  NODES           START_TIME NODELIST(REASON)
100314_[128-999% a3mixed   classifier_filter                                                         PENDING       0:00 23:40:00      1                  N/A (JobArrayTaskLimit)
  100315_[0%128] a3mixed   classifier_filter                                                         PENDING       0:00 23:40:00      1                  N/A (Dependency)

and later it'll become DependencyNeverSatisfied and I will need to manually cancel it.

This one extra task takes all the individual job stats and merges them into a global stats.json file, showing how many documents were processed and dropped at each step.

This one actually has an afterok: dependency, so it only runs if all the actual jobs succeed. I often use it as a quick way to check whether my job is fully finished: if there's a DependencyNeverSatisfied job, then I have to rerun or fix something.

@stas00
Author

stas00 commented Jul 8, 2024

I think the slurm file you linked is to just train the classifier, which only takes a relatively small number of annotated samples. Actually classifying all of FineWeb took quite a lot longer.

oops, linked to the wrong file, the inference is here:
https://github.com/huggingface/cosmopedia/blob/main/classification/run_edu_bert.py

https://github.com/huggingface/cosmopedia/blob/main/classification/run_edu_bert.slurm#L13C1-L14C26

#SBATCH --time=7-00:00:00
#SBATCH --array=0-127%128

so it was running on a slurm env w/o effective time limits
and it wasn't doing any error handling - so if any job failed, that data was lost.

So I still would like to know - how does datatrove handle failed jobs?

Do you need to relaunch the same process a few times, and it'll recheck which jobs didn't complete and re-do them from scratch, without taking into account any partial completion?

Is there a way to have some finalizer script that reports to the operator whether all jobs were successful or whether some need to be re-run?

And of course it'd be silly to launch 1k jobs if only one failed and needs repeating, albeit most will exit immediately... so probably ok.

edit: I see you already replied partially to my questions in the previous comment of yours.

@stas00
Author

stas00 commented Jul 8, 2024

ok, so I launched again with no limit on that single shard, how do I interpret this part of the log?

4711it [12:06,  5.89it/s]
4768it [12:14,  7.48it/s]
4812it [12:22,  6.30it/s]
4861it [12:29,  7.12it/s]
4913it [12:37,  7.88it/s]
4961it [12:44,  6.13it/s]
5013it [12:51,  7.44it/s]
5062it [12:59,  6.89it/s]
5107it [13:06,  4.68it/s]
5154it [13:13,  6.16it/s]
5201it [13:21,  6.48it/s]

That is, how do I know the actual progress? It doesn't tell me how many iterations will be run, or whether the input file can be processed within 24h, since that's the longest job I can run and I don't want to lose data.

I'm using doc_progress=True,

Is this because of streaming? Should I not use streaming? It felt much slower w/o streaming when I first tried it out.

@stas00
Author

stas00 commented Jul 8, 2024

Also, this might be of interest to you:

0: 2024-07-08 23:22:45.541 | INFO     | datatrove.utils.logging:add_task_logger:58 - Launching pipeline for rank=0
0: 2024-07-08 23:22:45.541 | INFO     | datatrove.utils.logging:log_pipeline:90 - 
0: --- 🛠️ PIPELINE 🛠
0: 📖 - READER: 🤗 HuggingFace
0: 🔻 - FILTER:  Classifier Filter
0: 💽 - WRITER: 🐿 Jsonl
0: Using the latest cached version of the dataset since HuggingFaceFW/fineweb couldn't be found on the Hugging Face Hub
0: Found the latest cached dataset configuration 'CC-MAIN-2024-10' at /data/huggingface/datasets/HuggingFaceFW___fineweb/CC-MAIN-2024-10/0.0.0/922442327a589c50e417c98f934c7b62729017b6 (last modified on Wed Jul  3 05:29:49 2024).

It says it can't find HuggingFaceFW/fineweb - the setup is:

        HuggingFaceDatasetReader(
            dataset="HuggingFaceFW/fineweb",
            dataset_options=dict(name="CC-MAIN-2024-10", split="train"),

@guipenedo
Collaborator

oops, linked to the wrong file, the inference is here: https://github.com/huggingface/cosmopedia/blob/main/classification/run_edu_bert.py

https://github.com/huggingface/cosmopedia/blob/main/classification/run_edu_bert.slurm#L13C1-L14C26

#SBATCH --time=7-00:00:00
#SBATCH --array=0-127%128

so it was running on a slurm env w/o effective time limits and it wasn't doing any error handling - so if any job failed, that data was lost.

Yes, exactly - each individual task was processing 1/128th of a dump (a job array with 128 jobs, passing its position in the array as --shard), so basically 128 jobs per dump.

So I still would like to know - how does datatrove handle failed jobs?

Do you need to relaunch the same process a few times, and it'll recheck which jobs didn't complete and re-do them from scratch, without taking into account any partial completion?

Yes, it will only relaunch the incomplete tasks. We don't really have any "checkpointing", as it would depend a lot on when the output files were last flushed (it could often even corrupt the files if the final data before a task crashed was incomplete), and also because it keeps some other logic simpler in general.
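
(As an aside, a quick way to see which ranks a relaunch would redo could be a sketch like the one below - this assumes the executor marks finished ranks with one marker file per rank under <logging_dir>/completions/, so check your logging dir for the exact layout:)

    import os

    logging_dir = "/data/logs/slurm_processing"   # placeholder, use your executor's logging_dir
    total_tasks = 2000
    done = {int(name) for name in os.listdir(os.path.join(logging_dir, "completions"))}
    incomplete = [rank for rank in range(total_tasks) if rank not in done]
    print(f"{len(incomplete)} incomplete tasks, e.g. {incomplete[:10]}")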

Is there a way to have some finalizer script that reports to the operator whether all jobs were successful or whether some need to be re-run?

We have two commands that you can use to track job status:

  • failed_logs (path to a job's logging_dir) will show you the log files of failed tasks
  • jobs_status (path) is meant to be used on the parent folder of your log files; for example, if I am tokenizing things in /logs/tokenize/datasetX, running jobs_status /logs/tokenize will show how many tasks completed for each dataset

And of course it'd be silly to launch 1k jobs if only one failed and needs repeating, albeit most will exit immediately... so probably ok.

Yes, in this case datatrove will only launch 1 job.

ok, so I launched again with no limit on that single shard, how do I interpret this part of the log?

4711it [12:06,  5.89it/s]
4768it [12:14,  7.48it/s]
4812it [12:22,  6.30it/s]
4861it [12:29,  7.12it/s]
4913it [12:37,  7.88it/s]
4961it [12:44,  6.13it/s]
5013it [12:51,  7.44it/s]
5062it [12:59,  6.89it/s]
5107it [13:06,  4.68it/s]
5154it [13:13,  6.16it/s]
5201it [13:21,  6.48it/s]

That is, how do I know the actual progress? It doesn't tell me how many iterations will be run, or whether the input file can be processed within 24h, since that's the longest job I can run and I don't want to lose data.

I'm using doc_progress=True,

Is this because of streaming? Should I not use streaming? It felt much slower w/o streaming when I first tried it out.

We don't really track the total number of documents, as some formats don't have it (jsonl, etc.), even though in theory for a dataset on the Hub we could find a way to fetch it. When you're processing multiple files, file_progress will give you a time estimate since the total number of files is known (once you finish processing one of them), but currently there's nothing similar for docs.
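
(For the disk-based readers that would look roughly like this - a sketch:)

    from datatrove.pipeline.readers import ParquetReader

    reader = ParquetReader(
        "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10",
        file_progress=True,   # per-file progress bar -> a rough ETA once one file has finished
        doc_progress=True,    # per-document counter, with no known total (as explained above)
    )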

@stas00
Author

stas00 commented Jul 9, 2024

re: jobs_status - good to know

  1. can it be documented please?
  2. the output coloring is an issue again - the output is not readable, as it assumes a dark theme again.

[screenshot: terminal output colored for a dark theme, unreadable on a light background]

I tried all the tricks discussed in #180 but none of them work. I tried all 3: DATATROVE_COLORIZE_LOG_FILES=0, DATATROVE_COLORIZE_LOGS=0, LOGURU_COLORIZE=NO.

@stas00
Author

stas00 commented Jul 9, 2024

failed_logs is different from jobs_status - it seems to want the low-level subdir, i.e. logs/slurm_processing/ instead of logs, and it has the same coloring issue.

may I suggest adding a prefix to these utils? as in:

datatrove_jobs_status
datatrove_failed_logs

or dt_* if it's shorter.

otherwise the names are too generic and may collide with other pre-existing scripts/aliases.

@stas00
Author

stas00 commented Jul 11, 2024

@guipenedo, max_array_launch_parallel=True isn't doing what you proposed - it actually ends up running many more workers than configured.

so if I submit 2k tasks and I set 128 workers, it'd run 256 workers!

@stas00
Author

stas00 commented Jul 11, 2024

And another thing I can't figure out.

The 250-file job was fine, but then I ran the same on the CC-MAIN-2023-06 shard, which proved to be much bigger. So how do I know how many tasks I'd need? Is there no way for datatrove to figure that out dynamically?

And so I first did 1k tasks and noticed none of them finished early - they all had data to process - so I raised it to 2k and launched again, and it did another 1k tasks, hitting rank 1999. I said OK, there is even more than 2k to do, so I raised it to 3k and launched again...

Should there be as many tasks as shards or files?

python -c "import sys; from datasets import load_dataset; ds=load_dataset('HuggingFaceFW/fineweb', sys.argv[1])" CC-MAIN-2023-06

Resolving data files: 100%|██████████████████████████████████████████████████████████████████████| 23781/23781 [00:01<00:00, 12054.50it/s]
Resolving data files: 100%|██████████████████████████████████████████████████████████████████████████| 350/350 [00:00<00:00, 37389.05it/s]
Loading dataset shards: 100%|█████████████████████████████████████████████████████████████████████████| 2264/2264 [07:33<00:00,  4.99it/s]

so 2264 tasks? clearly it is not 350 shards
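
(One way to check how many files a dump has - and hence a sensible tasks value when streaming - might be something like this sketch, assuming the dumps live under data/<dump-name>/ in the repo:)

    from huggingface_hub import HfFileSystem

    fs = HfFileSystem()
    files = fs.glob("datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2023-06/*.parquet")
    print(len(files))   # number of parquet files in that dump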

@stas00
Author

stas00 commented Jul 12, 2024

it just keeps on going: rank=3129

cat /data/stas/classify/data/logs/slurm_processing/slurm_logs/118829_129.out
Starting data processing job fin_class-CC-MAIN-2023-06
+ export PYTHONUNBUFFERED=TRUE
+ PYTHONUNBUFFERED=TRUE
+ srun -l launch_pickled_pipeline /data/stas/classify/data/logs/slurm_processing/executor.pik
0: 2024-07-12 00:56:16.392 | INFO     | datatrove.utils.logging:add_task_logger:58 - Launching pipeline for rank=3129
0: 2024-07-12 00:56:16.392 | INFO     | datatrove.utils.logging:log_pipeline:90 -
0: --- 🛠️ PIPELINE 🛠
0: 📖 - READER: 🤗 HuggingFace
0: 🔻 - FILTER:  Classifier Filter
0: 💽 - WRITER: 🐿 Jsonl
37001it [02:47, 213.25it/s]

how do I know how many tasks I need?

I wonder if it's because I had to switch from streaming to cached and then it had more shards?

But even then it's more than what it said when loading the dataset - it's past 3k tasks now

Loading dataset shards: 100%| 2264/2264 [07:33<00:00,  4.99it/s]
