diff --git a/environment.yml b/environment.yml index 9279402ef..aabe85d5a 100644 --- a/environment.yml +++ b/environment.yml @@ -13,7 +13,7 @@ dependencies: - joblib==1.1.0 - libtool==2.4.6 - matplotlib==3.5.1 - - numba==0.54.1 + - numba>=0.57 - pandas==1.4.1 - pip: - cs_util==0.0.5 diff --git a/shapepipe/pipeline/file_io.py b/shapepipe/pipeline/file_io.py index a768f0c06..092a24578 100644 --- a/shapepipe/pipeline/file_io.py +++ b/shapepipe/pipeline/file_io.py @@ -2,7 +2,7 @@ This file contains methods for file I/O handling. -:Author: Marc Gentile and Axel Guinot +:Author: Marc Gentile and Axel Guinot and Lucie Baumont """ @@ -71,7 +71,7 @@ def format(self): """Format. Get the default input/output format of the catalogue - (e.g. Text, SExtractor, FITS) + (e.g. Text, SExtractor_ascii, FITS_LDAC, FITS) """ return self._format @@ -87,7 +87,7 @@ def get_nb_rows(self): Number of rows """ - raise BaseCatalogue.FeatureNotImplemented('get_nb_rows()') + raise BaseCatalogue.FeatureNotImplemented("get_nb_rows()") def get_nb_cols(self): """Get Number of Columns. @@ -111,7 +111,7 @@ def get_col_names(self): list list of column names """ - raise BaseCatalogue.FeatureNotImplemented('get_col_names()') + raise BaseCatalogue.FeatureNotImplemented("get_col_names()") def get_col_formats(self): """Get Column Formats. @@ -119,14 +119,10 @@ def get_col_formats(self): Get the list of column formats in the order of columns """ - raise BaseCatalogue.FeatureNotImplemented('get_col_names()') + raise BaseCatalogue.FeatureNotImplemented("get_col_names()") def add_col( - self, - col_name, - col_format=None, - col_comment=None, - col_data=None, + self, col_name, col_format=None, col_comment=None, col_data=None, ): """Add Column. @@ -144,7 +140,7 @@ def add_col( Column data as a numpy array """ - raise BaseCatalogue.FeatureNotImplemented('add_col()') + raise BaseCatalogue.FeatureNotImplemented("add_col()") def _file_exists(self, filepath): """File Exists. 
@@ -167,16 +163,16 @@ class InputFormat: Undefined = 0 TabulatedText = 1 - SExtractor = 2 + SExtractor_ascii = 2 FITS = 4 FITS_LDAC = 5 class OpenMode: """Supported input catalogue open modes.""" - ReadOnly = 'readonly' - ReadWrite = 'update' - Append = 'append' + ReadOnly = "readonly" + ReadWrite = "update" + Append = "append" class Column(object): """Column. @@ -194,7 +190,7 @@ def name(self): Get the name of the column """ - raise BaseCatalogue.FeatureNotImplemented('column.name') + raise BaseCatalogue.FeatureNotImplemented("column.name") @property def format(self): @@ -203,7 +199,7 @@ def format(self): Get the format of the column """ - raise BaseCatalogue.FeatureNotImplemented('column.format') + raise BaseCatalogue.FeatureNotImplemented("column.format") @property def data(self): @@ -212,7 +208,7 @@ def data(self): Get the data associated with the column """ - raise BaseCatalogue.FeatureNotImplemented('column.data') + raise BaseCatalogue.FeatureNotImplemented("column.data") def get_nb_rows(self): """Get Number of Rows. @@ -220,7 +216,7 @@ def get_nb_rows(self): Retrieve the number of rows of the column. """ - raise BaseCatalogue.FeatureNotImplemented('get_nb_rows()') + raise BaseCatalogue.FeatureNotImplemented("get_nb_rows()") def get_info(self): """Get Information. @@ -228,7 +224,7 @@ def get_info(self): Retrieve information about the column. """ - raise BaseCatalogue.FeatureNotImplemented('get_info()') + raise BaseCatalogue.FeatureNotImplemented("get_info()") def get_type(self): """Get Type. @@ -236,7 +232,7 @@ def get_type(self): Get the data type of the column """ - raise BaseCatalogue.FeatureNotImplemented('get_type()') + raise BaseCatalogue.FeatureNotImplemented("get_type()") class FeatureNotImplemented(NotImplementedError): """Feature Not Implemented. 
@@ -253,8 +249,8 @@ def __init__(self, msg): def __str__(self): return ( - f'File IO *** ERROR ***: Feature: {self._msg} is not ' - + 'implemented in this class' + f"File IO *** ERROR ***: Feature: {self._msg} is not " + + "implemented in this class" ) class catalogueNotOpen(Exception): @@ -274,8 +270,7 @@ def __init__(self, filepath): def __str__(self): return ( - f'File IO *** ERROR ***: catalogue: {self._filepath} ' - + 'is not open' + f"File IO *** ERROR ***: catalogue: {self._filepath} " + "is not open" ) class DataNotFound(Exception): @@ -298,8 +293,8 @@ def __init__(self, filepath, hdu): def __str__(self): return ( - f'File IO *** ERROR ***: File \'{self._filepath}\', ' - + f'hdu={self._hdu}: data not found' + f"File IO *** ERROR ***: File '{self._filepath}', " + + f"hdu={self._hdu}: data not found" ) class catalogueFileNotFound(Exception): @@ -319,7 +314,7 @@ def __init__(self, filepath): def __str__(self): """Set string representation of the exception object.""" - return f'File IO *** ERROR ***: file {self._filepath} no found' + return f"File IO *** ERROR ***: file {self._filepath} no found" class ColumnNotFound(Exception): """Column Not Found. @@ -338,7 +333,7 @@ def __init__(self, col_name): def __str__(self): """Set string representation of the exception object.""" - return f'File IO *** ERROR ***: column {self._col_name} no found' + return f"File IO *** ERROR ***: column {self._col_name} no found" class catalogueNotCreated(Exception): """Catalogue Not Created. 
@@ -357,8 +352,8 @@ def __init__(self, filepath): def __str__(self): return ( - f'File IO *** ERROR ***: catalogue: {self._filepath} could ' - + 'not be created' + f"File IO *** ERROR ***: catalogue: {self._filepath} could " + + "not be created" ) class OpenModeNotSupported(Exception): @@ -381,8 +376,8 @@ def __init__(self, filepath, open_mode): def __str__(self): return ( - f'File IO *** ERROR ***: catalogue: {self._filepath} ' - + 'Open Mode {self._open_mode} not supported' + f"File IO *** ERROR ***: catalogue: {self._filepath} " + + "Open Mode {self._open_mode} not supported" ) class OpenModeConflict(Exception): @@ -405,8 +400,8 @@ def __init__(self, open_mode, open_mode_needed): def __str__(self): return ( - 'File IO *** ERROR ***: catalogue has to be open as : ' - + f'{self._open_mode_needed} , Mode used : {self._open_mode}' + "File IO *** ERROR ***: catalogue has to be open as : " + + f"{self._open_mode_needed} , Mode used : {self._open_mode}" ) @@ -425,8 +420,8 @@ class FITSCatalogue(BaseCatalogue): File opening mode memmap : Bool Option to use memory mapping - SEx_catalogue : bool - Option to specify if the input is a SExtractor catalogue + fits_ldac : bool + Option to specify if the input is a FITS_LDAC catalogue """ @@ -436,7 +431,7 @@ def __init__( hdu_no=None, open_mode=BaseCatalogue.OpenMode.ReadOnly, memmap=False, - SEx_catalogue=False, + fits_ldac=False, ): BaseCatalogue.__init__(self, fullpath) @@ -445,12 +440,12 @@ def __init__( # opening mode (see FITSCatalogue.OpenMode) self._open_mode = open_mode - # Work with SExtractor fits format or not - self._SEx_catalogue = SEx_catalogue + # Work with SExtractor fits-ldac format or not + self._fits_ldac = fits_ldac # HDU number of the underlying .FITS table if hdu_no is None: # Default is 1 (or 2 if you are using ) - if SEx_catalogue: + if fits_ldac: self._hdu_no = 2 else: self._hdu_no = 1 @@ -461,9 +456,9 @@ def __init__( def __str__(self): if self._cat_data is not None: - info = f'{self.get_info()}' + 
info = f"{self.get_info()}" else: - info = 'No information' + info = "No information" return info @property @@ -527,10 +522,10 @@ def open(self): else: raise BaseCatalogue.catalogueFileNotFound(self.fullpath) - def create(self, ext_name=None, s_hdu=True, sex_cat_path=None): + def create(self, ext_name=None, s_hdu=True, ldac_header_path=None): """Create. - Create an empty catalogue in FITS format. + Create an empty catalogue in FITS format. For fits_ldac catalogs, one can inherit the ldac_imhead from an external catalog or create a blank one. Parameters ---------- @@ -538,40 +533,58 @@ def create(self, ext_name=None, s_hdu=True, sex_cat_path=None): Extension name or number s_hdu : bool If true add a secondary HDU - sex_cat_path : str - Path to SEXtractor catalogue + ldac_header_path : str + Path to SEXtractor fits-ldac catalogue from which we will "borrow" a ldac-imhead. A basic LDAC-IMHEAD will be created as default. """ primary_hdu = fits.PrimaryHDU() - if self._SEx_catalogue: - if sex_cat_path is not None: - if self._file_exists(sex_cat_path): - sex_cat = FITSCatalogue(sex_cat_path, hdu_no=1) - sex_cat.open() - secondary_hdu = sex_cat._cat_data[1] - self._cat_data = fits.HDUList([primary_hdu, secondary_hdu]) + if self._fits_ldac: + if ldac_header_path is not None: + if self._file_exists(ldac_header_path): + ldac_cat = FITSCatalogue(ldac_header_path, hdu_no=1) + ldac_cat.open() + imhead_hdu = ldac_cat._cat_data[1] + self._cat_data = fits.HDUList([primary_hdu, imhead_hdu]) self._cat_data.writeto(self.fullpath, overwrite=True) - sex_cat.close() - del sex_cat + ldac_cat.close() + del ldac_cat else: - raise BaseCatalogue.catalogueFileNotFound(sex_cat_path) - else: - raise ValueError( - 'sex_cat_path needs to be provided to create a ' - + 'SEXtractor catalogue' - ) + raise BaseCatalogue.catalogueFileNotFound(ldac_header_path) + else: + imhead_hdu = self._create_ldac_imhead(fits.PrimaryHDU.header()) + self._cat_data = fits.HDUList([primary_hdu, imhead_hdu]) + 
self._cat_data.writeto(self.fullpath, overwrite=True) + # NOTE(review): removed stray ldac_cat.close() — ldac_cat is undefined in this branch elif s_hdu: - secondary_hdu = fits.BinTableHDU( - data=None, - header=None, - name=ext_name, - ) + secondary_hdu = fits.BinTableHDU(data=None, header=None, name=ext_name,) self._cat_data = fits.HDUList([primary_hdu, secondary_hdu]) self._cat_data.writeto(self.fullpath, overwrite=True) else: self._cat_data = fits.HDUList([primary_hdu]) self._cat_data.writeto(self.fullpath, overwrite=True) + def _create_ldac_imhead(self, external_header): + """Creates ldac imhead. + + Creates an ldac imhead from an astropy header + + Parameters + ---------- + external_header : astropy.io.fits.header + astropy fits header + + Returns + ------- + astropy.io.fits.BinTableHDU + """ + tblhdr = np.array([external_header.tostring(',')]) + col1 = fits.Column(name='Field Header Card', array=tblhdr, format='13200A') + cols = fits.ColDefs([col1]) + tbl = fits.BinTableHDU.from_columns(cols) + tbl.header['TDIM1'] = '(80, {0})'.format(len(external_header)) + tbl.header['EXTNAME'] = 'LDAC_IMHEAD' + return tbl + def copy_hdu(self, fits_file=None, hdu_no=None, hdu_name=None): """Copy HDU. @@ -606,11 +619,7 @@ def copy_hdu(self, fits_file=None, hdu_no=None, hdu_name=None): ) def apply_mask( - self, - fits_file=None, - hdu_no=None, - mask=None, - hdu_name=None, + self, fits_file=None, hdu_no=None, mask=None, hdu_name=None, ): """Apply Mask.
@@ -639,9 +648,9 @@ def apply_mask( if fits_file._cat_data is None: raise BaseCatalogue.catalogueNotOpen(fits_file.fullpath) if mask is None: - raise ValueError('Mask not provided') + raise ValueError("Mask not provided") if type(mask) is not np.ndarray: - raise TypeError('Mask need to be a numpy.ndarray') + raise TypeError("Mask need to be a numpy.ndarray") if hdu_no is None: hdu_no = fits_file.hdu_no @@ -651,29 +660,166 @@ def apply_mask( if mask.dtype == bool: mask = np.where(mask is True) self._cat_data.append( - fits.BinTableHDU( - fits_file.get_data(hdu_no)[:][mask], - name=hdu_name, - ) + fits.BinTableHDU(fits_file.get_data(hdu_no)[:][mask], name=hdu_name,) ) elif mask.dtype == int: self._cat_data.append( - fits.BinTableHDU( - fits_file.get_data(hdu_no)[:][mask], - name=hdu_name, - ) + fits.BinTableHDU(fits_file.get_data(hdu_no)[:][mask], name=hdu_name,) + ) + else: + raise TypeError("Mask type must be of type int or bool") + + def _dict_to_astropy(self, data,header=None,ext_name=None): + """Dictionary to astropy. + + Coverts a dictionary to astropy hdus + + Parameters + ---------- + data : dict + Data to be stored + header : astropy.io.fits.header + External header + + Returns + ------- + astropy.io.fits.HDUList + """ + if type(data) is not dict: + raise TypeError("Data needs to be a dict") + + names = list(data.keys()) + it = list(range(len(names))) + if len(names) == 1: + data = np.array(data[names[0]]) + else: + data = [np.array(data[i]) for i in names] + + astropy_data=self._ndarray_to_astropy(data, it, header, names, ext_name) + + return astropy_data + + def _list_to_astropy(self, data, it, header=None, names=None, ext_name=None): + """list to astropy. 
+ + Converts a list to astropy hdus + + Parameters + ---------- + data : list + Data to be stored + header : astropy.io.fits.header + External header + names : List of column names + it : str + iterator + ext_name: str + Name of the HDU where data are stored + + Returns + ------- + astropy.io.fits.HDUList + """ + if names is None: + raise ValueError("Names not provided") + + it = range(len(names)) + data = np.asarray(data) + astropy_data = self._ndarray_to_astropy( + data, it, header, names, ext_name ) + return astropy_data + + def _ndarray_to_astropy( + self, data, it, header=None, names=None, ext_name=None + ): + """ndarray to astropy. + + Converts a ndarray to astropy hdus + + Parameters + ---------- + data : numpy.ndarray + Data to be stored + header : astropy.io.fits.header + External header + names : List of column names + it : str + iterator + ext_name: str + Name of the HDU where data are stored + + Returns + ------- + astropy.io.fits.HDUList + """ + # check that data is okay + if type(data) is not np.ndarray: + raise TypeError("Data needs to be a numpy.ndarray") + # use names in array if not otherwise specified + if names is None: + if data.dtype.names is not None: + names = data.dtype.names + it = names + else: + raise ValueError("Names not provided") else: - raise TypeError('Mask type must be of type int or bool') + it = range(len(names)) + # this step seems really dumb + if len(names) == 1: + data = np.array([data]) + # define columns + col_list = [] + for idx in it: + data_shape = data[idx].shape[1:] + dim = str(tuple(data_shape)) + name = names[it.index(idx)] + data_type = self._get_fits_col_type(data[idx]) + mem_size = 1 + if len(data_shape) != 0: + for shape in data_shape: + mem_size *= shape + data_format = f"{mem_size}{data_type}" + col_list.append( + fits.Column( + name=name, format=data_format, array=data[idx], dim=dim, + ) + ) + elif data_type == "A": + mem_size *= len(max(data[idx], key=len)) + data_format = f"{mem_size}{data_type}" + 
col_list.append( + fits.Column( + name=name, + format=data_format, + array=data[idx], + dim=str((mem_size,)), + ) + ) + else: + data_format = f"{mem_size}{data_type}" + col_list.append( + fits.Column(name=name, format=data_format, array=data[idx],) + ) + + astropy_hdu = fits.BinTableHDU.from_columns( + col_list, + header=header, + name=ext_name + ) + + return astropy_hdu + def save_as_fits( self, data=None, names=None, ext_name=None, - sex_cat_path=None, + fits_ldac=False, + ldac_cat_path=None, image=False, - image_header=None, + header=None, overwrite=False, ): """Save as FITS. @@ -689,17 +835,19 @@ def save_as_fits( Parameters ---------- - data : numpy.ndarray + data : numpy.ndarray, dict, recarray, fits.fitsrec.FITS_rec, list, astropy.table.Table Data to be stored names : list List of column names ext_name : str Name of the HDU where data are stored - sex_cat_path : str - Path of the existing SExtractor catalogue to mimic + fits_ldac : bool + True if output will be in fits_ldac format + ldac_cat_path : str + Path of existing SExtractor fits-ldac catalogue from which to copy imhead image : bool If true create a fits image - image_header : astropy.io.fits.header + header : astropy.io.fits.header Header to use when saving an image overwrite : bool Option to overwrite an existing image, only used when creating a @@ -707,8 +855,8 @@ def save_as_fits( Notes ----- - To create a SExtractor-like FITS file you need to specify - ``SEx_catalogue=True`` when declaring the FITSCatalogue object. + To create a SExtractor-like FITS-LDAC file you need to specify + ``fits_ldac=True`` when declaring the FITSCatalogue object. 
""" if self.open_mode != FITSCatalogue.OpenMode.ReadWrite: @@ -716,154 +864,90 @@ def save_as_fits( open_mode=self.open_mode, open_mode_needed=FITSCatalogue.OpenMode.ReadWrite, ) + # handle ldac header, if external, get it if data is None: - raise ValueError('Data not provided') - + raise ValueError("Data not provided") + # convert all data types to astropy if not image: if type(data) is dict: - names = list(data.keys()) - it = list(range(len(names))) - if len(names) == 1: - data = np.array(data[names[0]]) - else: - data = [np.array(data[i]) for i in names] - self._save_to_fits( - data, - names, - it, - ext_name, - sex_cat_path, - overwrite=overwrite, - ) + astropy_data = self._dict_to_astropy( + data,header=header, ext_name=ext_name + ) + self._save_to_fits(astropy_data,fits_ldac=fits_ldac,overwrite=overwrite) + + elif type(data) is list: + astropy_data=self._list_to_astropy( + data, header=header, names=names, ext_name=ext_name + ) + self._save_to_fits(astropy_data,fits_ldac=fits_ldac,overwrite=overwrite) elif type(data) is np.recarray: names = list(data.dtype.names) it = names + astropy_data=self._ndarray_to_astropy( + data, it, header=header,names=names, ext_name=ext_name + ) self._save_to_fits( - data, - names, - it, - ext_name, - sex_cat_path, - overwrite=overwrite, - ) + astropy_data,fits_ldac=fits_ldac,overwrite=overwrite + ) + elif type(data) is fits.fitsrec.FITS_rec: - self._save_from_recarray(data, ext_name, sex_cat_path) - + astropy_data = fits.BinTableHDU(data, name=ext_name) + self._save_to_fits(astropy_data,fits_ldac=fits_ldac,overwrite=overwrite) + elif type(data) is np.ndarray: - if names is None: - if data.dtype.names is not None: - names = data.dtype.names - it = names - else: - raise ValueError('Names not provided') - else: - it = range(len(names)) + astropy_data=self._ndarray_to_astropy( + data, it, header=header, names=names, ext_name=ext_name + ) self._save_to_fits( - data, - names, - it, - ext_name, - sex_cat_path, - overwrite=overwrite, - ) 
- - elif type(data) is list: - if names is None: - raise ValueError('Names not provided') - it = range(len(names)) - data = np.asarray(data) - self._save_to_fits( - data, - names, - it, - ext_name, - sex_cat_path, - overwrite=overwrite, - ) + astropy_data,fits_ldac=fits_ldac,overwrite=overwrite + ) + # FIX THIS elif type(data) is Table: if names is None: - raise ValueError('Names not provided') + raise ValueError("Names not provided") it = names self._save_to_fits( - data, - names, - it, - ext_name, - sex_cat_path, - overwrite=overwrite, + data, names, it, ext_name, ldac_cat_path, overwrite=overwrite, ) else: if type(data) is np.ndarray: self._save_image( - data=data, - header=image_header, - overwrite=overwrite, + data=data, header=header, overwrite=overwrite, ) else: - raise TypeError('Data need to be a numpy.ndarray') + raise TypeError("Data need to be a numpy.ndarray") + - def create_from_numpy( - self, - matrix, - col_names, - ext_name=None, - ext_ver=None, - header=None, - ): - """Create from Numpy. + def _header_from_dict( self, header_dict=None): + """Create astropy header from dictionary. Create a new catalogue from a two-dimensional numpy array. 
Parameters ---------- - matrix : numpy.ndarray - Two-dimensional numpy array - col_names : list - List of column names to use as the header - ext_name : str - Extension name or number - ext_ver : str - Extension version - header : list - List of dictionaries with keys: 'card', name', 'value', - 'value_orig', 'comment' - + header_dict : dict + dictionary that will be converted to astropy.io.fits.header + + Returns + ------- + astropy.io.fits.header """ - col_list = [] - for col_name in col_names: - icol = col_names.index(col_name) - col_type = self._get_fits_col_type(matrix[:, icol]) - col_data = fits.Column( - name=col_name, - format=col_type, - array=np.ravel(matrix[:, icol]), - ) - col_list.append(col_data) + fits_header = fits.Header() + + for (k, v) in header_dict.items(): + fits_header[k] = v - fits_header = None - if header is not None: - fits_header = fits.Header() - for (k, v) in header.items(): - fits_header[k] = v - - primary_hdu = fits.PrimaryHDU() - secondary_hdu = fits.BinTableHDU.from_columns( - col_list, - header=fits_header, - ) - if ext_name is not None: - secondary_hdu.name = ext_name - - self._cat_data = fits.HDUList(hdus=[primary_hdu, secondary_hdu]) - self._cat_data.writeto(self.fullpath, overwrite=True) + return fits_header def close(self): - """Close.""" + """Close. + Appends or overwrites data and prevents further writing. + """ if self._cat_data is not None: if self.open_mode == FITSCatalogue.OpenMode.ReadWrite: self.save() @@ -874,7 +958,9 @@ def get_data(self, hdu_no=None): raise BaseCatalogue.catalogueNotOpen(self.fullpath) def save(self): - """Save.""" + """Save. + Appends or overwrites data. + """ if self.open_mode == FITSCatalogue.OpenMode.ReadWrite: self._cat_data.flush() else: @@ -1130,6 +1216,37 @@ def get_data(self, hdu_no=None): else: raise BaseCatalogue.catalogueNotOpen(self.fullpath) + + def _fits_header_from_fits_LDAC(self, ldac_catalogue_path): + """Fits header from a fits-ldac catalog.
+ + Creates a fits header from a sextractor fits-LDAC field header. + + Parameters + ---------- + ldac_catalogue_path : str + Path to SEXtractor fits-ldac catalogue + + Returns + ------- + astropy.io.fits.Header + """ + # open file and get data + cat = fits.open(ldac_catalogue_path) + header_hdu_no = self.hdu_no - 1 + field_cards = cat[header_hdu_no].data + + # initialize empty header + header = fits.Header(cards=[]) + + for i in np.arange(len(field_cards["Field Header Card"][0])): + cardstring = field_cards["Field Header Card"][0][i] + card = fits.Card.fromstring(cardstring) + header.append(card) + + cat.close() + + return header def get_header(self, hdu_no=None): """Get Header. @@ -1139,12 +1256,12 @@ def get_header(self, hdu_no=None): Parameters ---------- hdu_no : int - HDU index + HDU index Returns ------- - astropy.io.fits.header - FITS header + dict + FITS header in dictionary format Notes ----- @@ -1154,7 +1271,12 @@ def get_header(self, hdu_no=None): if self._cat_data is not None: if hdu_no is None: hdu_no = self.hdu_no - return dict(self._cat_data[hdu_no].header.items()) + + if self._fits_ldac: + astropy_header = self._fits_header_from_fits_LDAC(self.fullpath) + return dict(astropy_header.items()) + else: + return dict(self._cat_data[hdu_no].header.items()) else: raise BaseCatalogue.catalogueNotOpen(self.fullpath) @@ -1178,16 +1300,16 @@ def get_header_value(self, request, hdu_no=None): """ if request is None: - raise ValueError('request not provided') + raise ValueError("request not provided") if type(request) is not str: - raise TypeError('request has to be a string') + raise TypeError("request has to be a string") if hdu_no is None: hdu_no = self._hdu_no header = self.get_header(hdu_no=hdu_no) if header is None: - raise ValueError(f'Empty header in the hdu : {hdu_no}') + raise ValueError(f"Empty header in the hdu : {hdu_no}") return interpreter(string=request, catalogue=header).result @@ -1222,42 +1344,14 @@ def add_header_card(self, key, value=None, 
comment=None, hdu_no=None): card = [] if key is None: - raise ValueError('key not provided') + raise ValueError("key not provided") else: card.append(key) - - if value is not None: card.append(value) - else: - if comment is not None: - card.append('') - - if comment is not None: card.append(comment) - card = tuple(card) - - self._cat_data[hdu_no].header.append(card, end=True) - - def get_headers(self): - """Get Headers. - - Return the catalogue header as a list of dictionaries. - - Returns - ------- - list - list of headers - - """ - headers = [] - try: - for hdu in self._cat_data: - headers.append(dict(hdu.header.items())) - except Exception: - pass - - return headers + card = tuple(card) + self._cat_data[hdu_no].header.append(card, end=True) def get_comments(self, hdu_no=None): """Get Comments. @@ -1305,13 +1399,9 @@ def get_col_comments(self, hdu_no=None): if hdu_no is None: hdu_no = self.hdu_no hdr_col_types = [ - tt for tt in self._cat_data[hdu_no].header.keys() - if 'TTYPE' in tt - ] - return [ - self._cat_data[hdu_no].header.comments[c] - for c in hdr_col_types + tt for tt in self._cat_data[hdu_no].header.keys() if "TTYPE" in tt ] + return [self._cat_data[hdu_no].header.comments[c] for c in hdr_col_types] else: raise BaseCatalogue.catalogueNotOpen(self.fullpath) @@ -1379,12 +1469,11 @@ def add_col( if open_mode != FITSCatalogue.OpenMode.ReadWrite: raise BaseCatalogue.OpenModeConflict( - open_mode=open_mode, - open_mode_needed=FITSCatalogue.OpenMode.ReadWrite, + open_mode=open_mode, open_mode_needed=FITSCatalogue.OpenMode.ReadWrite, ) if type(col_data) != np.ndarray: - TypeError('col_data must be a numpy.ndarray') + TypeError("col_data must be a numpy.ndarray") if hdu_no is None: hdu_no = self.hdu_no @@ -1410,31 +1499,34 @@ def add_col( if len(data_shape) != 0: for k in data_shape: mem_size *= k - data_format = f'{mem_size}{data_type}' - new_col = fits.ColDefs([fits.Column( - name=col_name, - format=data_format, - array=col_data, - dim=dim, - )]) + 
data_format = f"{mem_size}{data_type}" + new_col = fits.ColDefs( + [ + fits.Column( + name=col_name, format=data_format, array=col_data, dim=dim, + ) + ] + ) col_list += new_col - elif data_type == 'A': + elif data_type == "A": mem_size *= len(max(col_data, key=len)) - data_format = f'{mem_size}{data_type}' - new_col = fits.ColDefs([fits.Column( - name=col_name, - format=data_format, - array=col_data, - dim=str((mem_size,)), - )]) + data_format = f"{mem_size}{data_type}" + new_col = fits.ColDefs( + [ + fits.Column( + name=col_name, + format=data_format, + array=col_data, + dim=str((mem_size,)), + ) + ] + ) col_list += new_col else: - data_format = f'{mem_size}{data_type}' - new_col = fits.ColDefs([fits.Column( - name=col_name, - format=data_format, - array=col_data, - )]) + data_format = f"{mem_size}{data_type}" + new_col = fits.ColDefs( + [fits.Column(name=col_name, format=data_format, array=col_data,)] + ) col_list += new_col new_fits.append(fits.BinTableHDU.from_columns(col_list, name=ext_name)) @@ -1447,9 +1539,7 @@ def add_col( self._cat_data.close() del self._cat_data self._cat_data = fits.open( - self.fullpath, - mode=self.open_mode, - memmap=self.use_memmap, + self.fullpath, mode=self.open_mode, memmap=self.use_memmap, ) def remove_col(self, col_index): @@ -1463,7 +1553,7 @@ def remove_col(self, col_index): Index of the column to delete """ - raise BaseCatalogue.FeatureNotImplemented('remove_col()') + raise BaseCatalogue.FeatureNotImplemented("remove_col()") def remove_named_col(self, col_name): """Remove Named Column. 
@@ -1497,9 +1587,7 @@ def _append_col(self, column, hdu_no=None): """ if self._cat_data is not None: new_col = fits.Column( - name=column.name, - format=column.format, - array=column.data, + name=column.name, format=column.format, array=column.data, ) if hdu_no is None: @@ -1508,11 +1596,15 @@ def _append_col(self, column, hdu_no=None): orig_table = fits.open(self.fullpath)[hdu_no].data orig_cols = orig_table.columns - new_col = fits.ColDefs([fits.Column( - name=column.name, - format=column.format, - array=np.zeros(len(orig_table)), - )]) + new_col = fits.ColDefs( + [ + fits.Column( + name=column.name, + format=column.format, + array=np.zeros(len(orig_table)), + ) + ] + ) col_list = orig_cols + new_col hdu = fits.BinTableHDU.from_columns(col_list) hdu.data[column.name] = column.data @@ -1537,22 +1629,27 @@ def _get_fits_col_type(self, col_data): Column FITS data type """ + type_mapping = { + type(None): "D", + np.int16: "I", + np.int32: "J", + int: "K", + np.int64: "K", + float: "D", + np.float16: "D", + np.float32: "D", + np.float64: "D", + bool: "L", + str: "A", + np.str: "A", + np.str_: "A", + np.str0: "A" + } + if col_data is None or len(col_data) == 0: - col_type = 'D' - elif type(col_data[0]) in [np.int16]: - col_type = 'I' - elif type(col_data[0]) in [np.int32]: - col_type = 'J' - elif type(col_data[0]) in [int, np.int64]: - col_type = 'K' - elif type(col_data[0]) in [float, np.float16, np.float32, np.float64]: - col_type = 'D' - elif type(col_data[0]) is bool: - col_type = 'L' - elif type(col_data[0]) in [str, np.str, np.str_, np.str0]: - col_type = 'A' + col_type = "D" else: - col_type = 'D' + col_type = type_mapping.get(type(col_data[0]), "D") return col_type @@ -1572,159 +1669,65 @@ def _get_python_col_type(self, col_type): Column Python data type """ - if col_type in ['B', 'I', 'J', 'K']: - pcol_type = '%d' - elif col_type in ['D', 'E']: - pcol_type = '%f' - elif col_type in ['A', 'C', 'M']: - pcol_type = '%s' - elif col_type == 'L': - pcol_type = '%s' 
- else: - pcol_type = '%f' - - return pcol_type - - def _save_to_fits( - self, - data, - names, - it, - ext_name=None, - sex_cat_path=None, - overwrite=False, - ): + type_mapping = { + "B": "%d", + "I": "%d", + "J": "%d", + "K": "%d", + "D": "%f", + "E": "%f", + "A": "%s", + "C": "%s", + "M": "%s", + "L": "%s" + } + + return type_mapping.get(col_type, "%f") + + def _save_to_fits(self, data, ext_name=None, fits_ldac=False, overwrite=False): """Save to FITS. - Save array of data as fits with their associated column names. + Save and close after adding data in the form of an astropy HDUList. Can save as fits or fits_ldac. Parameters ---------- - data : numpy.ndarray - Array with the data - names : list - List of the column names - it : iterator - ? - ext_name : str - Name of the HDU where data are stored - sex_cat_path : str - Path of the existing SExtractor catalogue to mimic + data : astropy.io.fits.HDUList + Astropy data + ext_name : str + Name of the HDU where data are stored + fits_ldac : bool + save as fits_ldac format if True overwrite : bool - Option to overwrite an existing catalogue + Option to overwrite an existing catalogue, otherwise new extension will be added """ if data is None: - raise ValueError('Data not provided') + raise ValueError("Data not provided") if self._file_exists(self.fullpath) and not overwrite: if self._cat_data is None: self.open() if ext_name is None: - ext_name = 'new' + ext_name = "new" else: - if self._SEx_catalogue: - self.create(s_hdu=False, sex_cat_path=sex_cat_path) + if self._fits_ldac: + self.create(s_hdu=False) self.open() + header = data.header + # this will be present in data.header + ldac_imhead = self._create_ldac_imhead(header) + self._cat_data.append(ldac_imhead) if ext_name is None: - ext_name = 'LDAC_OBJECTS' + ext_name = "LDAC_OBJECTS" + + else: self.create(s_hdu=False) self.open() if ext_name is None: - ext_name = 'new' + ext_name = "new" - if len(names) == 1: - data = np.array([data]) - col_list = [] - for idx in it: - data_shape = data[idx].shape[1:] - 
dim = str(tuple(data_shape)) - name = names[it.index(idx)] - data_type = self._get_fits_col_type(data[idx]) - mem_size = 1 - if len(data_shape) != 0: - for shape in data_shape: - mem_size *= shape - data_format = f'{mem_size}{data_type}' - col_list.append(fits.Column( - name=name, - format=data_format, - array=data[idx], - dim=dim, - )) - elif data_type == 'A': - mem_size *= len(max(data[idx], key=len)) - data_format = f'{mem_size}{data_type}' - col_list.append(fits.Column( - name=name, - format=data_format, - array=data[idx], - dim=str((mem_size,)), - )) - else: - data_format = f'{mem_size}{data_type}' - col_list.append(fits.Column( - name=name, - format=data_format, - array=data[idx], - )) - - self._cat_data.append( - fits.BinTableHDU.from_columns(col_list, name=ext_name) - ) + self._cat_data.append(data) self.close() - def _save_from_recarray( - self, - data=None, - ext_name=None, - sex_cat_path=None, - overwrite=False, - ): - """Save From Record Array. - - Save a numpy.recarray or astropy.io.fits.fitsrec.FITS_rec into a FITS - file. 
- - Parameters - ---------- - data : numpy.ndarray - Array with the data - ext_name : str - Name of the HDU where data are stored - sex_cat_path : str - Path of the existing SExtractor catalogue to mimic - overwrite : bool - Option to overwrite an existing catalogue - - """ - if data is None: - raise ValueError('Data not provided') - - if self._file_exists(self.fullpath) and not overwrite: - if self._cat_data is None: - self.open() - if ext_name is None: - ext_name = 'new' - self._cat_data.append(fits.BinTableHDU(data, name=ext_name)) - self.close() - else: - if self._SEx_catalogue: - self.create(s_hdu=False, sex_cat_path=sex_cat_path) - self.open() - if ext_name is None: - ext_name = 'LDAC_OBJECTS' - self._cat_data.append(fits.BinTableHDU(data, name=ext_name)) - self.close() - else: - self.create(s_hdu=False) - self.open() - if ext_name is None: - ext_name = 'new' - self._cat_data.append(fits.BinTableHDU(data, name=ext_name)) - self.close() - def _save_image(self, data=None, header=None, overwrite=False): """Save Image. @@ -1740,13 +1743,12 @@ def _save_image(self, data=None, header=None, overwrite=False): Option to overwrite an existing catalogue """ - if (data is not None): + if data is not None: fits.PrimaryHDU(data, header).writeto( - self.fullpath, - overwrite=overwrite, + self.fullpath, overwrite=overwrite, ) else: - raise ValueError('Data or names not provided') + raise ValueError("Data or names not provided") class Column(BaseCatalogue.Column): """Column. 
@@ -1772,7 +1774,7 @@ def __init__(self, name, format=None, comment=None, data=None): self._name = name if format is None: - format = 'D' + format = "D" self._format = format if comment is None: @@ -1788,7 +1790,7 @@ def __init__(self, name, format=None, comment=None, data=None): self._data = data def __str__(self): - info = f'{self._cat_col}' + info = f"{self._cat_col}" return info @property @@ -1852,7 +1854,7 @@ def get_unit_from_fits_header(header, key): Parameters ---------- header : FITS header - Header information + Header information (dict? or astropy object???) key : str Column name @@ -1871,7 +1873,7 @@ def get_unit_from_fits_header(header, key): idx = 1 idx_found = -1 while True: - ttype_idx = f'TTYPE{idx}' + ttype_idx = f"TTYPE{idx}" if ttype_idx not in header: # Reached beyond last column break @@ -1883,14 +1885,13 @@ def get_unit_from_fits_header(header, key): idx += 1 if idx_found == -1: - raise IndexError(f'Column \'{key}\' not found in FITS header') + raise IndexError(f"Column '{key}' not found in FITS header") # Extract coordinate unit string from header - tcunit_idx = f'TCUNI{idx}' + tcunit_idx = f"TCUNI{idx}" if tcunit_idx not in header: raise IndexError( - f'No coordinate unit found for column \'{key}\'' - ' in FITS header' + f"No coordinate unit found for column '{key}'" " in FITS header" ) unit_str = header[tcunit_idx]