diff --git a/bin/mpi-fastspecfit b/bin/mpi-fastspecfit index ba60e7e7..5f2ff15e 100755 --- a/bin/mpi-fastspecfit +++ b/bin/mpi-fastspecfit @@ -189,9 +189,12 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F #print(f'Rank={comm.rank}, subrank={subcomm.rank}, redrockfiles={redrockfiles}, ntargets={ntargets}') for redrockfile, outfile, ntarget in zip(redrockfiles, outfiles, ntargets): - if subcomm.rank == 0: - log.debug(f'Rank {rank} (subrank {subcomm.rank}) started ' + \ - f'at {time.asctime()}') + if subcomm: + if subcomm.rank == 0: + log.debug(f'Rank {rank} (subrank {subcomm.rank}) started ' + \ + f'at {time.asctime()}') + elif rank == 0: + log.debug(f'Rank {rank} started at {time.asctime()}') if args.makeqa: from fastspecfit.qa import fastqa as fast @@ -204,15 +207,24 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F cmd, cmdargs, logfile = build_cmdargs(args, redrockfile, outfile, sample=sample, fastphot=fastphot, input_redshifts=input_redshifts) - if subcomm.rank == 0: - log.info(f'Rank {rank} (nsubrank={subcomm.size}): ' + \ - f'ntargets={ntarget}: {cmd} {cmdargs}') + if subcomm: + if subcomm.rank == 0: + log.info(f'Rank {rank} (nsubrank={subcomm.size}): ' + \ + f'ntargets={ntarget}: {cmd} {cmdargs}') + elif rank == 0: + log.info(f'Rank {rank}: ntargets={ntarget}: {cmd} {cmdargs}') if args.dry_run: continue try: - if subcomm.rank == 0: + if subcomm: + if subcomm.rank == 0: + t1 = time.time() + outdir = os.path.dirname(logfile) + if not os.path.isdir(outdir): + os.makedirs(outdir, exist_ok=True) + elif rank == 0: t1 = time.time() outdir = os.path.dirname(logfile) if not os.path.isdir(outdir): @@ -225,19 +237,31 @@ def run_fastspecfit(args, comm=None, fastphot=False, specprod_dir=None, makeqa=F #log.info(f'rank={comm.rank} subrank={subcomm.rank}') err = fast(args=cmdargs.split(), comm=subcomm) - if subcomm.rank == 0: + if subcomm: + if subcomm.rank == 0: + dt1 = time.time() - t1 + log.info(f' rank {rank} done in {dt1:.2f} sec') + if err != 0: + if not os.path.exists(outfile): + log.warning(f' rank {rank} missing {outfile}') + raise IOError + elif rank == 0: dt1 = time.time() - t1 log.info(f' rank {rank} done in {dt1:.2f} sec') if err != 0: if not os.path.exists(outfile): log.warning(f' rank {rank} missing {outfile}') raise IOError + except: log.warning(f' rank {rank} raised an exception') import traceback traceback.print_exc() - if subcomm.rank == 0: + if subcomm: + if subcomm.rank == 0: + log.debug(f' rank {rank} is done') + elif rank == 0: log.debug(f' rank {rank} is done') if comm: @@ -322,11 +346,9 @@ def main(): else: try: from mpi4py import MPI - MPI.Init_thread(MPI.THREAD_SINGLE) # needed when profiling; no effect otherwise # https://docs.linaroforge.com/24.0.6/html/forge/map/python_profiling/profile_python_script.html - mpi4py.rc.threaded = False - #mpi4py.rc.thread_level = "funneled" + #MPI.Init_thread(MPI.THREAD_SINGLE) comm = MPI.COMM_WORLD except ImportError: comm = None