Multi-node parallelism on slurm clusters #290

Open
shizhediao opened this issue Sep 22, 2024 · 6 comments

@shizhediao
Contributor

shizhediao commented Sep 22, 2024

Hi,

Suppose I have a Slurm cluster with 100 nodes, each with 100 cores, and 10000 tasks to run.
This is my current code:

    dist_executor = SlurmPipelineExecutor(
        job_name="filter",
        pipeline=[
            JsonlReader(
                args.input_dataset,
                limit=args.data_limit,
                file_progress=True,
                doc_progress=True,
                text_key=args.text_key,
                glob_pattern="*.jsonl.zst",
            ),
            LanguageFilter(
                language_threshold=0.65, 
                languages=[Languages.english],
                exclusion_writer=JsonlWriter(
                    f"{LOCAL_PATH}/removed/1_non_english/",
                    output_filename="data/${rank}.jsonl",
                    compression=None
                    # folder structure: language/dump/file
                ),
            ),
            JsonlWriter(
                output_folder=f"{LOCAL_PATH}/{args.output_name}/intermediate_filtered",
                output_filename="${rank}.jsonl",
                compression=None
            ),
        ],
        tasks=args.n_tasks,
        workers=args.n_workers,
        time="20:00:00",
        partition=args.partition1,
        account=args.account,
        logging_dir=f"{LOCAL_LOGS_PATH}/intermediate_filtered",
        slurm_logs_folder=f"{LOCAL_LOGS_PATH}/intermediate_filtered_slurm_logs",
        randomize_start_duration=180,  # don't hit the bucket all at once with the list requests
        cpus_per_task=12,
        # qos="high",
        mem_per_cpu_gb=3,
    )

It looks like workers is the number of nodes rather than the number of CPU cores. Is my understanding correct?
If so, my 10000 tasks will be executed node by node. For example, node_1 will process task_1, then task_101, then task_201. This does not fully utilize the CPU cores of a single node. I would expect all of these tasks (task_1, task_101, task_201, ...) to be assigned to node_1 at the start and executed in parallel.

If I want to distribute my tasks across the nodes and fully utilize every core, what should I change in my code?

Thanks!
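
For reference, here is a rough capacity estimate for the numbers in this question. It is a minimal sketch, assuming Slurm is allowed to pack several tasks onto one node and that CPU count is the only limiting resource (memory is ignored). The helper names are hypothetical and not part of datatrove. It also assumes, without having confirmed it against the source, that workers caps how many tasks run at the same time rather than selecting nodes.

```python
import math


def tasks_per_node(cores_per_node: int, cpus_per_task: int) -> int:
    """Number of tasks that fit on one node if CPUs are the only constraint."""
    return cores_per_node // cpus_per_task


def estimate_waves(n_tasks, n_nodes, cores_per_node, cpus_per_task, workers=-1):
    """Return (concurrent_tasks, waves) for a simple CPU-only packing model.

    workers < 0 is treated as "no limit" (an assumption for this sketch).
    """
    capacity = n_nodes * tasks_per_node(cores_per_node, cpus_per_task)
    concurrent = capacity if workers < 0 else min(capacity, workers)
    if concurrent <= 0:
        raise ValueError("no task can run with these settings")
    waves = math.ceil(n_tasks / concurrent)
    return concurrent, waves


if __name__ == "__main__":
    # Settings from the snippet above: cpus_per_task=12 on 100-core nodes.
    print(estimate_waves(10_000, 100, 100, 12))
    # One CPU per task, which only makes sense for a single-threaded pipeline.
    print(estimate_waves(10_000, 100, 100, 1))
```

Under this model, cpus_per_task=12 allows at most 8 tasks per 100-core node, so the per-task CPU request is probably the first thing to check when the goal is one task per core.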

@ShayDuane

ShayDuane commented Sep 25, 2024

I am encountering an issue where, even though I specify nodelist=node41,node42 and allocate 2 nodes, the task ranks are not distributed across the nodes. Instead, each node appears to execute the same rank. Below are the logs:

+ PYTHONUNBUFFERED=TRUE
+ srun -l launch_pickled_pipeline /shared/home/dsq/test_log/executor.pik
0: 2024-09-25 16:54:30.605 | INFO     | vldata.utils.logging:add_task_logger:58 - Launching pipeline for rank=0 in node41
0: 2024-09-25 16:54:30.605 | INFO     | vldata.utils.logging:log_pipeline:90 - 
0: --- 🛠️ PIPELINE 🛠
0: 📖 - READER: 📷 MINT
0: 💽 - WRITER: 📷 MINTWriter
0: 2024-09-25 16:54:30.608 | INFO     | vldata.pipeline.readers.base:read_files_shard:193 - Reading input file part1/arXiv_src_2311_021_wds.tar, 1/1
1: 2024-09-25 16:54:35.178 | INFO     | vldata.utils.logging:add_task_logger:58 - Launching pipeline for rank=0 in node42
1: 2024-09-25 16:54:35.178 | INFO     | vldata.utils.logging:log_pipeline:90 - 
1: --- 🛠️ PIPELINE 🛠
1: 📖 - READER: 📷 MINT
1: 💽 - WRITER: 📷 MINTWriter
1: 2024-09-25 16:54:35.185 | INFO     | vldata.pipeline.readers.base:read_files_shard:193 - Reading input file part1/arXiv_src_2311_021_wds.tar, 1/1
0: 2024-09-25 16:58:20.410 | SUCCESS  | vldata.executor.base:_run_for_rank:98 - Processing done for rank=0
0: 2024-09-25 16:58:20.417 | INFO     | vldata.executor.base:_run_for_rank:104 - 
0: 
0: 📉📉📉 Stats: Task 0 📉📉📉

As a result, the work is not actually parallelized across nodes.
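
The log shows `srun -l` starting the same pickled pipeline once per node (the `0:` and `1:` prefixes), and both processes report rank=0. One way a launcher could give each process its own rank is to combine Slurm's per-process and per-array-task environment variables. The sketch below is only an illustration: `SLURM_PROCID`, `SLURM_NTASKS` and `SLURM_ARRAY_TASK_ID` are standard Slurm variables, but `global_rank` is a hypothetical helper, and this is not a claim about how vldata or datatrove actually computes ranks.

```python
import os
from typing import Mapping, Optional


def global_rank(env: Optional[Mapping[str, str]] = None) -> int:
    """Derive a unique rank for each process started by srun.

    Hypothetical helper: combines the job-array index with the
    process index inside the job step.
    """
    env = os.environ if env is None else env
    array_id = int(env.get("SLURM_ARRAY_TASK_ID", "0"))   # index within the job array
    procs_per_job = int(env.get("SLURM_NTASKS", "1"))     # processes launched per job
    proc_id = int(env.get("SLURM_PROCID", "0"))           # index of this process
    return array_id * procs_per_job + proc_id


if __name__ == "__main__":
    # Two processes in the same job step get different ranks.
    for proc in ("0", "1"):
        fake_env = {"SLURM_ARRAY_TASK_ID": "0", "SLURM_NTASKS": "2", "SLURM_PROCID": proc}
        print(proc, global_rank(fake_env))
```

If a launcher derived the rank from the array index alone, every process in the same job step would get the same value, which would be consistent with the duplicated rank=0 lines above.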

@guipenedo
Collaborator

Hi, do you know if your cluster is configured to always run 1 task per node? Ideally, different tasks should be able to share CPU resources on the same node rather than always taking a node exclusively.
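
One way to check this is to look at the partition and scheduler settings that `scontrol` reports. The sketch below parses that text for two settings that commonly lead to whole-node allocation: `OverSubscribe=EXCLUSIVE` on the partition and `SelectType=select/linear` in the scheduler config. The helper names are hypothetical and the sample strings are made up for illustration; other settings or job flags can also force exclusive nodes, so an empty result is not proof that nodes are shared.

```python
import re

# Matches "Key=Value" (scontrol show partition) and "Key   = Value" (scontrol show config).
_KEY_VALUE = re.compile(r"([A-Za-z_]\w*)[ \t]*=[ \t]*(\S+)")


def parse_settings(text: str) -> dict:
    """Collect key/value pairs from scontrol-style output."""
    return {key: value for key, value in _KEY_VALUE.findall(text)}


def whole_node_reasons(partition_text: str, config_text: str) -> list:
    """Return the settings (if any) that suggest jobs get whole nodes."""
    partition = parse_settings(partition_text)
    config = parse_settings(config_text)
    reasons = []
    if partition.get("OverSubscribe", "").upper() == "EXCLUSIVE":
        reasons.append("partition has OverSubscribe=EXCLUSIVE")
    if config.get("SelectType", "").lower() == "select/linear":
        reasons.append("scheduler uses SelectType=select/linear")
    return reasons


if __name__ == "__main__":
    # Illustrative input only, not taken from a real cluster.
    sample_partition = "PartitionName=cpu\n   AllowGroups=ALL Default=YES\n   OverSubscribe=EXCLUSIVE State=UP"
    sample_config = "SelectType              = select/linear"
    print(whole_node_reasons(sample_partition, sample_config))
```

On a real cluster the two inputs would come from `scontrol show partition <name>` and `scontrol show config`.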

@guipenedo
Collaborator

@ShayDuane, can you show the config you are using?

@ShayDuane

The config is here: #292
Thanks!

@shizhediao
Contributor Author

Hi,
Thank you for your reply! Do you know which parameter configures that? I am sure that I can run more than 1 task per node. Perhaps I am missing some config, so that the program runs only 1 task per node.

@shizhediao
Contributor Author

BTW, I found another way to do multi-node parallelism. I switched to LocalPipelineExecutor, following the "Multi-node parallelism" instructions here: https://github.com/huggingface/datatrove?tab=readme-ov-file#localpipelineexecutor

Do you think this is a good approach?
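
For context, the README section linked above describes splitting the total tasks across machines with the `local_tasks` and `local_rank_offset` arguments of LocalPipelineExecutor. Below is a minimal sketch of how each node could compute its own share, assuming a near-even split and assuming the node index comes from Slurm's `SLURM_NODEID` variable. `node_share` is a hypothetical helper, not part of datatrove, and the executor call is shown only as a comment.

```python
def node_share(total_tasks: int, n_nodes: int, node_index: int):
    """Return (local_tasks, local_rank_offset) for one node.

    Tasks are split as evenly as possible; the first
    (total_tasks % n_nodes) nodes each take one extra task.
    """
    if not 0 <= node_index < n_nodes:
        raise ValueError("node_index must be in [0, n_nodes)")
    base, extra = divmod(total_tasks, n_nodes)
    local_tasks = base + (1 if node_index < extra else 0)
    local_rank_offset = node_index * base + min(node_index, extra)
    return local_tasks, local_rank_offset


if __name__ == "__main__":
    # 10000 tasks over 100 nodes: every node takes a contiguous block of 100 ranks.
    for node in (0, 1, 99):
        print(node, node_share(10_000, 100, node))

    # Hypothetical use on each node (values passed to the arguments named in the README):
    #   local_tasks, local_rank_offset = node_share(10_000, 100, int(os.environ["SLURM_NODEID"]))
    #   LocalPipelineExecutor(pipeline=[...], tasks=10_000, workers=100,
    #                         local_tasks=local_tasks, local_rank_offset=local_rank_offset)
```

With this split, each node owns a disjoint range of ranks, so no two nodes should process the same task as long as every node is started with a different index.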
