-
Notifications
You must be signed in to change notification settings - Fork 0
/
outputs.py
executable file
·452 lines (313 loc) · 16.4 KB
/
outputs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
.. codeauthor:: Hugo Plombat - LUPM <[email protected]> & Wilfried Mercier - IRAP/LAM <[email protected]>
Classes used by the sed objects to generate objects loading output tables and producing resolved maps.
"""
import os.path as opath
import astropy.io.ascii as asci
import numpy as np
from numpy import ndarray
from abc import ABC, abstractmethod
from typing import Tuple, Union, Optional, Dict, Any
from astropy.io import fits
from astropy.table import Table
from astropy.units import Quantity
from .misc.enum import LePhareOutputParam
from .filters import FilterList
class Output(ABC):
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Abstract SED fitting code output class.
:param file: name of the SED fitting outut file
:type file: :python:`str`
'''
def __init__(self, file: str, *args, **kwargs) -> None:
r'''Init method.'''
if not isinstance(file, str):
raise TypeError(f'file parameter has type {type(file)} but it must have type str.')
fullFile = opath.expandvars(file)
if not opath.isfile(fullFile):
raise ValueError(f'file {file} (expanded as {fullFile}) not found.')
#: SED fitting code output file name used
self.file: str = fullFile
#: Image properties corresponding to the output table (keys are None by default, update using the link method with a FilterList object)
self.imProp: Dict[str, Any] = {'shape' : None,
'scale' : None,
'meanMap' : None
}
#: Configuration dictionary with info from the header
self.config: str = {}
#: Table gathering data
self.table: Table = None
@abstractmethod
def load(self, *args, **kwargs) -> Table:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Load data from a file into an astropy Table object.
:returns: the astropy table
:rtype: `Astropy Table`_
'''
return
def link(self, filterList: FilterList, *args, **kwargs) -> None:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Provide the default image properties from a FilterList object.
:param FilterList filterList: FilterList object from which the image properties are retrieved
:raises TypeError: if **filterList** is not of type :py:class:`~.FilterList`
'''
if not isinstance(filterList, FilterList):
raise TypeError(f'filterList parameter has type {type(filterList)} but it must be of type FilterList.')
self.imProp['shape'] = filterList.shape
self.imProp['scale'] = filterList.scaleFac
self.imProp['meanMap'] = filterList.meanMap
return
@abstractmethod
def toImage(self, *args, **kwargs) -> Quantity:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Generate a resolved map.
:returns: the image
:rtype: `Astropy Quantity`_
'''
return
class CigaleOutput(Output):
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Init the output class.
:param file: name of the SED fitting outut file
:type file: :python:`str`
'''
def __init__(self, file: str, *args, **kwargs):
r'''Init method.'''
super().__init__(file, *args, **kwargs)
#: Mapping between column names and units
self.units = {}
self.load()
def _getUnits(self, hdr: fits.Header, *args, **kwargs) -> Dict[str, str]:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Get and return the units from a Cigale output fits file header and map them to their column names.
:param hdr: header to read the units from
:type hdr: `Astropy Header`_
:returns: mapping between column names and units
:rtype: :python:`dict[str, str]`
'''
self.units = {hdr[f'TTYPE{i}'] : hdr[f'TUNIT{i}'] if f'TUNIT{i}' in hdr else '' for i in range(1, hdr['TFIELDS']+1)}
return self.units
def link(self, filterList: FilterList, *args, **kwargs) -> None:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Provide the default image properties from a FilterList object.
:param FilterList filterList: FilterList object from which the image properties are retrieved
:raises TypeError: if **filterList** is not of type :py:class:`~.FilterList`
'''
if not isinstance(filterList, FilterList):
raise TypeError(f'filterList parameter has type {type(filterList)} but it must be of type FilterList.')
# Scale and meanMap are not used when generating the catalogue for cigale so no need to retrieve them
self.imProp['shape'] = filterList.shape
return
def load(self, *args, **kwargs) -> Table:
r'''Load data from Cigale output fits file.'''
with fits.open(self.file) as hdul:
self._getUnits(hdul[1].header)
table = Table(hdul[1].data)
self.table = table
return table
def toImage(self, name: str,
shape: Optional[Tuple[int]] = None, **kwargs) -> Quantity:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Generate an image from the Astropy Table given column name.
**Arguments**
:param name: name of the column to generate the image from
:type name: :python:`str`
**Keyword arguments**
:param tuple[int] shape: (**Optional**) shape of the output image. The shape must be such that :python:`shape[0]*shape[1] == len(self.table)`. If :python:`None`, the default value provided in the :py:meth:`~.CigaleOutput.link` method is used.
:type shape: :python:`tuple[int]`
:returns: output image as an `Astropy Quantity`_. Use .data method to only get the array.
:rtype: `Astropy Quantity`_
:raises TypeError:
* if **name** is not of type :python:`str`
* if **shape** is neither a :python:`tuple` nor a :python:`list`
:raises ValueError: if **shape** is not of length 2
'''
if not isinstance(name, str):
raise TypeError(f'column name has type {type(name)} but it must have type str.')
# Check shape parameter
if self.imProp['shape'] is None and shape is None:
raise ValueError('an image shape must be provided either in this function call or using the link method.')
if shape is not None:
if not isinstance(shape, (tuple, list)):
raise TypeError(f'shape parameter has type {type(shape)} but it must have type tuple or list.')
if len(shape) != 2:
raise ValueError(f'shape parameter has length {len(shape)} but it must have a length of 2.')
else:
shape = self.imProp['shape']
# Location of good pixels
indices = self.table['id']
# Output array (NaN for bad pixels - default NaN everywhere)
data = np.full(shape[0]*shape[1], np.nan)
data[indices] = self.table[name]
data = data.reshape(shape)
return Quantity(data, unit=self.units[name])
class LePhareOutput(Output):
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Implement an output class for LePhare.
:param file: name of the SED fitting outut file
:type file: :python:`str`
'''
def __init__(self, file: str, *args, **kwargs) -> None:
r'''Init method.'''
super().__init__(file, *args, **kwargs)
self.load()
def load(self, *args, **kwargs) -> Table:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Load data from LePhare output file.
:returns: the astropy table
:rtype: `Astropy Table`_
'''
# Read header first
colMap = self.readHeader()
# Select columns from the header and map them to the names given in the Enum object
colNames = [f'{i}' if i[-2:] != '()' else f'{i[:-2]}_2' for i in colMap.keys()]
colDef = [f'col{i}' for i in colMap.values()]
tcolNames = []
tcolUnits = []
tcolLog = []
for i in colNames:
if i == 'IDENT':
tcolNames.append('ID')
tcolUnits.append('')
tcolLog.append(False)
else:
# Get names which will appear in the Astropy table column headers
tcolNames.append(LePhareOutputParam[i].value.name)
# Get units which will appear in the Astropy table colulmn headers
tcolUnits.append(LePhareOutputParam[i].value.unit)
# Get log flag for each column
tcolLog.append(LePhareOutputParam[i].value.log)
# Read full Table
table = asci.read(self.file, comment='#')
# Only keep desired columns, rename them and add unit
table = table[colDef]
for tcname, tcunit, tclog, cdef in zip(tcolNames, tcolUnits, tcolLog, colDef):
table.rename_column(cdef, tcname)
table[tcname].unit = tcunit
if tclog:
table[tcname] = 10**table[tcname]
self.table = table
return table
def readHeader(self, *args, **kwargs) -> Dict[str, str]:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Read the header of LePhare output file.
:returns: dict mapping between column names and column number
:rtype: :python:`dict[str, str]`
'''
with open(self.file, 'r') as f:
# Go through header lines till output format line
for line in f:
line = line.strip()
if line == '# Output format #':
break
elif ':' not in line:
continue
else:
key, value = (i.strip() for i in line.strip('#').rsplit(':', maxsplit=1))
self.config[key] = value
else:
raise IOError('Output format line could not be reached.')
# Go through the lines which tell us which parameters to extract and where
colMap = {}
for line in f:
line = line.strip()
if line == '#######################################':
break
else:
items = line.strip('#, ').split(',')
for item in items:
key, val = item.split()
colMap[key] = int(val)
else:
raise IOError('End of SED fitting code output file header could not be reached.')
return colMap
def toImage(self, name: str,
shape: Optional[Tuple[int]] = None,
scaleFactor: Optional[Union[int, float]] = None,
meanMap: Optional[ndarray] = None, **kwargs) -> Quantity:
r'''
.. codeauthor:: Wilfried Mercier - IRAP/LAM <[email protected]>
Generate an image from the Astropy Table given column name.
.. note::
Depending on the type of output quantity one wants to generate an image from, **scaleFactor** can have different values:
* if the output quantity is extensive, then this is the same scaleFactor which was used to scale down the filters data
* if the output quantity is intensive, then it must be :python:`1`
The **shape**, **scaleFactor** and **meanMap** parameters can also be passed as default values using
.. code:: python
output = LePhareOutput('someFileName.out')
output.link(filterList)
where filterList is the :py:class:`~.FilterList` object used to perform the SED fitting.
These parameters always override the default values if provided.
**Arguments**
:param name: name of the column to generate the image from
:type name: :python:`str`
**Keyword arguments**
:param shape: shape of the output image. The shape must be such that :python:`shape[0]*shape[1] == len(self.table)`. If :python:`None`, the default value provided in the :py:meth:`~.LePhareOutput.link` method is used.
:type shape: :python:`tuple[int]`
:param scaleFactor: scale factor used to scale up the image. If :python:`None`, the default value provided in the :py:meth:`~.Output.link` method is used.
:type scaleFactor: :python:`int/float`
:param meanMap: mean map used during the filterList table creation to normalise the data. If :python:`None`, the default value provided in the :py:meth:`~.Output.link` method is used.
:type meanMap: `ndarray`_
:returns: output image as an Astropy Quantity. Use .data method to only get the array.
:rtype: `Astropy Quantity`_
:raises TypeError:
* if **name** is not of type :python:`str`
* if **shape** is neither a :python:`tuple` nor a :python:`list`
* if **scaleFactor** is neither an :python:`int` nor a :python:`float`
* if **meanMap** is not a `ndarray`_
:raises ValueError: if **shape** is not of length 2
'''
if not isinstance(name, str):
raise TypeError(f'column name has type {type(name)} but it must have type str.')
# Check shape parameter
if self.imProp['shape'] is None and shape is None:
raise ValueError('an image shape must be provided either in this function call or using the link method.')
if shape is not None:
if not isinstance(shape, (tuple, list)):
raise TypeError(f'shape parameter has type {type(shape)} but it must have type tuple or list.')
if len(shape) != 2:
raise ValueError(f'shape parameter has length {len(shape)} but it must have a length of 2.')
else:
shape = self.imProp['shape']
# Check scaleFactor parameter
if self.imProp['scale'] is None and scaleFactor is None:
raise ValueError('a scale facor must be provided either in this function call or using the link method.')
if scaleFactor is not None:
if not isinstance(scaleFactor, (int, float)):
raise TypeError(f'scaleFactor parameter has type {type(scaleFactor)} but it must have type int or float.')
if scaleFactor <= 0:
raise ValueError(f'scaleFactor parameter has value {scaleFactor} but it must be strictly positive.')
else:
scaleFactor = self.imProp['scale']
# Check meanMap parameter
if meanMap is not None:
if not isinstance(meanMap, ndarray):
raise TypeError(f'meanMap parameter has type {type(meanMap)} but it must have type ndarray.')
else:
meanMap = self.imProp['meanMap']
if shape != meanMap.shape:
raise ValueError(f'meanMap has shape {meanMap.shape} but is must have shape {shape}.')
# Location of good pixels
indices = self.table['ID']
# Output array (NaN for bad pixels - default NaN everywhere)
data = np.full(shape[0]*shape[1], np.nan)
data[indices] = self.table[name].data / scaleFactor
data = data.reshape(shape)
# Multiply by mean map only where it is non zero
if meanMap is not None:
mask = meanMap != 0
data[mask] *= meanMap[mask]
return Quantity(data, unit=self.table[name].unit)