From 6a0387f0394fe6555c40098d317403d06e8f589f Mon Sep 17 00:00:00 2001 From: ErikOsinga Date: Thu, 9 May 2024 12:34:53 -0400 Subject: [PATCH 1/4] fix: launch jobs using Pool Fixes deadlock bug that arose from Prefect+Docker+process_map not working together (Issue #130). By setting verbose=False, jobs are now launched using Pool, setting start method to spawn --- RMtools_3D/do_fitIcube.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/RMtools_3D/do_fitIcube.py b/RMtools_3D/do_fitIcube.py index 6cb753c..c8466b9 100755 --- a/RMtools_3D/do_fitIcube.py +++ b/RMtools_3D/do_fitIcube.py @@ -43,6 +43,7 @@ import astropy.io.fits as pf import numpy as np from tqdm.contrib.concurrent import process_map +import multiprocessing as mp from RMtools_3D.do_RMsynth_3D import readFitsCube from RMtools_3D.make_freq_file import get_freq_array @@ -530,16 +531,23 @@ def make_model_I( nDetectPix=nDetectPix, verbose=verbose, ) + # Send each spectrum to a different core - results = process_map( - func, - srcData, - max_workers=num_cores, - chunksize=chunk_size, - disable=not verbose, - desc="Fitting spectra", - total=nDetectPix, - ) + if verbose: # Note that 'verbose' is not compatible with Prefect + results = process_map( + func, + srcData, + max_workers=num_cores, + chunksize=chunk_size, + disable=not verbose, + desc="Fitting spectra", + total=nDetectPix, + ) + else: + mp.set_start_method("spawn") + args_list = [d for d in srcData] + with mp.Pool(processes=num_cores) as pool: + results = pool.map(func, args_list) headcoeff = strip_fits_dims(header=header, minDim=2) del headcoeff["BUNIT"] From 8364e34b75ef42904ae6af35f5355c8905ea4705 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 9 May 2024 16:37:03 +0000 Subject: [PATCH 2/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- RMtools_3D/do_fitIcube.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/RMtools_3D/do_fitIcube.py b/RMtools_3D/do_fitIcube.py index c8466b9..df58ade 100755 --- a/RMtools_3D/do_fitIcube.py +++ b/RMtools_3D/do_fitIcube.py @@ -35,6 +35,7 @@ # =============================================================================# import argparse +import multiprocessing as mp import os import sys import time @@ -43,7 +44,6 @@ import astropy.io.fits as pf import numpy as np from tqdm.contrib.concurrent import process_map -import multiprocessing as mp from RMtools_3D.do_RMsynth_3D import readFitsCube from RMtools_3D.make_freq_file import get_freq_array @@ -533,7 +533,7 @@ def make_model_I( ) # Send each spectrum to a different core - if verbose: # Note that 'verbose' is not compatible with Prefect + if verbose: # Note that 'verbose' is not compatible with Prefect results = process_map( func, srcData, From 8fe9513632dfc1103f37a0f59de468330c9c3a22 Mon Sep 17 00:00:00 2001 From: ErikOsinga Date: Tue, 16 Jul 2024 12:43:42 -0400 Subject: [PATCH 3/4] fix able to run from command-line and add stokesI fit test script --- RMtools_3D/do_fitIcube.py | 6 +- tests/test_stokesIfit.py | 167 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 tests/test_stokesIfit.py diff --git a/RMtools_3D/do_fitIcube.py b/RMtools_3D/do_fitIcube.py index df58ade..33741f7 100755 --- a/RMtools_3D/do_fitIcube.py +++ b/RMtools_3D/do_fitIcube.py @@ -35,7 +35,6 @@ # =============================================================================# import argparse -import multiprocessing as mp import os import sys import time @@ -44,6 +43,7 @@ import astropy.io.fits as pf import numpy as np from tqdm.contrib.concurrent import process_map +import multiprocessing as mp from RMtools_3D.do_RMsynth_3D import readFitsCube from RMtools_3D.make_freq_file import get_freq_array @@ -533,7 +533,7 @@ def make_model_I( ) # Send each spectrum to a different core - if verbose: # Note that 'verbose' is not compatible with Prefect + if verbose: # Note that 'verbose' is not compatible with Prefect results = process_map( func, srcData, @@ -544,7 +544,7 @@ def make_model_I( total=nDetectPix, ) else: - mp.set_start_method("spawn") + mp.set_start_method("spawn", force=True) args_list = [d for d in srcData] with mp.Pool(processes=num_cores) as pool: results = pool.map(func, args_list) diff --git a/tests/test_stokesIfit.py b/tests/test_stokesIfit.py new file mode 100644 index 0000000..64b7efd --- /dev/null +++ b/tests/test_stokesIfit.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the Stokes I fitting in RMtools_3D""" + +import os +import numpy as np +import logging +from astropy.io import fits + +from RMtools_3D.make_freq_file import get_freq_array +# rmtools_fitIcube +from RMtools_3D.do_fitIcube import open_datacube, make_model_I + +# import RMtools_3D +# print(f"Using version {RMtools_3D.__file__}") + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +def make_fake_StokesIcube(filename='random_Icube.fits'): + # Create random data cube, 144 channels 102,102 pixels + data = np.random.rand(144, 1, 102, 102).astype(np.float32) + + # Create FITS header + header = fits.Header() + header['SIMPLE'] = True + header['BITPIX'] = -32 + header['NAXIS'] = 4 + header['NAXIS1'] = 102 + header['NAXIS2'] = 102 + header['NAXIS3'] = 1 + header['NAXIS4'] = 144 + header['WCSAXES'] = 4 + header['CRPIX1'] = -139869.0021857 + header['CRPIX2'] = -94562.00147332 + header['CRPIX3'] = 1.0 + header['CRPIX4'] = 1.0 + header['PC1_1'] = 0.7071067811865 + header['PC1_2'] = 0.7071067811865 + header['PC2_1'] = -0.7071067811865 + header['PC2_2'] = 0.7071067811865 + header['CDELT1'] = -0.0009710633743375 + header['CDELT2'] = 0.0009710633743375 + header['CDELT3'] = 1.0 + header['CDELT4'] = 1000000.0 + header['CUNIT1'] = 'deg' + header['CUNIT2'] = 'deg' + header['CUNIT4'] = 'Hz' + header['CTYPE1'] = 'RA---HPX' + header['CTYPE2'] = 'DEC--HPX' + header['CTYPE3'] = 'STOKES' + header['CTYPE4'] = 'FREQ' + header['CRVAL1'] = 0.0 + header['CRVAL2'] = 0.0 + header['CRVAL3'] = 1.0 + header['CRVAL4'] = 1295990740.741 + header['PV2_1'] = 4.0 + header['PV2_2'] = 3.0 + header['LONPOLE'] = 0.0 + header['LATPOLE'] = 90.0 + header['RESTFRQ'] = 1420405751.786 + header['RADESYS'] = 'FK5' + header['EQUINOX'] = 2000.0 + header['SPECSYS'] = 'TOPOCENT' + header['BMAJ'] = 0.005555555555556 + header['BMIN'] = 0.005555555555556 + header['BPA'] = 0.0 + header['BUNIT'] = 'JY/BEAM' + header['HISTORY'] = 'RANDOM FITS FILE FOR TESTING' + + # Create PrimaryHDU object + hdu = fits.PrimaryHDU(data=data, header=header) + # Write the FITS file + hdu.writeto(filename, overwrite=True) + print(f'Random FITS cube created: {filename}') + + return filename + + +def cleanup(outDir, prefixOut, polyOrd): + """ + Cleanup files that are made by make_model_I + """ + os.system("rm random_Icube.fits") + + for i in range(np.abs(polyOrd) + 1): + outname = os.path.join(outDir, prefixOut + "coeff" + str(i) + ".fits") + os.system(f"rm {outname}") + + outname = os.path.join(outDir, prefixOut + "coeff" + str(i) + "err.fits") + os.system(f"rm {outname}") + + MaskfitsFile = os.path.join(outDir, prefixOut + "mask.fits") + os.system(f"rm {MaskfitsFile}") + + fitsModelFile = os.path.join(outDir, prefixOut + "model.i.fits") + os.system(f"rm {fitsModelFile}") + + noisefile = os.path.join(outDir, prefixOut + "noise.dat") + os.system(f"rm {noisefile}") + + outname = os.path.join(outDir, prefixOut + "reffreq.fits") + os.system(f"rm {outname}") + + outname = os.path.join(outDir, prefixOut + "covariance.fits") + os.system(f"rm {outname}") + + +def test_stokesIfit_with_without_verbose(): + """ + Testing RMtools_3D/do_fitIcube.py with and without verbose + """ + I_filename = make_fake_StokesIcube() + + datacube, headI = open_datacube(fitsI=I_filename, verbose=False) + # Deriving frequencies from the fits header.") + freqArr_Hz = get_freq_array(I_filename) + + prefixOut = "" + outDir = "./" + polyOrd=2 + + logger.info("Running make_RMtools_3D.do_fitIcube.make_model_I with verbose=True") + # Run polynomial fitting on the spectra with verbose=T + make_model_I( + datacube=datacube, + header=headI, + freqArr_Hz=freqArr_Hz, + polyOrd=polyOrd, + prefixOut=prefixOut, + outDir=outDir, + nBits=32, + threshold=-3, + apply_mask=False, + num_cores=2, + chunk_size=1, + verbose=True, + fit_function="log", + ) + logger.info("Finished succesfully") + + logger.info("Running make_RMtools_3D.do_fitIcube.make_model_I with verbose=False") + # Run polynomial fitting on the spectra with verbose=F + make_model_I( + datacube=datacube, + header=headI, + freqArr_Hz=freqArr_Hz, + polyOrd=polyOrd, + prefixOut=prefixOut, + outDir=outDir, + nBits=32, + threshold=-3, + apply_mask=False, + num_cores=2, + chunk_size=1, + verbose=False, + fit_function="log", + ) + logger.info("Fitting finished succesfully") + logger.info("Removing output files...") + cleanup(outDir, prefixOut, polyOrd) + + logger.info("Stokes I fitting test finished succesfully") + +if __name__ == "__main__": + test_stokesIfit_with_without_verbose() \ No newline at end of file From da7a2846fc17e136e493ade33655c57e9e60bced Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 16 Jul 2024 16:45:29 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- RMtools_3D/do_fitIcube.py | 4 +- tests/test_stokesIfit.py | 111 +++++++++++++++++++------------------- 2 files changed, 59 insertions(+), 56 deletions(-) diff --git a/RMtools_3D/do_fitIcube.py b/RMtools_3D/do_fitIcube.py index 33741f7..c0e88d0 100755 --- a/RMtools_3D/do_fitIcube.py +++ b/RMtools_3D/do_fitIcube.py @@ -35,6 +35,7 @@ # =============================================================================# import argparse +import multiprocessing as mp import os import sys import time @@ -43,7 +44,6 @@ import astropy.io.fits as pf import numpy as np from tqdm.contrib.concurrent import process_map -import multiprocessing as mp from RMtools_3D.do_RMsynth_3D import readFitsCube from RMtools_3D.make_freq_file import get_freq_array @@ -533,7 +533,7 @@ def make_model_I( ) # Send each spectrum to a different core - if verbose: # Note that 'verbose' is not compatible with Prefect + if verbose: # Note that 'verbose' is not compatible with Prefect results = process_map( func, srcData, diff --git a/tests/test_stokesIfit.py b/tests/test_stokesIfit.py index 64b7efd..0c9f927 100644 --- a/tests/test_stokesIfit.py +++ b/tests/test_stokesIfit.py @@ -2,14 +2,15 @@ # -*- coding: utf-8 -*- """Tests for the Stokes I fitting in RMtools_3D""" +import logging import os + import numpy as np -import logging from astropy.io import fits -from RMtools_3D.make_freq_file import get_freq_array # rmtools_fitIcube -from RMtools_3D.do_fitIcube import open_datacube, make_model_I +from RMtools_3D.do_fitIcube import make_model_I, open_datacube +from RMtools_3D.make_freq_file import get_freq_array # import RMtools_3D # print(f"Using version {RMtools_3D.__file__}") @@ -18,62 +19,63 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -def make_fake_StokesIcube(filename='random_Icube.fits'): + +def make_fake_StokesIcube(filename="random_Icube.fits"): # Create random data cube, 144 channels 102,102 pixels data = np.random.rand(144, 1, 102, 102).astype(np.float32) # Create FITS header header = fits.Header() - header['SIMPLE'] = True - header['BITPIX'] = -32 - header['NAXIS'] = 4 - header['NAXIS1'] = 102 - header['NAXIS2'] = 102 - header['NAXIS3'] = 1 - header['NAXIS4'] = 144 - header['WCSAXES'] = 4 - header['CRPIX1'] = -139869.0021857 - header['CRPIX2'] = -94562.00147332 - header['CRPIX3'] = 1.0 - header['CRPIX4'] = 1.0 - header['PC1_1'] = 0.7071067811865 - header['PC1_2'] = 0.7071067811865 - header['PC2_1'] = -0.7071067811865 - header['PC2_2'] = 0.7071067811865 - header['CDELT1'] = -0.0009710633743375 - header['CDELT2'] = 0.0009710633743375 - header['CDELT3'] = 1.0 - header['CDELT4'] = 1000000.0 - header['CUNIT1'] = 'deg' - header['CUNIT2'] = 'deg' - header['CUNIT4'] = 'Hz' - header['CTYPE1'] = 'RA---HPX' - header['CTYPE2'] = 'DEC--HPX' - header['CTYPE3'] = 'STOKES' - header['CTYPE4'] = 'FREQ' - header['CRVAL1'] = 0.0 - header['CRVAL2'] = 0.0 - header['CRVAL3'] = 1.0 - header['CRVAL4'] = 1295990740.741 - header['PV2_1'] = 4.0 - header['PV2_2'] = 3.0 - header['LONPOLE'] = 0.0 - header['LATPOLE'] = 90.0 - header['RESTFRQ'] = 1420405751.786 - header['RADESYS'] = 'FK5' - header['EQUINOX'] = 2000.0 - header['SPECSYS'] = 'TOPOCENT' - header['BMAJ'] = 0.005555555555556 - header['BMIN'] = 0.005555555555556 - header['BPA'] = 0.0 - header['BUNIT'] = 'JY/BEAM' - header['HISTORY'] = 'RANDOM FITS FILE FOR TESTING' + header["SIMPLE"] = True + header["BITPIX"] = -32 + header["NAXIS"] = 4 + header["NAXIS1"] = 102 + header["NAXIS2"] = 102 + header["NAXIS3"] = 1 + header["NAXIS4"] = 144 + header["WCSAXES"] = 4 + header["CRPIX1"] = -139869.0021857 + header["CRPIX2"] = -94562.00147332 + header["CRPIX3"] = 1.0 + header["CRPIX4"] = 1.0 + header["PC1_1"] = 0.7071067811865 + header["PC1_2"] = 0.7071067811865 + header["PC2_1"] = -0.7071067811865 + header["PC2_2"] = 0.7071067811865 + header["CDELT1"] = -0.0009710633743375 + header["CDELT2"] = 0.0009710633743375 + header["CDELT3"] = 1.0 + header["CDELT4"] = 1000000.0 + header["CUNIT1"] = "deg" + header["CUNIT2"] = "deg" + header["CUNIT4"] = "Hz" + header["CTYPE1"] = "RA---HPX" + header["CTYPE2"] = "DEC--HPX" + header["CTYPE3"] = "STOKES" + header["CTYPE4"] = "FREQ" + header["CRVAL1"] = 0.0 + header["CRVAL2"] = 0.0 + header["CRVAL3"] = 1.0 + header["CRVAL4"] = 1295990740.741 + header["PV2_1"] = 4.0 + header["PV2_2"] = 3.0 + header["LONPOLE"] = 0.0 + header["LATPOLE"] = 90.0 + header["RESTFRQ"] = 1420405751.786 + header["RADESYS"] = "FK5" + header["EQUINOX"] = 2000.0 + header["SPECSYS"] = "TOPOCENT" + header["BMAJ"] = 0.005555555555556 + header["BMIN"] = 0.005555555555556 + header["BPA"] = 0.0 + header["BUNIT"] = "JY/BEAM" + header["HISTORY"] = "RANDOM FITS FILE FOR TESTING" # Create PrimaryHDU object hdu = fits.PrimaryHDU(data=data, header=header) # Write the FITS file hdu.writeto(filename, overwrite=True) - print(f'Random FITS cube created: {filename}') + print(f"Random FITS cube created: {filename}") return filename @@ -90,7 +92,7 @@ def cleanup(outDir, prefixOut, polyOrd): outname = os.path.join(outDir, prefixOut + "coeff" + str(i) + "err.fits") os.system(f"rm {outname}") - + MaskfitsFile = os.path.join(outDir, prefixOut + "mask.fits") os.system(f"rm {MaskfitsFile}") @@ -102,7 +104,7 @@ def cleanup(outDir, prefixOut, polyOrd): outname = os.path.join(outDir, prefixOut + "reffreq.fits") os.system(f"rm {outname}") - + outname = os.path.join(outDir, prefixOut + "covariance.fits") os.system(f"rm {outname}") @@ -111,7 +113,7 @@ def test_stokesIfit_with_without_verbose(): """ Testing RMtools_3D/do_fitIcube.py with and without verbose """ - I_filename = make_fake_StokesIcube() + I_filename = make_fake_StokesIcube() datacube, headI = open_datacube(fitsI=I_filename, verbose=False) # Deriving frequencies from the fits header.") @@ -119,7 +121,7 @@ def test_stokesIfit_with_without_verbose(): prefixOut = "" outDir = "./" - polyOrd=2 + polyOrd = 2 logger.info("Running make_RMtools_3D.do_fitIcube.make_model_I with verbose=True") # Run polynomial fitting on the spectra with verbose=T @@ -163,5 +165,6 @@ def test_stokesIfit_with_without_verbose(): logger.info("Stokes I fitting test finished succesfully") + if __name__ == "__main__": - test_stokesIfit_with_without_verbose() \ No newline at end of file + test_stokesIfit_with_without_verbose()