Skip to content

Commit

Permalink
Add wait_for_slurm
Browse files Browse the repository at this point in the history
  • Loading branch information
GianlucaFicarelli committed Dec 19, 2023
1 parent a21e55c commit f97b90d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
12 changes: 7 additions & 5 deletions custom_analyses/src/a02/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from bluepysnap.frame_report import FrameReport
from bluepysnap.simulation import Simulation
from bluepysnap.spike_report import SpikeReport
from common.utils import L, run_analysis
from common.utils import L, clean_slurm_env, run_analysis, wait_for_slurm

from blueetl.campaign.config import SimulationCampaign

Expand Down Expand Up @@ -79,13 +79,14 @@ def _plot(index: int, path: str, conditions: dict, analysis_config: dict) -> tup
def main(analysis_config: dict) -> dict:
campaign = SimulationCampaign.load(analysis_config["simulation_campaign"])
slurm_args = {**DEFAULT_SLURM_ARGS, **analysis_config.get("slurm_args", {})}
clean_slurm_env()

log_folder = "log_test/%j"
executor = submitit.AutoExecutor(folder=log_folder)
executor.update_parameters(**slurm_args)
L.info("Using %s executor.", executor.cluster)
L.info("Using %s executor", executor.cluster)

# submit all jobs at once in a Slurm job array
# submit all the jobs at once in a Slurm job array
with executor.batch():
jobs = [
executor.submit(
Expand All @@ -97,9 +98,10 @@ def main(analysis_config: dict) -> dict:
)
for sim in campaign
]
L.info("Number of jobs %s", len(jobs))
L.info("Waiting for slurm to be ready...")
wait_for_slurm()

# process the results
L.info("Waiting for %s jobs to complete...", len(jobs))
outputs = []
for job in jobs:
sim_index, output_path = job.result()
Expand Down
14 changes: 13 additions & 1 deletion custom_analyses/src/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import sys
import time
from collections.abc import Callable
from pathlib import Path

Expand Down Expand Up @@ -49,7 +50,6 @@ def wrapper(
log_level: str | int = logging.INFO,
) -> dict:
"""Call the wrapped function, and write the result to file."""
clean_slurm_env()
setup_logging(log_format=log_format, log_level=log_level)
result = func(analysis_config)
if analysis_output:
Expand Down Expand Up @@ -78,3 +78,15 @@ def clean_slurm_env():
if key.startswith(("PMI_", "SLURM_")) and not key.endswith(("_ACCOUNT", "_PARTITION")):
L.debug("Deleting env variable %s", key)
del os.environ[key]


def wait_for_slurm():
    """Wait for some time to allow sacct to return the correct status of the submitted jobs.

    This may be needed when the slurm ids have been reset, and re-used.
    See https://github.com/facebookincubator/submitit/issues/1660.

    The delay, in seconds, is read from the env variable SUBMIT_JOBS_INITIAL_SLEEP
    (default: 10).

    Raises:
        ValueError: if SUBMIT_JOBS_INITIAL_SLEEP is not a number, or is negative or NaN.
    """
    raw_value = os.getenv("SUBMIT_JOBS_INITIAL_SLEEP", "10")
    try:
        initial_sleep = float(raw_value)
    except ValueError as err:
        # re-raise with a message naming the variable, so a misconfiguration is easy to spot
        raise ValueError(
            f"SUBMIT_JOBS_INITIAL_SLEEP must be a number of seconds, got {raw_value!r}"
        ) from err
    # written as "not >= 0" so that NaN is rejected together with negative values
    if not initial_sleep >= 0:
        raise ValueError(
            f"SUBMIT_JOBS_INITIAL_SLEEP must be non-negative, got {raw_value!r}"
        )
    L.debug("SUBMIT_JOBS_INITIAL_SLEEP=%s", initial_sleep)
    time.sleep(initial_sleep)

0 comments on commit f97b90d

Please sign in to comment.