Skip to content

Commit

Permalink
Merge pull request #227 from AstarVienna/oc/rectification
Browse files Browse the repository at this point in the history
rectification of spectral traces
  • Loading branch information
hugobuddel authored Jun 25, 2023
2 parents 62b5ac3 + f1a8239 commit fba76d1
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 20 deletions.
102 changes: 97 additions & 5 deletions scopesim/effects/spectral_trace_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from astropy.table import Table

from .effects import Effect
from .spectral_trace_list_utils import SpectralTrace
from .ter_curves import FilterCurve
from .spectral_trace_list_utils import SpectralTrace, make_image_interpolations
from ..utils import from_currsys, check_keys
from ..optics.image_plane_utils import header_from_list_of_xy
from ..base_classes import FieldOfViewBase, FOVSetupBase
Expand Down Expand Up @@ -103,6 +104,7 @@ def __init__(self, **kwargs):
params = {"z_order": [70, 270, 670],
"pixel_scale": "!INST.pixel_scale", # [arcsec / pix]}
"plate_scale": "!INST.plate_scale", # [arcsec / mm]
"spectral_bin_width": "!SIM.spectral.spectral_bin_width", # [um]
"wave_min": "!SIM.spectral.wave_min", # [um]
"wave_mid": "!SIM.spectral.wave_mid", # [um]
"wave_max": "!SIM.spectral.wave_max", # [um]
Expand Down Expand Up @@ -193,10 +195,6 @@ def apply_to(self, obj, **kwargs):
logging.info("Making cube")
obj.cube = obj.make_cube_hdu()

# ..todo: obj will be changed to a single one covering the full field of view
# covered by the image slicer (28 slices for LMS; for LSS still only a single slit)
# We need a loop over spectral_traces that chops up obj into the single-slice fov before
# calling map_spectra...
trace_id = obj.meta["trace_id"]
spt = self.spectral_traces[trace_id]
obj.hdu = spt.map_spectra_to_focal_plane(obj)
Expand Down Expand Up @@ -226,6 +224,100 @@ def image_plane_header(self):

return hdr

def rectify_traces(self, hdulist, xi_min=None, xi_max=None, interps=None,
**kwargs):
"""Create rectified 2D spectra for all traces in the list
This method creates an HDU list with one extension per spectral
trace, i.e. it essentially treats all traces independently.
For the case of an IFU where the traces correspond to spatial
slices for the same wavelength range, use method `rectify_cube`
(not yet implemented).
Parameters
----------
hdulist : str or fits.HDUList
The result of scopesim readout()
xi_min, xi_max : float [arcsec]
Spatial limits of the slit on the sky. This should be taken
from the header of the hdulist, but this is not yet provided by
scopesim. For the time being, these limits *must* be provided by
the user.
interps : list of interpolation functions
If provided, there must be one for each image extension in `hdulist`.
The functions go from pixels to the images and can be created with,
e.g., RectBivariateSpline.
"""
try:
inhdul = fits.open(hdulist)
except TypeError:
inhdul = hdulist

# Crude attempt to get a useful wavelength range
# Problematic because different instruments use different
# keywords for the filter... We try to make it work for METIS
# and MICADO for the time being.
try:
filter_name = from_currsys("!OBS.filter_name")
except ValueError:
filter_name = from_currsys("!OBS.filter_name_fw1")

filtcurve = FilterCurve(
filter_name=filter_name,
filename_format=from_currsys("!INST.filter_file_format"))
filtwaves = filtcurve.table['wavelength']
filtwave = filtwaves[filtcurve.table['transmission'] > 0.01]
wave_min, wave_max = min(filtwave), max(filtwave)
logging.info("Full wavelength range: %.02f .. %.02f um",
wave_min, wave_max)

if xi_min is None or xi_max is None:
try:
xi_min = inhdul[0].header["HIERARCH INS SLIT XIMIN"]
xi_max = inhdul[0].header["HIERARCH INS SLIT XIMAX"]
logging.info(
"Slit limits taken from header: %.02f .. %.02f arcsec",
xi_min, xi_max)
except KeyError:
logging.error("""
Spatial slit limits (in arcsec) must be provided:
- either as method parameters xi_min and xi_max
- or as header keywords HIERARCH INS SLIT XIMIN/XIMAX
""")
return None


bin_width = kwargs.get("bin_width", None)

if interps is None:
logging.info("Computing interpolation functions")
interps = make_image_interpolations(hdulist)

pdu = fits.PrimaryHDU()
pdu.header['FILETYPE'] = "Rectified spectra"
#pdu.header['INSTRUME'] = inhdul[0].header['HIERARCH ESO OBS INSTRUME']
#pdu.header['FILTER'] = from_currsys("!OBS.filter_name_fw1")
outhdul = fits.HDUList([pdu])

for i, trace_id in enumerate(self.spectral_traces):
hdu = self[trace_id].rectify(hdulist,
interps=interps,
bin_width=bin_width,
xi_min=xi_min, xi_max=xi_max,
wave_min=wave_min, wave_max=wave_max)
if hdu is not None: # ..todo: rectify does not do that yet
outhdul.append(hdu)
outhdul[0].header[f"EXTNAME{i+1}"] = trace_id

outhdul[0].header.update(inhdul[0].header)

return outhdul


def rectify_cube(self, hdulist):
"""Rectify traces and combine into a cube"""
raise(NotImplementedError)

def plot(self, wave_min=None, wave_max=None, **kwargs):
if wave_min is None:
wave_min = from_currsys("!SIM.spectral.wave_min")
Expand Down
177 changes: 162 additions & 15 deletions scopesim/effects/spectral_trace_list_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
This module contains
- the definition of the `SpectralTrace` class. The visible effect should
always be a `SpectralTraceList`, even if that contains only one `SpectralTrace`.
always be a `SpectralTraceList`, even if that contains only one
`SpectralTrace`.
- the definition of the `XiLamImage` class
- utility functions for use with spectral traces
"""
Expand All @@ -23,7 +24,7 @@
from astropy.wcs import WCS
from astropy.modeling.models import Polynomial2D

from ..utils import power_vector, quantify
from ..utils import power_vector, quantify, from_currsys


class SpectralTrace:
Expand Down Expand Up @@ -201,19 +202,7 @@ def map_spectra_to_focal_plane(self, fov):
xmin_mm, ymin_mm = fpa_wcsd.all_pix2world(xmin, ymin, 0)
xmax_mm, ymax_mm = fpa_wcsd.all_pix2world(xmax, ymax, 0)

# Computation of dispersion dlam_per_pix along xi=0
# ..todo: This may have to be generalised - xi=0 is at the centre of METIS slits
# and the short MICADO slit.
xi = np.array([0] * 1001)
lam = np.linspace(wave_min, wave_max, 1001)
x_mm = self.xilam2x(xi, lam)
y_mm = self.xilam2y(xi, lam)
if self.dispersion_axis == "x":
dlam_grad = self.xy2lam.gradient()[0] # dlam_by_dx
else:
dlam_grad = self.xy2lam.gradient()[1] # dlam_by_dy
self.dlam_per_pix = interp1d(lam, dlam_grad(x_mm, y_mm) * pixsize,
fill_value="extrapolate")
self._set_dispersion(wave_min, wave_max, pixsize=pixsize)
try:
xilam = XiLamImage(fov, self.dlam_per_pix)
self._xilamimg = xilam # ..todo: remove or make available with a debug flag?
Expand Down Expand Up @@ -284,6 +273,125 @@ def map_spectra_to_focal_plane(self, fov):
image_hdu = fits.ImageHDU(header=img_header, data=image)
return image_hdu

def rectify(self, hdulist, interps=None, wcs=None, **kwargs):
"""Create 2D spectrum for a trace
Parameters
----------
hdulist : HDUList
The result of scopesim readout
interps : list of interpolation functions
If provided, there must be one for each image extension in `hdulist`.
The functions go from pixels to the images and can be created with,
e.g., RectBivariateSpline.
wcs : The WCS describing the rectified XiLamImage. This can be created
in a simple way from the fov included in the `OpticalTrain` used in
the simulation run producing `hdulist`.
The WCS can also be set up via the following keywords:
bin_width : float [um]
The spectral bin width. This is best computed automatically from the
spectral dispersion of the trace.
wave_min, wave_max : float [um]
Limits of the wavelength range to extract. The default is the
the full range on which the `SpectralTrace` is defined. This may
extend significantly beyond the filter window.
xi_min, xi_max : float [arcsec]
Spatial limits of the slit on the sky. This should be taken from
the header of the hdulist, but this is not yet provided by scopesim
"""
logging.info("Rectifying %s", self.trace_id)

wave_min = kwargs.get("wave_min",
self.wave_min)
wave_max = kwargs.get("wave_max",
self.wave_max)
if wave_max < self.wave_min or wave_min > self.wave_max:
logging.info(" Outside filter range")
return None
wave_min = max(wave_min, self.wave_min)
wave_max = min(wave_max, self.wave_max)
logging.info(" %.02f .. %.02f um", wave_min, wave_max)

# bin_width is taken as the minimum dispersion of the trace
bin_width = kwargs.get("bin_width", None)
if bin_width is None:
self._set_dispersion(wave_min, wave_max)
bin_width = np.abs(self.dlam_per_pix.y).min()
logging.info(" Bin width %.02g um", bin_width)

pixscale = from_currsys(self.meta['pixel_scale'])

# Temporary solution to get slit length
xi_min = kwargs.get("xi_min", None)
if xi_min is None:
try:
xi_min = hdulist[0].header["HIERARCH INS SLIT XIMIN"]
except KeyError:
logging.error("xi_min not found")
return None
xi_max = kwargs.get("xi_max", None)
if xi_max is None:
try:
xi_max = hdulist[0].header["HIERARCH INS SLIT XIMAX"]
except KeyError:
logging.error("xi_max not found")
return None

if wcs is None:
wcs = WCS(naxis=2)
wcs.wcs.ctype = ['WAVE', 'LINEAR']
wcs.wcs.cunit = ['um', 'arcsec']
wcs.wcs.crpix = [1, 1]
wcs.wcs.cdelt = [bin_width, pixscale] # PIXSCALE

# crval set to wave_min to catch explicitely set value
wcs.wcs.crval = [wave_min, xi_min] # XIMIN

nlam = int((wave_max - wave_min) / bin_width) + 1
nxi = int((xi_max - xi_min) / pixscale) + 1

# Create interpolation functions if not provided
if interps is None:
logging.info("Computing image interpolations")
interps = make_image_interpolations(hdulist, kx=1, ky=1)

# Create Xi, Lam images (do I need Iarr and Jarr or can I build Xi, Lam directly?)
Iarr, Jarr = np.meshgrid(np.arange(nlam, dtype=np.float32),
np.arange(nxi, dtype=np.float32))
Lam, Xi = wcs.all_pix2world(Iarr, Jarr, 0)

# Make sure that we do have microns
Lam = Lam * u.Unit(wcs.wcs.cunit[0]).to(u.um)

# Convert Xi, Lam to focal plane units
Xarr = self.xilam2x(Xi, Lam)
Yarr = self.xilam2y(Xi, Lam)

rect_spec = np.zeros_like(Xarr, dtype=np.float32)

ihdu = 0
for hdu in hdulist:
if not isinstance(hdu, fits.ImageHDU):
continue

wcs_fp = WCS(hdu.header, key="D")
n_x = hdu.header['NAXIS1']
n_y = hdu.header['NAXIS2']
iarr, jarr = wcs_fp.all_world2pix(Xarr, Yarr, 0)
mask = (iarr > 0) * (iarr < n_x) * (jarr > 0) * (jarr < n_y)
if np.any(mask):
specpart = interps[ihdu](jarr, iarr, grid=False)
rect_spec += specpart * mask

ihdu += 1

header = wcs.to_header()
header['EXTNAME'] = self.trace_id
return fits.ImageHDU(data=rect_spec, header=header)


def footprint(self, wave_min=None, wave_max=None, xi_min=None, xi_max=None):
"""
Return corners of rectangle enclosing spectral trace
Expand Down Expand Up @@ -411,6 +519,31 @@ def plot(self, wave_min=None, wave_max=None, c="r"):

plt.gca().set_aspect("equal")

@property
def trace_id(self):
"""Return the name of the trace"""
return self.meta['trace_id']

def _set_dispersion(self, wave_min, wave_max, pixsize=None):
"""Computation of dispersion dlam_per_pix along xi=0
"""
#..todo: This may have to be generalised - xi=0 is at the centre
#of METIS slits and the short MICADO slit.

xi = np.array([0] * 1001)
lam = np.linspace(wave_min, wave_max, 1001)
x_mm = self.xilam2x(xi, lam)
y_mm = self.xilam2y(xi, lam)
if self.dispersion_axis == "x":
dlam_grad = self.xy2lam.gradient()[0] # dlam_by_dx
else:
dlam_grad = self.xy2lam.gradient()[1] # dlam_by_dy
pixsize = (from_currsys(self.meta['pixel_scale']) /
from_currsys(self.meta['plate_scale']))
self.dlam_per_pix = interp1d(lam,
dlam_grad(x_mm, y_mm) * pixsize,
fill_value="extrapolate")

def __repr__(self):
msg = (f"<SpectralTrace> \"{self.meta['trace_id']}\" : "
f"[{self.wave_min:.4f}, {self.wave_max:.4f}]um : "
Expand Down Expand Up @@ -748,6 +881,20 @@ def _xiy2xlam_fit(layout, params):
xiy2lam = fitter(pinit_lam, xi_arr, y_arr, lam_arr)
return xiy2x, xiy2lam

def make_image_interpolations(hdulist, **kwargs):
"""
Create 2D interpolation functions for images
"""
interps = []
for hdu in hdulist:
if isinstance(hdu, fits.ImageHDU):
interps.append(
RectBivariateSpline(np.arange(hdu.header['NAXIS1']),
np.arange(hdu.header['NAXIS2']),
hdu.data, **kwargs)
)
return interps


# ..todo: Check whether the following functions are actually used
def rolling_median(x, n):
Expand Down
12 changes: 12 additions & 0 deletions scopesim/tests/tests_effects/test_SpectralTraceList.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,15 @@ def test_setitem_appends_correctly(self, full_trace_list):
spt = tlo.trace_1()
slist["New trace"] = spt
assert len(slist.spectral_traces) == n_trace + 1


@pytest.fixture(name="spectral_trace_list", scope="class")
def fixture_spectral_trace_list():
"""Instantiate a SpectralTraceList"""
return SpectralTraceList(hdulist=tlo.make_trace_hdulist())

class TestRectification:
def test_rectify_cube_not_implemented(self, spectral_trace_list):
hdulist = fits.HDUList()
with pytest.raises(NotImplementedError):
spectral_trace_list.rectify_cube(hdulist)

0 comments on commit fba76d1

Please sign in to comment.