diff --git a/.coveragerc b/.coveragerc index 39df3f597a..1d76471e7c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -5,6 +5,7 @@ parallel = true omit = */tests/* */docs/* + */data/* setup.py source = pyuvdata @@ -13,6 +14,7 @@ source = omit = */tests/* */docs/* + */data/* setup.py show_missing = true diff --git a/CHANGELOG.md b/CHANGELOG.md index 9476c8c729..5d8f657b82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,13 @@ All notable changes to this project will be documented in this file. ## [Unreleased] ### Added -- New optional spatial interpolation method, ``interpolation_function="az_za_map_coordinates"`` that improves the linear -interpolation speed for data in ``az_za`` coordinates. +- New analytic beam classes: AiryBeam, GaussianBeam, ShortDipoleBeam, UniformBeam +that support evaluating either the efield or power beam in any direction and frequency. +- A new BeamInterface class that provides a unified interface to UVBeam and analytic +beam objects to get beam responses in any direction and frequency (via +interpolation or evaluation as appropriate). +- New optional spatial interpolation method, ``interpolation_function="az_za_map_coordinates"`` +that improves the linear interpolation speed for data in ``az_za`` coordinates. - New UVParameter `pol_convention` on `UVData` and `UVCal`. This specifies the convention assumed for converting linear to stokes polarizations -- either "sum" or "avg". Also added to `uvcalibrate` to apply from the `UVCal` to the `UVData`. diff --git a/docs/Images/airy_beam.png b/docs/Images/airy_beam.png new file mode 100644 index 0000000000..c4a3d7e310 Binary files /dev/null and b/docs/Images/airy_beam.png differ diff --git a/docs/Images/dipole_mwa_power.png b/docs/Images/dipole_mwa_power.png new file mode 100644 index 0000000000..fdf3d72714 Binary files /dev/null and b/docs/Images/dipole_mwa_power.png differ diff --git a/docs/Images/short_dipole_beam.png b/docs/Images/short_dipole_beam.png new file mode 100644 index 0000000000..cd7e9fc06d Binary files /dev/null and b/docs/Images/short_dipole_beam.png differ diff --git a/docs/analytic_beam_tutorial.rst b/docs/analytic_beam_tutorial.rst new file mode 100644 index 0000000000..1e7cf0c9f5 --- /dev/null +++ b/docs/analytic_beam_tutorial.rst @@ -0,0 +1,837 @@ +.. _analytic_beam_tutorial: + +-------------- +Analytic Beams +-------------- + +The analytic beams defined in pyuvdata are based on a base class, +:class:`pyuvdata.analytic_beam.AnalyticBeam`, which ensures a standard interface +and can be used to define other analytic beams in a consistent way. + +Evaluating analytic beams +------------------------- + +To evaluate an analytic beam at one or more frequencies and in in one or more +directions, use either the :meth:`pyuvdata.analytic_beam.AnalyticBeam.efield_eval` +or :meth:`pyuvdata.analytic_beam.AnalyticBeam.power_eval` methods as appropriate. + +Evaluating an Airy Beam power response +************************************** + +This code evaluates and plots an Airy beam power response. Note that we exclude +the cross polarizations, since this is an unpolarized beam, the cross polarizations +are identical to the auto polarization power beams. If the cross polarizations +are included, the array returned from the ``power_eval`` method will be complex. + +.. code-block:: python + + >>> import matplotlib.pyplot as plt + >>> import numpy as np + >>> from matplotlib.colors import LogNorm + + >>> from pyuvdata import AiryBeam + + >>> # Create an AiryBeam with a diameter of 14.5 meters + >>> airy_beam = AiryBeam(diameter=14.5, include_cross_pols=False) + + >>> # set up zenith angle, azimuth and frequency arrays to evaluate with + >>> # make a regular grid in direction cosines for nice plots + >>> n_vals = 100 + >>> zmax = np.radians(90) # Degrees + >>> axis_arr = np.arange(-n_vals/2., n_vals/2.) / float(n_vals/2.) + >>> l_arr, m_arr = np.meshgrid(axis_arr, axis_arr) + >>> radius = np.sqrt(l_arr**2 + m_arr**2) + >>> za_array = radius * zmax + >>> az_array = np.arctan2(m_arr, l_arr) + + >>> az_array = az_array.flatten() + >>> za_array = za_array.flatten() + + >>> Nfreqs = 11 + >>> freqs = np.linspace(100, 200, 11) * 1e6 + + >>> # find the values above the horizon so we don't evaluate beyond the horizon + >>> above_hor = np.nonzero(za_array <= np.pi / 2.)[0] + + >>> # set up an output array that matches the expected shape, except that it + >>> # includes the points beyond the horizon, and fill it with infinity. + >>> # Then we will set the points above the horizon to the output of power_eval. + >>> beam_vals = np.full((1, airy_beam.Npols, Nfreqs, n_vals * n_vals), np.inf, dtype=float) + + >>> beam_vals[:, :, :, above_hor] = airy_beam.power_eval( + ... az_array=az_array[above_hor], za_array=za_array[above_hor], freq_array=freqs + ... ) + + >>> beam_vals = np.reshape(beam_vals, (1, airy_beam.Npols, Nfreqs, n_vals, n_vals)) + + >>> fig, ax = plt.subplots(1, 2) + >>> bp_low = ax[0].imshow( + ... beam_vals[0,0,0], + ... norm=LogNorm(vmin = 1e-8, vmax =1), + ... extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)], + ... ) + >>> _ = ax[0].set_title(f"Airy beam {freqs[0]*1e-6} MHz") + >>> _ = ax[0].set_xlabel("direction cosine l") + >>> _ = ax[0].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(bp_low, ax=ax[0], fraction=0.046, pad=0.04) + + >>> bp_high = ax[1].imshow( + ... beam_vals[0,0,-1], + ... norm=LogNorm(vmin = 1e-8, vmax =1), + ... extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)], + ... ) + >>> _ = ax[1].set_title(f"Airy beam {freqs[-1]*1e-6} MHz") + >>> _ = ax[1].set_xlabel("direction cosine l") + >>> _ = ax[1].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(bp_high, ax=ax[1], fraction=0.046, pad=0.04) + >>> fig.tight_layout() + >>> plt.show() # doctest: +SKIP + >>> plt.savefig("Images/airy_beam.png", bbox_inches='tight') + >>> plt.clf() + +.. image:: Images/airy_beam.png + :width: 600 + + +Evaluating a Short Dipole Beam E-Field response +*********************************************** + +This code evaluates and plots a short (Herzian) dipole beam E-field response +(also called the Jones matrix). Since it is the E-Field response, we have 4 +effective maps because we have the response to each polarization basis vector +for each feed. In the case of a short dipole, these maps do not have an imaginary +part, but in general E-Field beams can be complex, so a complex array is returned. + +.. code-block:: python + + >>> import matplotlib.pyplot as plt + >>> import numpy as np + + >>> from pyuvdata import ShortDipoleBeam + + >>> # Create an ShortDipoleBeam + >>> dipole_beam = ShortDipoleBeam() + + >>> # set up zenith angle, azimuth and frequency arrays to evaluate with + >>> # make a regular grid in direction cosines for nice plots + >>> n_vals = 100 + >>> zmax = np.radians(90) # Degrees + >>> axis_arr = np.arange(-n_vals/2., n_vals/2.) / float(n_vals/2.) + >>> l_arr, m_arr = np.meshgrid(axis_arr, axis_arr) + >>> radius = np.sqrt(l_arr**2 + m_arr**2) + >>> za_array = radius * zmax + >>> az_array = np.arctan2(m_arr, l_arr) + + >>> az_array = az_array.flatten() + >>> za_array = za_array.flatten() + + >>> Nfreqs = 11 + >>> freqs = np.linspace(100, 200, 11) * 1e8 + + >>> # find the values above the horizon so we don't evaluate beyond the horizon + >>> above_hor = np.nonzero(za_array <= np.pi / 2.)[0] + + >>> # set up an output array that matches the expected shape except, that it + >>> # includes the points beyond the horizon, and fill it with infinity. + >>> # Then we will set the points above the horizon to the output of efield_eval. + >>> beam_vals = np.full((dipole_beam.Naxes_vec, dipole_beam.Nfeeds, Nfreqs, n_vals * n_vals), np.inf, dtype=complex) + + >>> beam_vals[:, :, :, above_hor] = dipole_beam.efield_eval( + ... az_array=az_array[above_hor], za_array=za_array[above_hor], freq_array=freqs + ... ) + + >>> beam_vals = np.reshape(beam_vals, (dipole_beam.Naxes_vec, dipole_beam.Nfeeds, Nfreqs, n_vals, n_vals)) + + >>> fig, ax = plt.subplots(2, 2) + + >>> be00 = ax[0,0].imshow(beam_vals[0,0,0].real, extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)]) + >>> _ = ax[0,0].set_title("E/W dipole azimuth response") + >>> _ = ax[0,0].set_xlabel("direction cosine l") + >>> _ = ax[0,0].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(be00, ax=ax[0,0]) + + >>> be10 = ax[1,0].imshow(beam_vals[1,0,0].real, extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)]) + >>> _ = ax[1,0].set_title("E/W dipole zenith angle response") + >>> _ = ax[1,0].set_xlabel("direction cosine l") + >>> _ = ax[1,0].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(be00, ax=ax[1,0]) + + >>> be01 = ax[0,1].imshow(beam_vals[0,1,0].real, extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)]) + >>> _ = ax[0,1].set_title("N/S dipole azimuth response") + >>> _ = ax[0,1].set_xlabel("direction cosine l") + >>> _ = ax[0,1].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(be00, ax=ax[0,1]) + + >>> be11 = ax[1,1].imshow(beam_vals[1,1,0].real, extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)]) + >>> _ = ax[1,1].set_title("N/S dipole zenith angle response") + >>> _ = ax[1,1].set_xlabel("direction cosine l") + >>> _ = ax[1,1].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(be00, ax=ax[1,1]) + + >>> fig.tight_layout() + >>> plt.show() # doctest: +SKIP + >>> plt.savefig("Images/short_dipole_beam.png", bbox_inches='tight') + >>> plt.clf() + +.. image:: Images/short_dipole_beam.png + :width: 600 + + +Defining new analytic beams +--------------------------- + +We have worked to make defining new analytic beams as straight forward as possible. +The new beam needs to inherit from either the :class:`pyuvdata.analytic_beam.AnalyticBeam`, +or the :class:`pyuvdata.analytic_beam.UnpolarizedAnalyticBeam`, which are base +classes that specify what needs to be defined on the new class. Unpolarized +beams (based on the ``UnpolarizedAnalyticBeam`` class) have fewer things that +need to be specified. + +Note that while unpolarized beams are simpler to define and think about, they +are quite unphysical and can have results that may be surprising to radio +astronomers. Since unpolarized feeds respond equally to all orientations of the +E-field, if two feeds are specified they will have cross-feed power responses that +are more similar to typical auto-feed power responses (and they will be identical +to auto-feed power responses if the two feeds have the same beam shapes). + +Setting parameters on the beam +****************************** + +If the new beam has any parameters that control the beam response (e.g. diameter), +The class must have an ``@dataclass`` decorator and the parameters must be listed +in the class definitions with type annotations and optionally defaults (these +are called ``fields`` in the dataclass, see the examples below and +`dataclass `_ for more details). + +If you need to do some manipulation or validation of the parameters after they +are specified by the user, you can use the ``validate`` method to do that +(under the hood the ``validate`` method is called by the base object's dataclass +``__post_init__`` method, so the ``validate`` method will always be called +when the class is instantiated). +The gaussian beam example below shows how this can be done. + +Polarized beams +*************** + +For polarized beams (based on the ``AnalyticBeam`` class), the following items +may be specified, the defaults on the ``AnalyticBeam`` class are noted: + + - ``feed_array``: This an array of feed strings (a list can also be passed, + it will be converted to an array). The default is ``["x", "y"]``. + This is a a dataclass field, so if it is specified, the class must have + ``@dataclass`` decorator and it should be specified with type annotations + and optionally a default (see examples below). + + - ``x_orientation``: For linear polarization feeds, this specifies what the + ``x`` feed polarization correspond to, allowed values are ``"east"`` or + ``"north"``, the default is ``"east"``. Should be set to ``None`` for + circularly polarized feeds. + This is a a dataclass field, so if it is specified, the class must have + ``@dataclass`` decorator and it should be specified with type annotations + and optionally a default (see examples below). + + - ``basis_vector_type``: This defines the coordinate system for the + polarization basis vectors, the default is ``"az_za"``. Currently only + ``"az_za"`` is supported, which specifies that there are 2 vector directions + (i.e. ``Naxes_vec`` is 2). + This should be defined as a class variable (see examples below). + +Defining the beam response +************************** + +At least one of the ``_efield_eval`` or ``_power_eval`` methods must be +defined to specify the response of the new beam. Defining ``_efield_eval`` is +the most general approach because it can represent complex and negative going +E-field beams (if only ``_efield_eval`` defined, power beams will be calculated +from the E-field beams). If only ``_power_eval`` is defined, the E-field beam is +defined as the square root of the auto polarization power beam, so the E-field +beam will be real and positive definite. Both methods can be specified, which +may allow for computational efficiencies in some cases. + +The inputs to the ``_efield_eval`` and ``_power_eval`` methods are the same and +give the directions (azimuth and zenith angle) and frequencies to evaluate the +beam. All three inputs must be two-dimensional with the first axis having the +length of the number of frequencies and the second axis having the having the +length of the number of directions (these are essentially the output of an +``np.meshgrid`` on the direction and frequency vectors). The inputs are: + + - ``az_grid``: an array of azimuthal values in radians for the directions + to evaluate the beam. Shape: (number of frequencies, number of directions) + - ``za_array``: an array of zenith angle values in radians for the directions + to evaluate the beam. Shape: (number of frequencies, number of directions) + - ``freq_array``: an array of frequencies in Hz at which to evaluate the beam. + Shape: (number of frequencies, number of directions) + +The ``_efield_eval`` and ``_power_eval`` methods must return arrays with the beam +response. The shapes and types of the returned arrays are: + + - _efield_eval: a complex array of beam responses with shape: + (``Naxes_vec``, ``Nfeeds``, ``freq_array.size``, ``az_array.size``). + ``Naxes_vec`` is 2 for the ``"az_za"`` basis, and ``Nfeeds`` is typically 2. + + - ``_power_eval``: an array with shape: (1, ``Npols``, ``freq_array.size``, + ``az_array.size``). ``Npols`` is equal to either ``Nfeeds`` squared if + ``include_cross_pols`` was set to True (the default) when the beam was + instantiated or ``Nfeeds`` if ``include_cross_pols`` was set to False. The + array should be real if ``include_cross_pols`` was set to False and it can + be complex if ``include_cross_pols`` was set to True (it will be cast to + complex when it is called via the ``power_eval`` method on the base class). + + +Below we provide some examples of beams defined in pyuvdata to make this more +concrete. + +Example: Defining simple unpolarized beams +****************************************** + +Airy beams are unpolarized but frequency dependent and require one parameter, +the dish diameter in meters. Since the Airy beam E-field response goes negative, +the ``_efield_eval`` method is specified in this beam. + +.. code-block:: python + :linenos: + + from dataclasses import dataclass + + import numpy as np + import numpy.typing as npt + from astropy.constants import c as speed_of_light + from scipy.special import j1 + from pyuvdata.analytic_beam import UnpolarizedAnalyticBeam + + + @dataclass(kw_only=True) + class AiryBeam(UnpolarizedAnalyticBeam): + """ + A zenith pointed Airy beam. + + Airy beams are the diffraction pattern of a circular aperture, so represent + an idealized dish. Requires a dish diameter in meters and is inherently + chromatic and unpolarized. + + The unpolarized nature leads to some results that may be surprising to radio + astronomers: if two feeds are specified they will have identical responses + and the cross power beam between the two feeds will be identical to the + power beam for a single feed. + + Attributes + ---------- + diameter : float + Dish diameter in meters. + + Parameters + ---------- + diameter : float + Dish diameter in meters. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + diameter: float + + def _efield_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + kvals = (2.0 * np.pi) * f_grid / speed_of_light.to("m/s").value + xvals = (self.diameter / 2.0) * np.sin(za_grid) * kvals + values = np.zeros_like(xvals) + nz = xvals != 0.0 + ze = xvals == 0.0 + values[nz] = 2.0 * j1(xvals[nz]) / xvals[nz] + values[ze] = 1.0 + + for fn in np.arange(self.Nfeeds): + data_array[0, fn, :, :] = values / np.sqrt(2.0) + data_array[1, fn, :, :] = values / np.sqrt(2.0) + + return data_array + +Below we show how to define a cosine shaped beam with a single width parameter, +which can be defined with just the ``_power_eval`` method. + +.. code-block:: python + :linenos: + + from dataclasses import dataclass + + import numpy as np + import numpy.typing as npt + from pyuvdata.analytic_beam import UnpolarizedAnalyticBeam + + @dataclass(kw_only=True) + class CosBeam(UnpolarizedAnalyticBeam): + """ + A variable-width zenith pointed cosine beam. + + Attributes + ---------- + width : float + Width parameter, E-field goes like a cosine of width * zenith angle, + power goes like the same cosine squared. + + Parameters + ---------- + width : float + Width parameter, E-field goes like a cosine of width * zenith angle, + power goes like the same cosine squared. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + width: float + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + for pol_i in np.arange(self.Npols): + data_array[0, pol_i, :, :] = np.cos(self.width * za_grid) ** 2 + + return data_array + +Defining a cosine beam with no free parameters is even simpler: + +.. code-block:: python + :linenos: + + import numpy as np + import numpy.typing as npt + from pyuvdata.analytic_beam import UnpolarizedAnalyticBeam + + class CosBeam(UnpolarizedAnalyticBeam): + """ + A zenith pointed cosine beam. + + Parameters + ---------- + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + for pol_i in np.arange(self.Npols): + data_array[0, pol_i, :, :] = np.cos(za_grid) ** 2 + + return data_array + + +Example: Defining a simple polarized beam +***************************************** + +Short (Hertzian) dipole beams are polarized but frequency independent and do not +require any extra parameters. We just inherit the default values of ``feed_array`` +and ``x_orientation`` from the ``AnalyticBeam`` class, so do not list them here. + +Note that we define both the ``_efield_eval`` and ``_power_eval`` methods because +we can use some trig identities to reduce the number of cos/sin evaluations for +the power calculation, but it would give the same results if the ``_power_eval`` +method was not defined (we have tests verifying this). + +.. code-block:: python + :linenos: + + import numpy as np + import numpy.typing as npt + from pyuvdata.analytic_beam import AnalyticBeam + + + class ShortDipoleBeam(AnalyticBeam): + """ + A zenith pointed analytic short dipole beam with two crossed feeds. + + A classical short (Hertzian) dipole beam with two crossed feeds aligned east + and north. Short dipole beams are intrinsically polarized but achromatic. + Does not require any parameters, but the orientation of the dipole labelled + as "x" can be specified to align "north" or "east" via the x_orientation + parameter (matching the parameter of the same name on UVBeam and UVData + objects). + + Attributes + ---------- + feed_array : list of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east"). + x_orientation : str + The orientation of the dipole labeled 'x'. The default ("east") means + that the x dipole is aligned east-west and that the y dipole is aligned + north-south. + + Parameters + ---------- + feed_array : list of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east"). + x_orientation : str + The orientation of the dipole labeled 'x'. The default ("east") means + that the x dipole is aligned east-west and that the y dipole is aligned + north-south. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) + for the power beam. + + """ + + basis_vector_type = "az_za" + + def _efield_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + # The first dimension is for [azimuth, zenith angle] in that order + # the second dimension is for feed [e, n] in that order + data_array[0, self.east_ind] = -np.sin(az_grid) + data_array[0, self.north_ind] = np.cos(az_grid) + data_array[1, self.east_ind] = np.cos(za_grid) * np.cos(az_grid) + data_array[1, self.north_ind] = np.cos(za_grid) * np.sin(az_grid) + + return data_array + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + # these are just the sum in quadrature of the efield components. + # some trig work is done to reduce the number of cos/sin evaluations + data_array[0, 0] = 1 - (np.sin(za_grid) * np.cos(az_grid)) ** 2 + data_array[0, 1] = 1 - (np.sin(za_grid) * np.sin(az_grid)) ** 2 + + if self.Npols > self.Nfeeds: + # cross pols are included + data_array[0, 2] = -(np.sin(za_grid) ** 2) * np.sin(2.0 * az_grid) / 2.0 + data_array[0, 3] = data_array[0, 2] + + return data_array + +If we wanted to specify the default feed_array to be ``["e", "n"]`` and that the +default x_orientation was ``"north"`` we would define it as shown below. We +handle the defaulting of the feed_array in the ``validate`` because dataclass +fields cannot have mutable defaults. We also do some other validation in that method. + +.. code-block:: python + :linenos: + + from typing import Literal + from dataclasses import dataclass + + import numpy as np + import numpy.typing as npt + from pyuvdata.analytic_beam import AnalyticBeam + + @dataclass(kw_only=True) + class ShortDipoleBeam(AnalyticBeam): + """ + A zenith pointed analytic short dipole beam with two crossed feeds. + + A classical short (Hertzian) dipole beam with two crossed feeds aligned east + and north. Short dipole beams are intrinsically polarized but achromatic. + Does not require any parameters, but the orientation of the dipole labelled + as "x" can be specified to align "north" or "east" via the x_orientation + parameter (matching the parameter of the same name on UVBeam and UVData + objects). + + Attributes + ---------- + feed_array : list of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east"). + x_orientation : str + The orientation of the dipole labeled 'x'. The default ("east") means + that the x dipole is aligned east-west and that the y dipole is aligned + north-south. + + Parameters + ---------- + feed_array : list of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east"). + x_orientation : str + The orientation of the dipole labeled 'x'. The default ("east") means + that the x dipole is aligned east-west and that the y dipole is aligned + north-south. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) + for the power beam. + + """ + + feed_array: npt.NDArray[str] | list[str] | None = None + x_orientation: Literal["east", "north"] | None = "north" + + basis_vector_type = "az_za" + + def validate(self): + """Post-initialization validation and conversions.""" + if self.feed_array is None: + self.feed_array = ["e", "n"] + + allowed_feeds = ["n", "e", "x", "y"] + for feed in self.feed_array: + if feed not in allowed_feeds: + raise ValueError( + f"Feeds must be one of: {allowed_feeds}, " + f"got feeds: {self.feed_array}" + ) + + def _efield_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + # The first dimension is for [azimuth, zenith angle] in that order + # the second dimension is for feed [e, n] in that order + data_array[0, self.east_ind] = -np.sin(az_grid) + data_array[0, self.north_ind] = np.cos(az_grid) + data_array[1, self.east_ind] = np.cos(za_grid) * np.cos(az_grid) + data_array[1, self.north_ind] = np.cos(za_grid) * np.sin(az_grid) + + return data_array + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + # these are just the sum in quadrature of the efield components. + # some trig work is done to reduce the number of cos/sin evaluations + data_array[0, 0] = 1 - (np.sin(za_grid) * np.cos(az_grid)) ** 2 + data_array[0, 1] = 1 - (np.sin(za_grid) * np.sin(az_grid)) ** 2 + + if self.Npols > self.Nfeeds: + # cross pols are included + data_array[0, 2] = -(np.sin(za_grid) ** 2) * np.sin(2.0 * az_grid) / 2.0 + data_array[0, 3] = data_array[0, 2] + + return data_array + + + +Example: Defining a beam with post init validation +************************************************** + +The gaussian beam defined in pyuvdata is an unpolarized beam that has several +optional configurations that require some validation, which we do using the +``validate`` method. + +.. code-block:: python + :linenos: + + from dataclasses import dataclass + from typing import Literal + + import numpy as np + import numpy.typing as npt + from astropy.constants import c as speed_of_light + from pyuvdata.analytic_beam import UnpolarizedAnalyticBeam + + def diameter_to_sigma(diameter: float, freq_array: npt.NDArray[float]) -> float: + """ + Find the sigma that gives a beam width similar to an Airy disk. + + Find the stddev of a gaussian with fwhm equal to that of + an Airy disk's main lobe for a given diameter. + + Parameters + ---------- + diameter : float + Antenna diameter in meters + freq_array : array of float + Frequencies in Hz + + Returns + ------- + sigma : float + The standard deviation in zenith angle radians for a Gaussian beam + with FWHM equal to that of an Airy disk's main lobe for an aperture + with the given diameter. + + """ + wavelengths = speed_of_light.to("m/s").value / freq_array + + scalar = 2.2150894 # Found by fitting a Gaussian to an Airy disk function + + sigma = np.arcsin(scalar * wavelengths / (np.pi * diameter)) * 2 / 2.355 + + return sigma + + + @dataclass(kw_only=True) + class GaussianBeam(UnpolarizedAnalyticBeam): + """ + A circular, zenith pointed Gaussian beam. + + Requires either a dish diameter in meters or a standard deviation sigma in + radians. Gaussian beams specified by a diameter will have their width + matched to an Airy beam at each simulated frequency, so are inherently + chromatic. For Gaussian beams specified with sigma, the sigma_type defines + whether the width specified by sigma specifies the width of the E-Field beam + (default) or power beam in zenith angle. If only sigma is specified, the + beam is achromatic, optionally both the spectral_index and reference_frequency + parameters can be set to generate a chromatic beam with standard deviation + defined by a power law: + + stddev(f) = sigma * (f/ref_freq)**(spectral_index) + + Attributes + ---------- + sigma : float + Standard deviation in radians for the gaussian beam. Only one of sigma + and diameter should be set. + sigma_type : str + Either "efield" or "power" to indicate whether the sigma specifies the size of + the efield or power beam. Ignored if `sigma` is None. + diameter : float + Dish diameter in meters to use to define the size of the gaussian beam, by + matching the FWHM of the gaussian to the FWHM of an Airy disk. This will result + in a frequency dependent beam. Only one of sigma and diameter should be set. + spectral_index : float + Option to scale the gaussian beam width as a power law with frequency. If set + to anything other than zero, the beam will be frequency dependent and the + `reference_frequency` must be set. Ignored if `sigma` is None. + reference_frequency : float + The reference frequency for the beam width power law, required if `sigma` is not + None and `spectral_index` is not zero. Ignored if `sigma` is None. + + Parameters + ---------- + sigma : float + Standard deviation in radians for the gaussian beam. Only one of sigma + and diameter should be set. + sigma_type : str + Either "efield" or "power" to indicate whether the sigma specifies the size of + the efield or power beam. Ignored if `sigma` is None. + diameter : float + Dish diameter in meters to use to define the size of the gaussian beam, by + matching the FWHM of the gaussian to the FWHM of an Airy disk. This will result + in a frequency dependent beam. Only one of sigma and diameter should be set. + spectral_index : float + Option to scale the gaussian beam width as a power law with frequency. If set + to anything other than zero, the beam will be frequency dependent and the + `reference_frequency` must be set. Ignored if `sigma` is None. + reference_frequency : float + The reference frequency for the beam width power law, required if `sigma` is not + None and `spectral_index` is not zero. Ignored if `sigma` is None. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + sigma: float | None = None + sigma_type: Literal["efield", "power"] = "efield" + diameter: float | None = None + spectral_index: float = 0.0 + reference_frequency: float = None + + def validate(self): + """Post-initialization validation and conversions.""" + if (self.diameter is None and self.sigma is None) or ( + self.diameter is not None and self.sigma is not None + ): + if self.diameter is None: + raise ValueError("Either diameter or sigma must be set.") + else: + raise ValueError("Only one of diameter or sigma can be set.") + + if self.sigma is not None: + if self.sigma_type not in ["efield", "power"]: + raise ValueError("sigma_type must be 'efield' or 'power'.") + + if self.sigma_type == "power": + self.sigma = np.sqrt(2) * self.sigma + + if self.spectral_index != 0.0 and self.reference_frequency is None: + raise ValueError( + "reference_frequency must be set if `spectral_index` is not zero." + ) + if self.reference_frequency is None: + self.reference_frequency = 1.0 + + def get_sigmas(self, freq_array: npt.NDArray[float]) -> npt.NDArray[float]: + """ + Get the sigmas for the gaussian beam using the diameter (if defined). + + Parameters + ---------- + freq_array : array of floats + Frequency values to get the sigmas for in Hertz. + + Returns + ------- + sigmas : array_like of float + Beam sigma values as a function of frequency. Size will match the + freq_array size. + + """ + if self.diameter is not None: + sigmas = diameter_to_sigma(self.diameter, freq_array) + elif self.sigma is not None: + sigmas = ( + self.sigma + * (freq_array / self.reference_frequency) ** self.spectral_index + ) + return sigmas + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + sigmas = self.get_sigmas(f_grid) + + values = np.exp(-(za_grid ** 2) / (sigmas ** 2)) + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + for fn in np.arange(self.Npols): + data_array[0, fn, :, :] = values + + return data_array diff --git a/docs/analytic_beams.rst b/docs/analytic_beams.rst new file mode 100644 index 0000000000..f33967a5d1 --- /dev/null +++ b/docs/analytic_beams.rst @@ -0,0 +1,78 @@ +Analytic Beams +============== + +pyuvdata defines several analytic primary beams for radio telescopes. While these +are not realistic models for true antennas (like those represented in +:class:`pyuvdata.UVBeam`), they can be useful in simulation because they are +lightweight and fast to evaluate (as opposed to having to interpolate). + +The analytic beams defined in pyuvdata are based on a base class, +:class:`pyuvdata.analytic_beam.AnalyticBeam`, which ensures a standard interface +and can be used to define other analytic beams in a consistent way (see the +:ref:`analytic beam tutorial `). To evaluate analytic +beams in particular directions at particular frequencies, use the +:meth:`pyuvdata.analytic_beam.AnalyticBeam.efield_eval` +or :meth:`pyuvdata.analytic_beam.AnalyticBeam.power_eval` methods as appropriate. + +The ``AnalyticBeam`` base class also provides a yaml constructor that can enable +analytic beams to be instantiated directly from yaml files (see +:ref:`yaml_constructors`, similar constructors are also available for UVBeam +objects) and a plugin infrastructure that can automatically include any imported +subclass even if they are defined in other packages. This infrastructure, along +with the :class:`pyuvdata.BeamInterface` class, can simplify the setup for +simulations. + +.. autoclass:: pyuvdata.analytic_beam.AnalyticBeam + :members: + + +.. autoclass:: pyuvdata.AiryBeam + :members: + +.. autoclass:: pyuvdata.GaussianBeam + :members: + +.. autoclass:: pyuvdata.ShortDipoleBeam + :members: + +.. autoclass:: pyuvdata.UniformBeam + :members: + + +.. _yaml_constructors: + +yaml constructors +----------------- + +Analytic beams can be instantiated directly from yaml files using the +``!AnalyticBeam`` tag. The ``class`` parameter must be specified and it can be +set to one of the pyuvdata provided analytic beams or to any AnalyticBeam +subclass. If the subclass is not defined in pyuvdata, either the subclass must +already be imported or it must be specified with the dot-pathed modules included +(i.e. ``my_module.MyAnalyticBeam``). Some analytic beams have required parameters +(e.g. ``diameter`` for AiryBeams), which must also be provided, see the object +definitions for details. + +Some examples are provided below, note that the node key can be anything, it +does not need to be ``beam``: + +A 16 meter diameter Airy beam:: + + beam: !AnalyticBeam + class: AiryBeam + diameter: 16 + +A classical short dipole beam (the dot-pathed module notation is not required +for pyvudata beams but is shown here as an example):: + + beam: !AnalyticBeam + class: pyuvdata.ShortDipoleBeam + +A gaussian shaped beam with an E-Field beam sigma of 0.26 radians that has +width that scales as a power law with frequency:: + + beam: !AnalyticBeam + class: GaussianBeam + reference_frequency: 120000000. + spectral_index: -1.5 + sigma: 0.26 diff --git a/docs/beam_interface.rst b/docs/beam_interface.rst new file mode 100644 index 0000000000..2efbe4b94e --- /dev/null +++ b/docs/beam_interface.rst @@ -0,0 +1,13 @@ +Beam Interface +============== + +The BeamInterface object is designed to provide a unified interface for UVBeam +and AnalyticBeam objects to compute beam response values. It can be constructed +with either a :class:`pyuvdata.UVBeam` or :class:`AnalyticBeam` and it provides +the :meth:`pyuvdata.BeamInterface.compute_response` method, which can be used to +either evaluate the response of an AnalyticBeam or interpolate a UVBeam to get +the beam response. This interface provides a simplified way for simulators to +get beam responses from either UVBeams or analytic beams with the same code. + +.. autoclass:: pyuvdata.BeamInterface + :members: diff --git a/docs/beam_interface_tutorial.rst b/docs/beam_interface_tutorial.rst new file mode 100644 index 0000000000..7788a5943d --- /dev/null +++ b/docs/beam_interface_tutorial.rst @@ -0,0 +1,103 @@ +-------------- +Beam Interface +-------------- + +The BeamInterface object is designed to provide a unified interface for UVBeam +and AnalyticBeam objects to compute beam response values. It can be constructed +with either a :class:`pyuvdata.UVBeam` or :class:`AnalyticBeam` and the beam +response can be calculated using the :meth:`pyuvdata.BeamInterface.compute_response` +method. + +Using BeamInterface +------------------- + +The following code shows how to set up two BeamInterface objects, one with an +analytic beam and one with a UVBeam. Then each is evalated at the same frequency +and directions using the same call to :meth:`pyuvdata.BeamInterface.compute_response` +and the results are plotted. The value of the BeamInterface object is that it +unifies the interface so the code calling it does not need to know if the beam +that is attached to it is an analytic beam or a UVBeam. + +.. code-block:: python + + >>> import os + >>> import matplotlib.pyplot as plt + >>> import numpy as np + >>> from matplotlib.colors import LogNorm + + >>> from pyuvdata import ShortDipoleBeam, BeamInterface, UVBeam + >>> from pyuvdata.data import DATA_PATH + + >>> filename = os.path.join(DATA_PATH, "mwa_full_EE_test.h5") + + >>> dipole_beam = BeamInterface(ShortDipoleBeam(), beam_type="power") + >>> mwa_beam = BeamInterface(UVBeam.from_file(filename, pixels_per_deg=1), beam_type="power") + + >>> # set up zenith angle, azimuth and frequency arrays to evaluate with + >>> # make a regular grid in direction cosines for nice plots + >>> n_vals = 100 + >>> zmax = np.radians(90) # Degrees + >>> axis_arr = np.arange(-n_vals/2., n_vals/2.) / float(n_vals/2.) + >>> l_arr, m_arr = np.meshgrid(axis_arr, axis_arr) + >>> radius = np.sqrt(l_arr**2 + m_arr**2) + >>> za_array = radius * zmax + >>> az_array = np.arctan2(m_arr, l_arr) + + >>> # Wrap the azimuth array to [0, 2pi] to match the extent of the UVBeam azimuth + >>> where_neg_az = np.nonzero(az_array < 0) + >>> az_array[where_neg_az] = az_array[where_neg_az] + np.pi * 2. + >>> az_array = az_array.flatten() + >>> za_array = za_array.flatten() + + >>> # find the values above the horizon so we don't try to interpolate the MWA beam + >>> # beyond the horizon + >>> above_hor = np.nonzero(za_array <= np.pi / 2.)[0] + + >>> # set up output arrays that matches the expected shape, except that they + >>> # include the points beyond the horizon, and fill them with infinity. + >>> # Then we will set the points above the horizon to the computed responses. + >>> dipole_beam_vals = np.full((1, 4, 1, n_vals * n_vals), np.inf, dtype=complex) + >>> mwa_beam_vals = np.full((1, 4, 1, n_vals * n_vals), np.inf, dtype=complex) + + >>> # The MWA beam we have in our test data is small, it only has 3 frequencies, + >>> # so we will just get the value at one of those frequencies rather than + >>> # trying to interpolate to a new frequency. + >>> freqs = np.array([mwa_beam.beam.freq_array[-1]]) + + >>> dipole_beam_vals[:, :, :, above_hor] = dipole_beam.compute_response( + ... az_array=az_array[above_hor], za_array=za_array[above_hor], freq_array=freqs + ... ) + >>> dipole_beam_vals = dipole_beam_vals.reshape(4, n_vals, n_vals) + + >>> mwa_beam_vals[:, :, :, above_hor] = mwa_beam.compute_response( + ... az_array=az_array[above_hor], za_array=za_array[above_hor], freq_array=freqs + ... ) + >>> mwa_beam_vals = mwa_beam_vals.reshape(4, n_vals, n_vals) + + >>> fig, ax = plt.subplots(1, 2) + >>> bp_dip = ax[0].imshow( + ... dipole_beam_vals[0].real, + ... norm=LogNorm(vmin = 1e-4, vmax =1), + ... extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)], + ... ) + >>> _ = ax[0].set_title(f"E/W Dipole power beam") + >>> _ = ax[0].set_xlabel("direction cosine l") + >>> _ = ax[0].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(bp_dip, ax=ax[0], fraction=0.046, pad=0.04) + + >>> bp_mwa = ax[1].imshow( + ... mwa_beam_vals[0].real, + ... norm=LogNorm(vmin = 1e-4, vmax =1), + ... extent=[np.min(l_arr), np.max(l_arr), np.min(m_arr), np.max(m_arr)], + ... ) + >>> _ = ax[1].set_title(f"MWA E/W power beam") + >>> _ = ax[1].set_xlabel("direction cosine l") + >>> _ = ax[1].set_ylabel("direction cosine m") + >>> _ = fig.colorbar(bp_mwa, ax=ax[1], fraction=0.046, pad=0.04) + >>> fig.tight_layout() + >>> plt.show() # doctest: +SKIP + >>> plt.savefig("Images/dipole_mwa_power.png", bbox_inches='tight') + >>> plt.clf() + +.. image:: Images/dipole_mwa_power.png + :width: 600 diff --git a/docs/cst_settings_yaml.rst b/docs/cst_settings_yaml.rst index 44811e79b5..0894363d51 100644 --- a/docs/cst_settings_yaml.rst +++ b/docs/cst_settings_yaml.rst @@ -7,8 +7,7 @@ via keywords when the files are read in, but it is better for the metadata to be specified once and carried with the data files. To that end, we developed a yaml settings file specification to carry all the metadata. This format is very human readable and writeable and we encourage using such a file as the best way to -ensure the metadata is preserved. Note that reading a yaml settings file into -UVBeam requires that pyyaml is installed. +ensure the metadata is preserved. Required Fields *************** diff --git a/docs/make_index.py b/docs/make_index.py index d089b7219f..0ca44337df 100644 --- a/docs/make_index.py +++ b/docs/make_index.py @@ -49,6 +49,8 @@ def write_index_rst(readme_file=None, write_file=None): " uvbeam\n" " uvflag\n" " telescope\n" + " analytic_beams\n" + " beam_interface\n" " fast_uvh5_meta\n" " fast_calh5_meta\n" " utility_functions\n" diff --git a/docs/make_uvbeam.py b/docs/make_uvbeam.py index 673a44f556..6f3d9e54ae 100644 --- a/docs/make_uvbeam.py +++ b/docs/make_uvbeam.py @@ -21,7 +21,9 @@ def write_uvbeam_rst(write_file=None): "transforming the data (interpolating/regridding, selecting, converting\n" "types) and can be interacted with directly.\n\n" "Note that there are some tricks that can help with reading in CST beam\n" - "simulation files in `CST Settings Files`_.\n\n" + "simulation files in `CST Settings Files`_.\n" + "UVBeam also provides a yaml constructor that can enable beams to be\n" + "instantiated directly from yaml files (see: `UVbeam yaml constructor`_)\n\n" "Attributes\n----------\n" "The attributes on UVBeam hold all of the metadata and data required to\n" "describe primary beam models. Under the hood, the attributes are implemented\n" @@ -67,6 +69,33 @@ def write_uvbeam_rst(write_file=None): out += "Methods\n-------\n.. autoclass:: pyuvdata.UVBeam\n :members:\n\n" + out += """ +UVBeam yaml constructor +----------------------- + +UVbeams can be instantiated directly from yaml files using the +``!UVBeam`` tag. The ``filename`` parameter must be specified and +any other parameter that can be passed to the :meth:`pyuvdata.UVBeam.read` +method can also be specified. + +Some examples are provided below, note that the node key can be anything, +it does not need to be ``beam``: + +A simple UVBeam specification:: + + beam: !UVBeam + filename: hera.beamfits + +An UVbeam specification with some keywords to pass to ``UVBeam.read``:: + + beam: !UVBeam + filename: mwa_full_EE_test.h5 + pixels_per_deg: 1 + freq_range: [100.e+6, 200.e+6] + +\n\n +""" + with open("cst_settings_yaml.rst", encoding="utf-8") as cst_settings_file: cst_setting_text = cst_settings_file.read() diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 6ad4f72eea..a4c84bec2f 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -16,3 +16,5 @@ Tutorials are available for each major user class: uvdata_tutorial.rst uvcal_tutorial.rst uvbeam_tutorial.rst + analytic_beam_tutorial.rst + beam_interface_tutorial.rst diff --git a/docs/uvcal_tutorial.rst b/docs/uvcal_tutorial.rst index 83b61ae50e..96676d9d61 100644 --- a/docs/uvcal_tutorial.rst +++ b/docs/uvcal_tutorial.rst @@ -324,6 +324,7 @@ that should be noted: on the object itself). * Regardless of the value of ``undo``, the convention that is inferred for the calibration solutions is determined as follows: + * If neither ``uvc_pol_convention`` nor ``uvcal.pol_convention`` are specified, a a warning is raised (since the resulting calibrated data is not well-determined), and it is *assumed* that the solutions have the same convention as the ``UVData`` @@ -332,7 +333,9 @@ that should be noted: corrections are applied, and the result is ambiguous. * If both ``uvc_pol_convention`` and ``uvcal.pol_convention`` are specified and are different, an error is raised. + * When **calibrating** in :func:`pyuvdata.utils.uvcalibrate` (i.e. ``undo=False``): + * If ``uvdata.pol_convention`` is specified, an error is raised, because you are trying to calibrate already-calibrated data. * The convention applied to the resulting ``UVData`` object is inferred in the @@ -341,7 +344,9 @@ that should be noted: or ``uvcal.pol_convention``, see above), (iii) if still unspecified, no convention will be used and a warning will be raised. This was always the behaviour in earlier versions of ``pyuvdata`` (pre-v3). + * When **un-calibrating** with :func:`pyuvdata.utils.uvcalibrate` (i.e. ``undo=True``): + * If both ``uvd_pol_convention`` and ``uvdata.pol_convention`` are defined and are different, an error is raised. * If neither are set, a warning is raised, since the resulting un-calibrated values @@ -349,6 +354,7 @@ that should be noted: convention could have been used to calibrate originally than is being used to de-calibrate). However, calibration will continue, assuming that the ``UVData`` object has the same convention as the ``UVCal`` object used to de-calibrate. + * It is not supported to have ``pol_convention`` set on ``UVCal``, but *not* ``gain_scale``. A ``pol_convention`` only makes sense in the context of having a scale for the gains. diff --git a/src/pyuvdata/__init__.py b/src/pyuvdata/__init__.py index a70986faf5..626305a728 100644 --- a/src/pyuvdata/__init__.py +++ b/src/pyuvdata/__init__.py @@ -45,6 +45,8 @@ def branch_scheme(version): # pragma: nocover warnings.filterwarnings("ignore", message="numpy.dtype size changed") warnings.filterwarnings("ignore", message="numpy.ufunc size changed") +from .analytic_beam import AiryBeam, GaussianBeam, ShortDipoleBeam, UniformBeam # noqa +from .beam_interface import BeamInterface # noqa from .telescopes import Telescope # noqa from .telescopes import get_telescope # noqa # NB: get_telescopes is deprecated from .uvbeam import UVBeam # noqa @@ -59,6 +61,11 @@ def branch_scheme(version): # pragma: nocover "UVCal", "UVFlag", "UVBeam", + "BeamInterface", + "AiryBeam", + "GaussianBeam", + "ShortDipoleBeam", + "UniformBeam", "Telescope", "get_telescope", ] diff --git a/src/pyuvdata/analytic_beam.py b/src/pyuvdata/analytic_beam.py new file mode 100644 index 0000000000..85c6a476f1 --- /dev/null +++ b/src/pyuvdata/analytic_beam.py @@ -0,0 +1,1021 @@ +# Copyright (c) 2022 Radio Astronomy Software Group +# Licensed under the 2-clause BSD License + +"""Analytic beam class definitions.""" + +from __future__ import annotations + +import dataclasses +import importlib +import warnings +from dataclasses import InitVar, astuple, dataclass, field +from typing import ClassVar, Literal + +import numpy as np +import numpy.typing as npt +import yaml +from astropy.constants import c as speed_of_light +from scipy.special import j1 + +from . import utils +from .docstrings import combine_docstrings +from .uvbeam.uvbeam import UVBeam, _convert_feeds_to_pols + +__all__ = ["AnalyticBeam", "AiryBeam", "GaussianBeam", "ShortDipoleBeam", "UniformBeam"] + + +def array_safe_eq(a, b) -> bool: + """Check if a and b are equal, even if they are numpy arrays.""" + if a is b: + return True + if isinstance(a, np.ndarray) and isinstance(b, np.ndarray): + return a.shape == b.shape and (a == b).all() + try: + return a == b + except TypeError: # pragma: no cover + return NotImplemented + + +def dc_eq(dc1, dc2) -> bool: + """Check if two dataclasses which hold numpy arrays are equal.""" + if dc1 is dc2: + return True + if dc1.__class__ is not dc2.__class__: + return NotImplemented # better than False + t1 = astuple(dc1) + t2 = astuple(dc2) + return all(array_safe_eq(a1, a2) for a1, a2 in zip(t1, t2, strict=True)) + + +@dataclass(kw_only=True) +class AnalyticBeam: + """ + Analytic beam base class. + + Attributes + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. + x_orientation : str + Physical orientation of the feed for the x feed. Not meaningful for + unpolarized analytic beams, but clarifies the orientation of the dipole + for polarized beams like the ShortDipoleBeam and matches with the meaning + on UVBeam objects. + + Parameters + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. Default is ["x", "y"]. + x_orientation : str + Physical orientation of the feed for the x feed. Not meaningful for + unpolarized analytic beams, but clarifies the orientation of the dipole + for for polarized beams like the ShortDipoleBeam and matches with the + meaning on UVBeam objects + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + feed_array: npt.ArrayLike[str] | None = None + x_orientation: Literal["east", "north"] | None = "east" + + include_cross_pols: InitVar[bool] = True + + basis_vector_type = None + + # In the future, this might allow for cartesian basis vectors in some orientation. + # In that case, the Naxes_vec would be 3 rather than 2 + _basis_vec_dict = {"az_za": 2} + + __types__: ClassVar[dict] = {} + + def __init_subclass__(cls): + """Initialize any imported subclass.""" + if ( + cls.__name__ != "UnpolarizedAnalyticBeam" + and not hasattr(cls, "_efield_eval") + and not hasattr(cls, "_power_eval") + ): + raise TypeError( + "Either _efield_eval or _power_eval method must be defined on " + f"{cls.__name__}. Defining _efield_eval is the most general " + "approach because it can represent polarized and negative going " + "beams. If only _power_eval is defined, the E-field beam is " + "defined as the square root of the auto pol power beam." + ) + + if cls.basis_vector_type is None: + warnings.warn( + "basis_vector_type was not defined, defaulting to azimuth and " + "zenith_angle." + ) + cls.basis_vector_type = "az_za" + + if cls.basis_vector_type not in cls._basis_vec_dict: + raise ValueError( + f"basis_vector_type for {cls.__name__} is {cls.basis_vector_type}, " + f"must be one of {list(cls._basis_vec_dict.keys())}" + ) + + cls.__types__[cls.__name__] = cls + + def validate(self): + """Validate inputs, placeholder for subclasses.""" + pass + + def __post_init__(self, include_cross_pols): + """ + Post-initialization validation and conversions. + + Parameters + ---------- + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) + for the power beam. + + """ + self.validate() + + if self.feed_array is not None: + allowed_feeds = ["n", "e", "x", "y", "r", "l"] + for feed in self.feed_array: + if feed not in allowed_feeds: + raise ValueError( + f"Feeds must be one of: {allowed_feeds}, " + f"got feeds: {self.feed_array}" + ) + self.feed_array = np.asarray(self.feed_array) + else: + self.feed_array = np.asarray(["x", "y"]) + + linear_pol = False + for feed_name in ["x", "y", "e", "n"]: + if feed_name in self.feed_array: + linear_pol = True + + if self.x_orientation is None and linear_pol: + raise ValueError( + "x_orientation must be specified for linear polarization feeds" + ) + + self.polarization_array, _ = _convert_feeds_to_pols( + self.feed_array, include_cross_pols, x_orientation=self.x_orientation + ) + + def __eq__(self, other): + """Define equality.""" + # have to define this because feed_array is a numpy array, so equality + # needs to be checked with all_close not `==` + return dc_eq(self, other) + + @property + def Naxes_vec(self): # noqa N802 + """The number of vector axes.""" + return self._basis_vec_dict[self.basis_vector_type] + + @property + def Nfeeds(self): # noqa N802 + """The number of feeds.""" + return len(self.feed_array) + + @property + def Npols(self): # noqa N802 + """The number of polarizations.""" + return self.polarization_array.size + + @property + def east_ind(self): + """The index of the east feed in the feed array.""" + if "e" in self.feed_array: + east_name = "e" + elif self.x_orientation == "east" and "x" in self.feed_array: + east_name = "x" + elif self.x_orientation == "north" and "y" in self.feed_array: + east_name = "y" + else: + # this is not a linearly polarized feed + return None + return np.nonzero(np.asarray(self.feed_array) == east_name)[0][0] + + @property + def north_ind(self): + """The index of the north feed in the feed array.""" + if "n" in self.feed_array: + north_name = "n" + elif self.x_orientation == "north" and "x" in self.feed_array: + north_name = "x" + elif self.x_orientation == "east" and "y" in self.feed_array: + north_name = "y" + else: + # this is not a linearly polarized feed + return None + return np.nonzero(np.asarray(self.feed_array) == north_name)[0][0] + + def _check_eval_inputs( + self, + *, + az_array: npt.NDArray[float], + za_array: npt.NDArray[float], + freq_array: npt.NDArray[float], + ): + """Check the inputs for the eval methods.""" + if az_array.ndim > 1 or za_array.ndim > 1 or freq_array.ndim > 1: + raise ValueError( + "az_array, za_array and freq_array must all be one dimensional." + ) + + if az_array.shape != za_array.shape: + raise ValueError("az_array and za_array must have the same shape.") + + def _get_empty_data_array( + self, grid_shape: tuple[int, int], beam_type: str = "efield" + ) -> npt.NDArray[float]: + """Get the empty data to fill in the eval methods.""" + if beam_type == "efield": + return np.zeros( + (self.Naxes_vec, self.Nfeeds, *grid_shape), dtype=np.complex128 + ) + else: + if self.Npols > self.Nfeeds: + # crosspols are included + dtype_use = np.complex128 + else: + dtype_use = np.float64 + return np.zeros((1, self.Npols, *grid_shape), dtype=dtype_use) + + def efield_eval( + self, + *, + az_array: npt.NDArray[float], + za_array: npt.NDArray[float], + freq_array: npt.NDArray[float], + ) -> npt.NDArray[float]: + """ + Evaluate the efield at the given coordinates. + + Parameters + ---------- + az_array : array_like of floats, optional + Azimuth values to evaluate the beam at in radians. Must be the same shape + as za_array. + za_array : array_like of floats, optional + Zenith values to evaluate the beam at in radians. Must be the same shape + as az_array. + freq_array : array_like of floats, optional + Frequency values to evaluate the beam at in Hertz. + + Returns + ------- + array_like of complex + An array of beam values. The shape of the evaluated data will be: + (Naxes_vec, Nfeeds, freq_array.size, az_array.size) + + """ + self._check_eval_inputs( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + + za_grid, _ = np.meshgrid(za_array, freq_array) + az_grid, f_grid = np.meshgrid(az_array, freq_array) + + if hasattr(self, "_efield_eval"): + return self._efield_eval( + az_grid=az_grid, za_grid=za_grid, f_grid=f_grid + ).astype(complex) + else: + # the polarization array always has the auto pols first, so we can just + # use the first Nfeed elements. + power_vals = self._power_eval( + az_grid=az_grid, za_grid=za_grid, f_grid=f_grid + )[0, 0 : self.Nfeeds].real + + data_array = self._get_empty_data_array(az_grid.shape) + + for fn in np.arange(self.Nfeeds): + data_array[0, fn, :, :] = np.sqrt(power_vals[fn] / 2.0) + data_array[1, fn, :, :] = np.sqrt(power_vals[fn] / 2.0) + + return data_array + + def power_eval( + self, + *, + az_array: npt.NDArray[float], + za_array: npt.NDArray[float], + freq_array: npt.NDArray[float], + ) -> npt.NDArray[float]: + """ + Evaluate the power at the given coordinates. + + Parameters + ---------- + az_array : array_like of floats, optional + Azimuth values to evaluate the beam at in radians. Must be the same shape + as za_array. + za_array : array_like of floats, optional + Zenith values to evaluate the beam at in radians. Must be the same shape + as az_array. + freq_array : array_like of floats, optional + Frequency values to evaluate the beam at in Hertz. + + Returns + ------- + array_like of float or complex + An array of beam values. The shape of the evaluated data will be: + (1, Npols, freq_array.size, az_array.size). The dtype will be + a complex type if cross-pols are included, otherwise it will be a + float type. + + """ + self._check_eval_inputs( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + + za_grid, _ = np.meshgrid(za_array, freq_array) + az_grid, f_grid = np.meshgrid(az_array, freq_array) + + if self.Npols > self.Nfeeds: + # cross pols are included + expected_type = complex + else: + expected_type = float + + if hasattr(self, "_power_eval"): + return self._power_eval( + az_grid=az_grid, za_grid=za_grid, f_grid=f_grid + ).astype(expected_type) + else: + efield_vals = self._efield_eval( + az_grid=az_grid, za_grid=za_grid, f_grid=f_grid + ).astype(complex) + + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + for feed_i in np.arange(self.Nfeeds): + data_array[0, feed_i] = ( + np.abs(efield_vals[0, feed_i]) ** 2 + + np.abs(efield_vals[1, feed_i]) ** 2 + ) + + if self.Npols > self.Nfeeds: + # do cross pols + data_array[0, 2] = efield_vals[0, 0] * np.conj( + efield_vals[0, 1] + ) + efield_vals[1, 0] * np.conj(efield_vals[1, 1]) + data_array[0, 3] = np.conj(data_array[0, 2]) + + return data_array + + @combine_docstrings(UVBeam.new) + def to_uvbeam( + self, + freq_array: npt.NDArray[float], + beam_type: Literal["efield", "power"] = "efield", + pixel_coordinate_system: ( + Literal["az_za", "orthoslant_zenith", "healpix"] | None + ) = None, + axis1_array: npt.NDArray[float] | None = None, + axis2_array: npt.NDArray[float] | None = None, + nside: int | None = None, + healpix_pixel_array: npt.NDArray[int] | None = None, + ordering: Literal["ring", "nested"] | None = None, + ): + """Generate a UVBeam object from an AnalyticBeam object. + + This method evaluates the analytic beam at a set of locations and + frequencies to create a UVBeam object. This can be useful for testing + and some other operations, but it is of course an approximation. + + Parameters + ---------- + freq_array : ndarray of float + Array of frequencies in Hz to evaluate the beam at. + beam_type : str + Beam type, either "efield" or "power". + pixel_coordinate_system : str + Pixel coordinate system, options are "az_za", "orthoslant_zenith" and + "healpix". Forced to be "healpix" if ``nside`` is given and by + *default* set to "az_za" if not. Currently, only "az_za" and "healpix" + are implemented. + axis1_array : ndarray of float + Coordinates along first pixel axis (e.g. azimuth for an azimuth/zenith + angle coordinate system) to evaluate the beam at. Must be regularly + spaced. Should not provided for healpix coordinates. + axis2_array : ndarray of float + Coordinates along second pixel axis (e.g. zenith angle for an + azimuth/zenith angle coordinate system) to evaluate the beam at. Must + be regularly spaced. Should not provided for healpix coordinates. + nside : int + Healpix nside parameter, should only be provided for healpix coordinates. + healpix_pixel_array : ndarray of int + Healpix pixels to include. If nside is provided, defaults to all the + pixels in the Healpix map. + ordering : str + Healpix ordering parameter, defaults to "ring" if nside is provided. + + """ + if beam_type not in ["efield", "power"]: + raise ValueError("Beam type must be 'efield' or 'power'") + + if beam_type == "efield": + feed_array = self.feed_array + polarization_array = None + else: + feed_array = None + polarization_array = self.polarization_array + + if pixel_coordinate_system is not None: + allowed_coord_sys = list(UVBeam().coordinate_system_dict.keys()) + if pixel_coordinate_system not in allowed_coord_sys: + raise ValueError( + f"Unknown coordinate system {pixel_coordinate_system}. UVBeam " + f"supported coordinate systems are: {allowed_coord_sys}." + ) + + if pixel_coordinate_system not in ["az_za", "healpix"]: + raise NotImplementedError( + "Currently this method only supports 'az_za' and 'healpix' " + "pixel_coordinate_systems." + ) + + uvb = UVBeam.new( + telescope_name="Analytic Beam", + data_normalization="physical", + feed_name=self.__repr__(), + feed_version="1.0", + model_name=self.__repr__(), + model_version="1.0", + freq_array=freq_array, + feed_array=feed_array, + polarization_array=polarization_array, + x_orientation=self.x_orientation, + pixel_coordinate_system=pixel_coordinate_system, + axis1_array=axis1_array, + axis2_array=axis2_array, + nside=nside, + healpix_pixel_array=healpix_pixel_array, + ordering=ordering, + history=f"created from a pyuvdata analytic beam: {self.__repr__()}", + ) + + if uvb.pixel_coordinate_system == "healpix": + try: + from astropy_healpix import HEALPix + except ImportError as e: + raise ImportError( + "astropy_healpix is not installed but is " + "required for healpix functionality. " + "Install 'astropy-healpix' using conda or pip." + ) from e + hp_obj = HEALPix(nside=uvb.nside, order=uvb.ordering) + hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(uvb.pixel_array) + za_array, az_array = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + hpx_lat.radian, hpx_lon.radian + ) + + else: + az_array, za_array = np.meshgrid(uvb.axis1_array, uvb.axis2_array) + az_array = az_array.flatten() + za_array = za_array.flatten() + + if beam_type == "efield": + eval_function = "efield_eval" + else: + eval_function = "power_eval" + + data_array = getattr(self, eval_function)( + az_array=az_array, za_array=za_array, freq_array=freq_array + ) + + if uvb.pixel_coordinate_system == "az_za": + data_array = data_array.reshape(uvb.data_array.shape) + + uvb.data_array = data_array + + uvb.check() + return uvb + + +def _analytic_beam_constructor(loader, node): + """ + Define a yaml constructor for analytic beams. + + The yaml must specify a "class" field with an importable class and any + required inputs to that class's constructor. + + Example yamls (note that the node key can be anything, it does not need to + be 'beam'): + + * beam: !AnalyticBeam + class: AiryBeam + diameter: 4.0 + + * beam: !AnalyticBeam + class: GaussianBeam + reference_frequency: 120000000. + spectral_index: -1.5 + sigma: 0.26 + + * beam: !AnalyticBeam + class: ShortDipoleBeam + + + Parameters + ---------- + loader: yaml.Loader + An instance of a yaml Loader object. + node: yaml.Node + A yaml node object. + + Returns + ------- + beam + An instance of an AnalyticBeam subclass. + + """ + values = loader.construct_mapping(node, deep=True) + if "class" not in values: + raise ValueError("yaml entries for AnalyticBeam must specify a class") + class_parts = (values.pop("class")).split(".") + class_name = class_parts[-1] + + if class_name not in AnalyticBeam.__types__ and len(class_parts) > 1: + module = (".").join(class_parts[:-1]) + module = importlib.import_module(module) + + if class_name not in AnalyticBeam.__types__: + raise NameError( + f"{class_name} is not a known AnalyticBeam. Available options are: " + f"{list(AnalyticBeam.__types__.keys())}. If it is a custom beam, " + "either ensure the module is imported, or specify the beam with " + "dot-pathed modules included (i.e. `my_module.MyAnalyticBeam`)" + ) + + beam_class = AnalyticBeam.__types__[class_name] + + beam = beam_class(**values) + + return beam + + +yaml.add_constructor( + "!AnalyticBeam", _analytic_beam_constructor, Loader=yaml.SafeLoader +) +yaml.add_constructor( + "!AnalyticBeam", _analytic_beam_constructor, Loader=yaml.FullLoader +) + + +def _analytic_beam_representer(dumper, beam): + """ + Define a yaml representer for analytic beams. + + Parameters + ---------- + dumper: yaml.Dumper + An instance of a yaml Loader object. + beam: AnalyticBeam subclass + An analytic beam object. + + Returns + ------- + str + The yaml representation of the analytic beam. + + """ + mapping = { + "class": beam.__module__ + "." + beam.__class__.__name__, + **dataclasses.asdict(beam), + } + if "feed_array" in mapping: + mapping["feed_array"] = mapping["feed_array"].tolist() + + return dumper.represent_mapping("!AnalyticBeam", mapping) + + +yaml.add_multi_representer( + AnalyticBeam, _analytic_beam_representer, Dumper=yaml.SafeDumper +) +yaml.add_multi_representer(AnalyticBeam, _analytic_beam_representer, Dumper=yaml.Dumper) + + +@dataclass(kw_only=True) +class UnpolarizedAnalyticBeam(AnalyticBeam): + """ + Unpolarized analytic beam base class. + + Attributes + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. + x_orientation : str + Physical orientation of the feed for the x feed. Not meaningful for + unpolarized analytic beams, but clarifies the orientation of the dipole + for polarized beams like the ShortDipoleBeam and matches with the meaning + on UVBeam objects. + + Parameters + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. Default is ["x", "y"]. + x_orientation : str + Physical orientation of the feed for the x feed. Not meaningful for + unpolarized analytic beams, but clarifies the orientation of the dipole + for for polarized beams like the ShortDipoleBeam and matches with the + meaning on UVBeam objects + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + feed_array: npt.npt.ArrayLike[str] | None = field( + default=None, repr=False, compare=False + ) + x_orientation: Literal["east", "north"] = field( + default="east", repr=False, compare=False + ) + + # the basis vector type doesn't matter for unpolarized beams, just hardcode + # it here so subclasses don't have to deal with it. + basis_vector_type = "az_za" + + +@dataclass(kw_only=True) +class AiryBeam(UnpolarizedAnalyticBeam): + """ + A zenith pointed Airy beam. + + Airy beams are the diffraction pattern of a circular aperture, so represent + an idealized dish. Requires a dish diameter in meters and is inherently + chromatic and unpolarized. + + The unpolarized nature leads to some results that may be surprising to radio + astronomers: if two feeds are specified they will have identical responses + and the cross power beam between the two feeds will be identical to the + power beam for a single feed. + + Attributes + ---------- + diameter : float + Dish diameter in meters. + + Parameters + ---------- + diameter : float + Dish diameter in meters. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + diameter: float + + # Have to define this because an Airy E-field response can go negative, + # so it cannot just be calculated from the sqrt of a power beam. + def _efield_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + kvals = (2.0 * np.pi) * f_grid / speed_of_light.to("m/s").value + xvals = (self.diameter / 2.0) * np.sin(za_grid) * kvals + values = np.zeros_like(xvals) + nz = xvals != 0.0 + ze = xvals == 0.0 + values[nz] = 2.0 * j1(xvals[nz]) / xvals[nz] + values[ze] = 1.0 + + for fn in np.arange(self.Nfeeds): + data_array[0, fn, :, :] = values / np.sqrt(2.0) + data_array[1, fn, :, :] = values / np.sqrt(2.0) + + return data_array + + +def diameter_to_sigma(diameter: float, freq_array: npt.NDArray[float]) -> float: + """ + Find the sigma that gives a beam width similar to an Airy disk. + + Find the stddev of a gaussian with fwhm equal to that of + an Airy disk's main lobe for a given diameter. + + Parameters + ---------- + diameter : float + Antenna diameter in meters + freq_array : array of float + Frequencies in Hz + + Returns + ------- + sigma : float + The standard deviation in zenith angle radians for a Gaussian beam + with FWHM equal to that of an Airy disk's main lobe for an aperture + with the given diameter. + + """ + wavelengths = speed_of_light.to("m/s").value / freq_array + + scalar = 2.2150894 # Found by fitting a Gaussian to an Airy disk function + + sigma = np.arcsin(scalar * wavelengths / (np.pi * diameter)) * 2 / 2.355 + + return sigma + + +@dataclass(kw_only=True) +class GaussianBeam(UnpolarizedAnalyticBeam): + """ + A circular, zenith pointed Gaussian beam. + + Requires either a dish diameter in meters or a standard deviation sigma in + radians. Gaussian beams specified by a diameter will have their width + matched to an Airy beam at each simulated frequency, so are inherently + chromatic. For Gaussian beams specified with sigma, the sigma_type defines + whether the width specified by sigma specifies the width of the E-Field beam + (default) or power beam in zenith angle. If only sigma is specified, the + beam is achromatic, optionally both the spectral_index and reference_frequency + parameters can be set to generate a chromatic beam with standard deviation + defined by a power law: + + stddev(f) = sigma * (f/ref_freq)**(spectral_index) + + The unpolarized nature leads to some results that may be + surprising to radio astronomers: if two feeds are specified they will have + identical responses and the cross power beam between the two feeds will be + identical to the power beam for a single feed. + + Attributes + ---------- + sigma : float + Standard deviation in radians for the gaussian beam. Only one of sigma + and diameter should be set. + sigma_type : str + Either "efield" or "power" to indicate whether the sigma specifies the size of + the efield or power beam. Ignored if `sigma` is None. + diameter : float + Dish diameter in meters to use to define the size of the gaussian beam, by + matching the FWHM of the gaussian to the FWHM of an Airy disk. This will result + in a frequency dependent beam. Only one of sigma and diameter should be set. + spectral_index : float + Option to scale the gaussian beam width as a power law with frequency. If set + to anything other than zero, the beam will be frequency dependent and the + `reference_frequency` must be set. Ignored if `sigma` is None. + reference_frequency : float + The reference frequency for the beam width power law, required if `sigma` is not + None and `spectral_index` is not zero. Ignored if `sigma` is None. + + Parameters + ---------- + sigma : float + Standard deviation in radians for the gaussian beam. Only one of sigma + and diameter should be set. + sigma_type : str + Either "efield" or "power" to indicate whether the sigma specifies the size of + the efield or power beam. Ignored if `sigma` is None. + diameter : float + Dish diameter in meters to use to define the size of the gaussian beam, by + matching the FWHM of the gaussian to the FWHM of an Airy disk. This will result + in a frequency dependent beam. Only one of sigma and diameter should be set. + spectral_index : float + Option to scale the gaussian beam width as a power law with frequency. If set + to anything other than zero, the beam will be frequency dependent and the + `reference_frequency` must be set. Ignored if `sigma` is None. + reference_frequency : float + The reference frequency for the beam width power law, required if `sigma` is not + None and `spectral_index` is not zero. Ignored if `sigma` is None. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + sigma: float | None = None + sigma_type: Literal["efield", "power"] = "efield" + diameter: float | None = None + spectral_index: float = 0.0 + reference_frequency: float = None + + def validate(self): + """Post-initialization validation and conversions.""" + if (self.diameter is None and self.sigma is None) or ( + self.diameter is not None and self.sigma is not None + ): + if self.diameter is None: + raise ValueError("Either diameter or sigma must be set.") + else: + raise ValueError("Only one of diameter or sigma can be set.") + + if self.sigma is not None: + if self.sigma_type not in ["efield", "power"]: + raise ValueError("sigma_type must be 'efield' or 'power'.") + + if self.sigma_type == "power": + self.sigma = np.sqrt(2) * self.sigma + + if self.spectral_index != 0.0 and self.reference_frequency is None: + raise ValueError( + "reference_frequency must be set if `spectral_index` is not zero." + ) + if self.reference_frequency is None: + self.reference_frequency = 1.0 + + def get_sigmas(self, freq_array: npt.NDArray[float]) -> npt.NDArray[float]: + """ + Get the sigmas for the gaussian beam using the diameter (if defined). + + Parameters + ---------- + freq_array : array of floats + Frequency values to get the sigmas for in Hertz. + + Returns + ------- + sigmas : array_like of float + Beam sigma values as a function of frequency. Size will match the + freq_array size. + + """ + if self.diameter is not None: + sigmas = diameter_to_sigma(self.diameter, freq_array) + elif self.sigma is not None: + sigmas = ( + self.sigma + * (freq_array / self.reference_frequency) ** self.spectral_index + ) + return sigmas + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + sigmas = self.get_sigmas(f_grid) + + values = np.exp(-(za_grid**2) / (sigmas**2)) + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + for fn in np.arange(self.Npols): + # For power beams the first axis is shallow because we don't have to worry + # about polarization. + data_array[0, fn, :, :] = values + + return data_array + + +class ShortDipoleBeam(AnalyticBeam): + """ + A zenith pointed analytic short dipole beam with two crossed feeds. + + A classical short (Hertzian) dipole beam with two crossed feeds aligned east + and north. Short dipole beams are intrinsically polarized but achromatic. + Does not require any parameters, but the orientation of the dipole labelled + as "x" can be specified to align "north" or "east" via the x_orientation + parameter (matching the parameter of the same name on UVBeam and UVData + objects). + + Attributes + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east"). + x_orientation : str + The orientation of the dipole labeled 'x'. The default ("east") means + that the x dipole is aligned east-west and that the y dipole is aligned + north-south. + + Parameters + ---------- + feed_array : list of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. Default is ["e", "n"]. + x_orientation : str + The orientation of the dipole labeled 'x'. The default ("east") means + that the x dipole is aligned east-west and that the y dipole is aligned + north-south. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) + for the power beam. + + """ + + basis_vector_type = "az_za" + + def validate(self): + """Post-initialization validation and conversions.""" + if self.feed_array is None: + self.feed_array = ["e", "n"] + + allowed_feeds = ["n", "e", "x", "y"] + for feed in self.feed_array: + if feed not in allowed_feeds: + raise ValueError( + f"Feeds must be one of: {allowed_feeds}, " + f"got feeds: {self.feed_array}" + ) + + def _efield_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + # The first dimension is for [azimuth, zenith angle] in that order + # the second dimension is for feed [e, n] in that order + data_array[0, self.east_ind] = -np.sin(az_grid) + data_array[0, self.north_ind] = np.cos(az_grid) + data_array[1, self.east_ind] = np.cos(za_grid) * np.cos(az_grid) + data_array[1, self.north_ind] = np.cos(za_grid) * np.sin(az_grid) + + return data_array + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + # these are just the sum in quadrature of the efield components. + # some trig work is done to reduce the number of cos/sin evaluations + data_array[0, 0] = 1 - (np.sin(za_grid) * np.cos(az_grid)) ** 2 + data_array[0, 1] = 1 - (np.sin(za_grid) * np.sin(az_grid)) ** 2 + + if self.Npols > self.Nfeeds: + # cross pols are included + data_array[0, 2] = -(np.sin(za_grid) ** 2) * np.sin(2.0 * az_grid) / 2.0 + data_array[0, 3] = data_array[0, 2] + + return data_array + + +@dataclass(kw_only=True) +class UniformBeam(UnpolarizedAnalyticBeam): + """ + A uniform beam. + + Uniform beams have identical responses in all directions, so are quite + unphysical but can be useful for testing other aspects of simulators. They + are unpolarized and achromatic and do not take any parameters. + + The unpolarized nature leads to some results that may be surprising to radio + astronomers: if two feeds are specified they will have identical responses + and the cross power beam between the two feeds will be identical to the + power beam for a single feed. + + Attributes + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. + x_orientation : str + Physical orientation of the feed for the x feed. Not meaningful for + UniformBeams, which are unpolarized. + + Parameters + ---------- + feed_array : array-like of str + Feeds to define this beam for, e.g. x & y or n & e (for "north" and "east") + or r & l. Default is ["x", "y"]. + x_orientation : str + Physical orientation of the feed for the x feed. Not meaningful for + UniformBeams, which are unpolarized. + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) for + the power beam. + + """ + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + data_array = data_array + 1.0 + + return data_array diff --git a/src/pyuvdata/beam_interface.py b/src/pyuvdata/beam_interface.py new file mode 100644 index 0000000000..50609407d4 --- /dev/null +++ b/src/pyuvdata/beam_interface.py @@ -0,0 +1,195 @@ +# Copyright (c) 2022 Radio Astronomy Software Group +# Licensed under the 2-clause BSD License + +"""Definition for BeamInterface object.""" + +from __future__ import annotations + +import copy +import warnings +from dataclasses import InitVar, dataclass +from typing import Literal + +import numpy as np +import numpy.typing as npt + +from .analytic_beam import AnalyticBeam +from .uvbeam import UVBeam + +# Other methods we may want to include: +# - beam area +# - beam squared area +# - efield_to_pstokes + + +@dataclass +class BeamInterface: + """ + Definition for a unified beam interface. + + This object provides a unified interface for UVBeam and AnalyticBeam objects + to compute beam response values in any direction. + + Attributes + ---------- + beam : pyuvdata.UVBeam or pyuvdata.AnalyticBeam + Beam object to use for computations + beam_type : str + The beam type, either "efield" or "power". + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne). + Used if beam is a UVBeam and and the input UVBeam is an Efield beam but + beam_type is "power". + Ignored otherwise (the cross pol inclusion is set by the beam object.) + + """ + + beam: AnalyticBeam | UVBeam + beam_type: Literal["efield", "power"] | None = None + include_cross_pols: InitVar[bool] = True + + def __post_init__(self, include_cross_pols: bool): + """ + Post-initialization validation and conversions. + + Parameters + ---------- + include_cross_pols : bool + Option to include the cross polarized beams (e.g. xy and yx or en and ne) + for the power beam. + + """ + if not isinstance(self.beam, UVBeam) and not issubclass( + type(self.beam), AnalyticBeam + ): + raise ValueError( + "beam must be a UVBeam or an AnalyticBeam instance, not a " + f"{type(self.beam)}." + ) + if isinstance(self.beam, UVBeam): + if self.beam_type is None or self.beam_type == self.beam.beam_type: + self.beam_type = self.beam.beam_type + elif self.beam_type == "power": + warnings.warn( + "Input beam is an efield UVBeam but beam_type is specified as " + "'power'. Converting efield beam to power." + ) + self.beam.efield_to_power(calc_cross_pols=include_cross_pols) + else: + raise ValueError( + "Input beam is a power UVBeam but beam_type is specified as " + "'efield'. It's not possible to convert a power beam to an " + "efield beam, either provide an efield UVBeam or do not " + "specify `beam_type`." + ) + + @property + def _isuvbeam(self): + return isinstance(self.beam, UVBeam) + + def compute_response( + self, + *, + az_array: npt.NDArray[float], + za_array: npt.NDArray[float], + freq_array: npt.NDArray[float] | None, + az_za_grid: bool = False, + interpolation_function=None, + freq_interp_kind=None, + freq_interp_tol: float = 1.0, + reuse_spline: bool = False, + spline_opts: dict | None = None, + check_azza_domain: bool = True, + ): + """ + Calculate beam responses, by interpolating UVBeams or evaluating AnalyticBeams. + + Parameters + ---------- + az_array : array_like of floats, optional + Azimuth values to compute the response for in radians, either + specifying the azimuth positions for every interpolation point or + specifying the azimuth vector for a meshgrid if az_za_grid is True. + za_array : array_like of floats, optional + Zenith values to compute the response for in radians, either + specifying the zenith positions for every interpolation point or + specifying the zenith vector for a meshgrid if az_za_grid is True. + freq_array : array_like of floats or None + Frequency values to compute the response for in Hz. If beam is a UVBeam + this can be set to None to get the responses at the UVBeam frequencies. + It must be a numpy array if beam is an analytic beam. + az_za_grid : bool + Option to treat the `az_array` and `za_array` as the input vectors + for points on a mesh grid. + interpolation_function : str, optional + Specify the interpolation function to use. Defaults to: "az_za_simple" for + objects with the "az_za" pixel_coordinate_system and "healpix_simple" for + objects with the "healpix" pixel_coordinate_system. Only applies if + beam is a UVBeam. + freq_interp_kind : str + Interpolation method to use frequency. See scipy.interpolate.interp1d + for details. Defaults to "cubic". + freq_interp_tol : float + Frequency distance tolerance [Hz] of nearest neighbors. + If *all* elements in freq_array have nearest neighbor distances within + the specified tolerance then return the beam at each nearest neighbor, + otherwise interpolate the beam. Only applies if beam is a UVBeam. + reuse_spline : bool + Save the interpolation functions for reuse. Only applies if beam is + a UVBeam and interpolation_function is "az_za_simple". + spline_opts : dict + Provide options to numpy.RectBivariateSpline. This includes spline + order parameters `kx` and `ky`, and smoothing parameter `s`. Only + applies if beam is a UVBeam and interpolation_function is "az_za_simple". + check_azza_domain : bool + Whether to check the domain of az/za to ensure that they are covered by the + intrinsic data array. Checking them can be quite computationally expensive. + Conversely, if the passed az/za are outside of the domain, they will be + silently extrapolated and the behavior is not well-defined. Only + applies if beam is a UVBeam and interpolation_function is "az_za_simple". + + Returns + ------- + array_like of float or complex + An array of computed values, shape (Naxes_vec, Nfeeds or Npols, + freq_array.size, az_array.size) + """ + if not isinstance(az_array, np.ndarray) or az_array.ndim != 1: + raise ValueError("az_array must be a one-dimensional numpy array") + if not isinstance(za_array, np.ndarray) or za_array.ndim != 1: + raise ValueError("za_array must be a one-dimensional numpy array") + + if self._isuvbeam: + interp_data, _ = self.beam.interp( + az_array=az_array, + za_array=za_array, + az_za_grid=az_za_grid, + freq_array=freq_array, + interpolation_function=interpolation_function, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + reuse_spline=reuse_spline, + spline_opts=spline_opts, + check_azza_domain=check_azza_domain, + ) + else: + if not isinstance(freq_array, np.ndarray) or freq_array.ndim != 1: + raise ValueError("freq_array must be a one-dimensional numpy array") + if az_za_grid: + az_array_use, za_array_use = np.meshgrid(az_array, za_array) + az_array_use = az_array_use.flatten() + za_array_use = za_array_use.flatten() + else: + az_array_use = copy.copy(az_array) + za_array_use = copy.copy(za_array) + + if self.beam_type == "efield": + interp_data = self.beam.efield_eval( + az_array=az_array_use, za_array=za_array_use, freq_array=freq_array + ) + else: + interp_data = self.beam.power_eval( + az_array=az_array_use, za_array=za_array_use, freq_array=freq_array + ) + + return interp_data diff --git a/src/pyuvdata/data/test_analytic_beam.py b/src/pyuvdata/data/test_analytic_beam.py new file mode 100644 index 0000000000..ed5cab9c23 --- /dev/null +++ b/src/pyuvdata/data/test_analytic_beam.py @@ -0,0 +1,59 @@ +# Copyright (c) 2024 Radio Astronomy Software Group +# Licensed under the 2-clause BSD License +"""Define an AnalyticBeam subclass for testing the AnalyticBeam plugin code.""" + +from dataclasses import dataclass + +import numpy as np +import numpy.typing as npt + +from ..analytic_beam import UnpolarizedAnalyticBeam + + +@dataclass(kw_only=True) +class CosPowerTest(UnpolarizedAnalyticBeam): + """A test class to support testing the AnalyticBeam plugin code.""" + + width: float + + def _power_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the power at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape, beam_type="power") + + for pol_i in np.arange(self.Npols): + # For power beams the first axis is shallow because we don't have to worry + # about polarization. + data_array[0, pol_i, :, :] = np.cos(self.width * za_grid) ** 2 + + return data_array + + +@dataclass(kw_only=True) +class CosEfieldTest(UnpolarizedAnalyticBeam): + """A test class to support testing the AnalyticBeam plugin code.""" + + width: float + + def _efield_eval( + self, + *, + az_grid: npt.NDArray[float], + za_grid: npt.NDArray[float], + f_grid: npt.NDArray[float], + ) -> npt.NDArray[float]: + """Evaluate the efield at the given coordinates.""" + data_array = self._get_empty_data_array(az_grid.shape) + + for feed_i in np.arange(self.Nfeeds): + # For power beams the first axis is shallow because we don't have to worry + # about polarization. + data_array[0, feed_i, :, :] = np.cos(self.width * za_grid) / np.sqrt(2) + data_array[1, feed_i, :, :] = np.cos(self.width * za_grid) / np.sqrt(2) + + return data_array diff --git a/src/pyuvdata/utils/coordinates.py b/src/pyuvdata/utils/coordinates.py index bb0f2b28b8..5644ea6131 100644 --- a/src/pyuvdata/utils/coordinates.py +++ b/src/pyuvdata/utils/coordinates.py @@ -482,6 +482,90 @@ def ECEF_from_ENU( return xyz +def hpx_latlon_to_zenithangle_azimuth(hpx_lat, hpx_lon): + """ + Convert from healpix lat/lon to UVBeam za/az convention. + + Note that this is different (unfortunately) from the conversion between + the UVBeam Zenith Angle, Azimuth coordinate system and the astropy Alt/Az + coordinate system. The astropy Azimuth runs the opposite direction and + has a different starting angle than UVBeam's Azimuth because they are both + right handed coordinate systems but Altitude moves the opposite direction + than Zenith Angle does. + + The conversion used in this code sets the Healpix latitude to 90-zenith angle + but it does not change the origin or direction for the azimuthal angle. This + convention was set early in the development of UVBeam and we preserve it to + preserve backwards compatibility. + + Parameters + ---------- + hpx_lat: float or array of float + Healpix latiudinal coordinate in radians. + hpx_lon: float or array of float + Healpix longitudinal coordinate in radians. + + Returns + ------- + zenith_angle: float or array of float + In radians + azimuth: float or array of float + In radians in uvbeam convention: North of East(East=0, North=pi/2) + + """ + input_alt = np.asarray(hpx_lat) + input_az = np.asarray(hpx_lon) + if input_alt.shape != input_az.shape: + raise ValueError("shapes of hpx_lat and hpx_lon values must match.") + + zenith_angle = np.pi / 2 - hpx_lat + azimuth = hpx_lon + + return zenith_angle, azimuth + + +def zenithangle_azimuth_to_hpx_latlon(zenith_angle, azimuth): + """ + Convert from UVBeam az/za convention to healpix lat/lon. + + Note that this is different (unfortunately) from the conversion between + the UVBeam Zenith Angle, Azimuth coordinate system and the astropy Alt/Az + coordinate system. The astropy Azimuth runs the opposite direction and + has a different starting angle than UVBeam's Azimuth because they are both + right handed coordinate systems but Altitude moves the opposite direction + than Zenith Angle does. + + The conversion used in this code sets the Healpix latitude to 90-zenith angle + but it does not change the origin or direction for the azimuthal angle. This + convention was set early in the development of UVBeam and we preserve it to + preserve backwards compatibility. + + Parameters + ---------- + zenith_angle: float, array_like of float + Zenith angle in radians + azimuth: float, array_like of float + Azimuth in radians in uvbeam convention: North of East(East=0, North=pi/2) + + Returns + ------- + hpx_lat: float or array of float + Healpix latiudinal coordinate in radians. + hpx_lon: float or array of float + Healpix longitudinal coordinate in radians. + + """ + input_za = np.array(zenith_angle) + input_az = np.array(azimuth) + if input_za.shape != input_az.shape: + raise ValueError("shapes of zenith_angle and azimuth values must match.") + + lat_array = np.pi / 2 - zenith_angle + lon_array = azimuth + + return lat_array, lon_array + + def check_surface_based_positions( *, telescope_loc=None, diff --git a/src/pyuvdata/uvbeam/initializers.py b/src/pyuvdata/uvbeam/initializers.py index 08789f0187..8624b561cc 100644 --- a/src/pyuvdata/uvbeam/initializers.py +++ b/src/pyuvdata/uvbeam/initializers.py @@ -18,7 +18,7 @@ def new_uvbeam( *, telescope_name: str, data_normalization: Literal["physical", "peak", "solid_angle"], - freq_array: npt.NDArray[np.float], + freq_array: npt.NDArray[float], feed_name: str = "default", feed_version: str = "0.0", model_name: str = "default", @@ -31,20 +31,21 @@ def new_uvbeam( pixel_coordinate_system: ( Literal["az_za", "orthoslant_zenith", "healpix"] | None ) = None, - axis1_array: npt.NDArray[np.float] | None = None, - axis2_array: npt.NDArray[np.float] | None = None, + axis1_array: npt.NDArray[float] | None = None, + axis2_array: npt.NDArray[float] | None = None, nside: int | None = None, ordering: Literal["ring", "nested"] | None = None, - healpix_pixel_array: npt.NDArray[np.int] | None = None, - basis_vector_array: npt.NDArray[np.float] | None = None, - bandpass_array: npt.NDArray[np.float] | None = None, - element_location_array: npt.NDArray[np.float] | None = None, + healpix_pixel_array: npt.NDArray[int] | None = None, + basis_vector_array: npt.NDArray[float] | None = None, + bandpass_array: npt.NDArray[float] | None = None, + element_location_array: npt.NDArray[float] | None = None, element_coordinate_system: Literal["n-e", "x-y"] | None = None, - delay_array: npt.NDArray[np.float] | None = None, - gain_array: npt.NDArray[np.float] | None = None, - coupling_matrix: npt.NDArray[np.float] | None = None, - data_array: npt.NDArray[np.float] | None = None, + delay_array: npt.NDArray[float] | None = None, + gain_array: npt.NDArray[float] | None = None, + coupling_matrix: npt.NDArray[float] | None = None, + data_array: npt.NDArray[float] | None = None, history: str = "", + **kwargs, ): r"""Create a new UVBeam object with default parameters. @@ -88,10 +89,12 @@ def new_uvbeam( "az_za" if not. axis1_array : ndarray of float Coordinates along first pixel axis (e.g. azimuth for an azimuth/zenith - angle coordinate system). Should not provided for healpix coordinates. + angle coordinate system). Must be regularly spaced. Should not provided + for healpix coordinates. axis2_array : ndarray of float Coordinates along second pixel axis (e.g. zenith angle for an azimuth/zenith - angle coordinate system). Should not provided for healpix coordinates. + angle coordinate system). Must be regularly spaced. Should not provided + for healpix coordinates. nside : int Healpix nside parameter, should only be provided for healpix coordinates. healpix_pixel_array : ndarray of int @@ -198,6 +201,13 @@ def new_uvbeam( uvb.Naxes_vec = 2 uvb.Ncomponents_vec = 2 elif axis1_array is not None and axis2_array is not None: + for ind, arr in enumerate([axis1_array, axis2_array]): + # both axes arrays have the same same tols since we just made this + # object and haven't modified the tols + if not utils.tools._test_array_constant_spacing( + arr, tols=uvb._axis1_array.tols + ): + raise ValueError(f"axis{ind+1}_array must be regularly spaced") uvb.axis1_array = axis1_array uvb.axis2_array = axis2_array @@ -242,6 +252,12 @@ def new_uvbeam( if x_orientation is not None: uvb.x_orientation = utils.XORIENTMAP[x_orientation.lower()] + for k, v in kwargs.items(): + if hasattr(uvb, k): + setattr(uvb, k, v) + else: + raise ValueError(f"Unrecognized keyword argument: {k}") + if basis_vector_array is not None: if uvb.pixel_coordinate_system == "healpix": bv_shape = (uvb.Naxes_vec, uvb.Ncomponents_vec, uvb.Npixels) @@ -346,6 +362,9 @@ def new_uvbeam( polax = uvb.Nfeeds else: data_type = float + for pol in uvb.polarization_array: + if pol in [-3, -4, -7, -8]: + data_type = complex polax = uvb.Npols if uvb.pixel_coordinate_system == "healpix": diff --git a/src/pyuvdata/uvbeam/uvbeam.py b/src/pyuvdata/uvbeam/uvbeam.py index 71e44ddb89..3e99511195 100644 --- a/src/pyuvdata/uvbeam/uvbeam.py +++ b/src/pyuvdata/uvbeam/uvbeam.py @@ -4,13 +4,13 @@ """Primary container for radio telescope antenna beams.""" import copy +import importlib import os import warnings import numpy as np import yaml from astropy import units -from astropy.coordinates import Angle from docstring_parser import DocstringStyle from scipy import interpolate, ndimage @@ -22,6 +22,29 @@ __all__ = ["UVBeam"] +def _convert_feeds_to_pols(feed_array, calc_cross_pols, x_orientation=None): + n_feeds = np.asarray(feed_array).size + + feed_pol_order = [(0, 0)] + if n_feeds > 1: + feed_pol_order.append((1, 1)) + + if calc_cross_pols: + # to get here we have Nfeeds > 1 + feed_pol_order.extend([(0, 1), (1, 0)]) + + pol_strings = [] + for pair in feed_pol_order: + pol_strings.append(feed_array[pair[0]] + feed_array[pair[1]]) + polarization_array = np.array( + [ + utils.polstr2num(ps.upper(), x_orientation=x_orientation) + for ps in pol_strings + ] + ) + return polarization_array, feed_pol_order + + class UVBeam(UVBase): """ A class for defining a radio telescope antenna beam. @@ -237,7 +260,7 @@ def __init__(self): self._Nfeeds = uvp.UVParameter( "Nfeeds", - description="Number of feeds. " 'Not required if beam_type is "power".', + description="Number of feeds. Not required if beam_type is 'power'.", expected_type=int, acceptable_vals=[1, 2], required=False, @@ -922,32 +945,15 @@ def efield_to_power( # There are no cross pols with one feed. Set this so the power beam is real calc_cross_pols = False - efield_data = beam_object.data_array - efield_naxes_vec = beam_object.Naxes_vec - - feed_pol_order = [(0, 0)] - if beam_object.Nfeeds > 1: - feed_pol_order.append((1, 1)) - - if calc_cross_pols: - beam_object.Npols = beam_object.Nfeeds**2 - # to get here we have Nfeeds > 1 - feed_pol_order.extend([(0, 1), (1, 0)]) - else: - beam_object.Npols = beam_object.Nfeeds - - pol_strings = [] - for pair in feed_pol_order: - pol_strings.append( - beam_object.feed_array[pair[0]] + beam_object.feed_array[pair[1]] - ) - beam_object.polarization_array = np.array( - [ - utils.polstr2num(ps.upper(), x_orientation=self.x_orientation) - for ps in pol_strings - ] + beam_object.polarization_array, feed_pol_order = _convert_feeds_to_pols( + beam_object.feed_array, + calc_cross_pols, + x_orientation=beam_object.x_orientation, ) + beam_object.Npols = beam_object.polarization_array.size + efield_data = beam_object.data_array + efield_naxes_vec = beam_object.Naxes_vec if not keep_basis_vector: beam_object.Naxes_vec = 1 @@ -1386,9 +1392,14 @@ def _check_interpolation_domain(self, az_array, za_array, phi_use, theta_use): az_sq_dist = np.where(az_sq_dist > temp_arr, temp_arr, az_sq_dist) if np.any(np.sqrt(az_sq_dist + za_sq_dist) > (max_axis_diff * 2.0)): + if np.any(np.sqrt(za_sq_dist) > (max_axis_diff * 2.0)): + msg = " The zenith angles values are outside UVBeam coverage." + elif np.any(np.sqrt(az_sq_dist) > (max_axis_diff * 2.0)): + msg = " The azimuth values are outside UVBeam coverage." + raise ValueError( "at least one interpolation location " - "is outside of the UVBeam pixel coverage." + "is outside of the UVBeam pixel coverage." + msg ) def _prepare_coordinate_data(self, input_data_array): @@ -1915,8 +1926,12 @@ def _interp_healpix_bilinear( interp_basis_vector = self._prepare_basis_vector_array(az_array.size) hp_obj = HEALPix(nside=self.nside, order=self.ordering) - lat_array = Angle(np.pi / 2, units.radian) - Angle(za_array, units.radian) - lon_array = Angle(az_array, units.radian) + lat_array, lon_array = utils.coordinates.zenithangle_azimuth_to_hpx_latlon( + za_array, az_array + ) + lon_array = lon_array * units.rad + lat_array = lat_array * units.rad + for index3 in range(input_nfreqs): for index0 in range(self.Naxes_vec): for index2 in range(Npol_feeds): @@ -2021,6 +2036,11 @@ def interp( polarizations : list of str polarizations to interpolate if beam_type is 'power'. Default is all polarizations in self.polarization_array. + return_bandpass : bool + Option to return the bandpass. Only applies if `new_object` is False. + return_coupling : bool + Option to return the interpolated coupling matrix, only applies if + `antenna_type` is "phased_array" and `new_object` is False. new_object : bool Option to return a new UVBeam object with the interpolated data, if possible. Note that this is only possible for Healpix pixels or @@ -2046,7 +2066,8 @@ def interp( Whether to check the domain of az/za to ensure that they are covered by the intrinsic data array. Checking them can be quite computationally expensive. Conversely, if the passed az/za are outside of the domain, they will be - silently extrapolated and the behavior is not well-defined. + silently extrapolated and the behavior is not well-defined. Only + applies for `az_za_simple` interpolation. Returns ------- @@ -2107,6 +2128,9 @@ def interp( interp_func = self.interpolation_function_dict[interpolation_function]["func"] if freq_array is not None: + if freq_array.ndim != 1: + raise ValueError("freq_array must be one-dimensional") + # get frequency distances freq_dists = np.abs(self.freq_array - freq_array.reshape(-1, 1)) nearest_dist = np.min(freq_dists, axis=1) @@ -2151,9 +2175,11 @@ def interp( healpix_inds = np.arange(hp_obj.npix) hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(healpix_inds) - - za_array_use = (Angle(np.pi / 2, units.radian) - hpx_lat).radian - az_array_use = hpx_lon.radian + za_array_use, az_array_use = ( + utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + hpx_lat.radian, hpx_lon.radian + ) + ) extra_keyword_dict = {} if interp_func in [ @@ -2382,15 +2408,15 @@ def to_healpix( pixels = np.arange(hp_obj.npix) hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(pixels) - - hpx_theta = (Angle(np.pi / 2, units.radian) - hpx_lat).radian - hpx_phi = hpx_lon.radian + hpx_zen_ang, hpx_az = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + hpx_lat.radian, hpx_lon.radian + ) inds_to_use = _uvbeam.find_healpix_indices( np.ascontiguousarray(self.axis2_array, dtype=np.float64), np.ascontiguousarray(self.axis1_array, dtype=np.float64), - np.ascontiguousarray(hpx_theta, dtype=np.float64), - np.ascontiguousarray(hpx_phi, dtype=np.float64), + np.ascontiguousarray(hpx_zen_ang, dtype=np.float64), + np.ascontiguousarray(hpx_az, dtype=np.float64), np.float64(hp_obj.pixel_resolution.to_value(units.radian)), ) @@ -4451,3 +4477,108 @@ def write_beamfits(self, filename, **kwargs): beamfits_obj = self._convert_to_filetype("beamfits") beamfits_obj.write_beamfits(filename, **kwargs) del beamfits_obj + + +def _uvbeam_constructor(loader, node): + """ + Define a yaml constructor for UVBeam objects. + + The yaml must specify a "filename" field pointing to the UVBeam readable file + and any desired arguments to the UVBeam.from_file method. + + Parameters + ---------- + loader: yaml.Loader + An instance of a yaml Loader object. + node: yaml.Node + A yaml node object. + + Returns + ------- + UVBeam + An instance of a UVBeam. + + """ + values = loader.construct_mapping(node) + if "filename" not in values: + raise ValueError("yaml entries for UVBeam must specify a filename.") + + files_use = values["filename"] + if isinstance(values["filename"], str): + files_use = [values["filename"]] + + if "path_variable" in values: + path_parts = (values.pop("path_variable")).split(".") + var_name = path_parts[-1] + if len(path_parts) == 1: + raise ValueError( + "If 'path_variable' is specified, it should take the form of a " + "module.variable_name where the variable name can be imported " + "from the module." + ) + else: + module = (".").join(path_parts[:-1]) + module = importlib.import_module(module) + path_var = getattr(module, var_name) + for f_i in range(len(files_use)): + files_use[f_i] = os.path.join(path_var, files_use[f_i]) + + if len(files_use) == 1: + files_use = files_use[0] + values["filename"] = files_use + + beam = UVBeam.from_file(**values) + + return beam + + +yaml.add_constructor("!UVBeam", _uvbeam_constructor, Loader=yaml.SafeLoader) +yaml.add_constructor("!UVBeam", _uvbeam_constructor, Loader=yaml.FullLoader) + + +def _uvbeam_representer(dumper, beam): + """ + Define a yaml representer for UVbeams. + + Note: since all the possible selects cannot be extracted from the object, + the object generated from this yaml may not be an exact match for the object + in memory. Also note that the filename parameter must not be None and must + point to an existing file. It's likely that the user will need to update + the filename parameter to include the full path. + + Parameters + ---------- + dumper: yaml.Dumper + An instance of a yaml Loader object. + beam: UVBeam + A UVbeam object, which must have a filename defined on it. + + Returns + ------- + str + The yaml representation of the UVbeam. + + """ + if beam.filename is None: + raise ValueError( + "beam must have a filename defined to be able to represent it in a yaml." + ) + files_use = beam.filename + if isinstance(files_use, str): + files_use = [files_use] + for file in files_use: + if not os.path.exists(file): + raise ValueError( + "all entries in the filename parameter must be existing files " + f"to be able to represent it in a yaml. {file} does not exist" + ) + if len(files_use) == 1: + files_use = files_use[0] + + mapping = {"filename": files_use} + + return dumper.represent_mapping("!UVBeam", mapping) + + +yaml.add_representer(UVBeam, _uvbeam_representer, Dumper=yaml.SafeDumper) +yaml.add_representer(UVBeam, _uvbeam_representer, Dumper=yaml.Dumper) diff --git a/tests/conftest.py b/tests/conftest.py index 1d72c96098..3dd03caf54 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import os +import numpy as np import pytest from astropy.coordinates import EarthLocation from astropy.time import Time @@ -123,3 +124,39 @@ def uvcalibrate_uvdata_oldfiles(uvcalibrate_uvdata_oldfiles_main): uvd = uvcalibrate_uvdata_oldfiles_main.copy() yield uvd + + +@pytest.fixture() +def az_za_coords(): + az_array = np.deg2rad(np.linspace(0, 350, 36)) + za_array = np.deg2rad(np.linspace(0, 90, 10)) + + return az_array, za_array + + +@pytest.fixture() +def az_za_deg_grid(az_za_coords): + az_array, za_array = az_za_coords + freqs = np.linspace(100, 200, 11) * 1e8 + + az_vals, za_vals = np.meshgrid(az_array, za_array) + + return az_vals.flatten(), za_vals.flatten(), freqs + + +@pytest.fixture() +def xy_grid(): + nfreqs = 20 + freqs = np.linspace(100e6, 130e6, nfreqs) + + xy_half_n = 250 + zmax = np.radians(90) # Degrees + arr = np.arange(-xy_half_n, xy_half_n) + x_arr, y_arr = np.meshgrid(arr, arr) + x_arr = x_arr.flatten() + y_arr = y_arr.flatten() + radius = np.sqrt(x_arr**2 + y_arr**2) / float(xy_half_n) + za_array = radius * zmax + az_array = np.arctan2(y_arr, x_arr) + + return az_array, za_array, freqs diff --git a/tests/test_analytic_beam.py b/tests/test_analytic_beam.py new file mode 100644 index 0000000000..87ccc761b2 --- /dev/null +++ b/tests/test_analytic_beam.py @@ -0,0 +1,564 @@ +# Copyright (c) 2024 Radio Astronomy Software Group +# Licensed under the 2-clause BSD License + +import re + +import numpy as np +import pytest +import yaml +from astropy.constants import c as speed_of_light +from scipy.special import j1 + +from pyuvdata import AiryBeam, GaussianBeam, ShortDipoleBeam, UniformBeam, UVBeam +from pyuvdata.analytic_beam import AnalyticBeam, UnpolarizedAnalyticBeam +from pyuvdata.testing import check_warnings + + +def test_airy_beam_values(az_za_deg_grid): + diameter_m = 14.0 + beam = AiryBeam(diameter=diameter_m) + + az_vals, za_vals, freqs = az_za_deg_grid + + beam_vals = beam.efield_eval(az_array=az_vals, za_array=za_vals, freq_array=freqs) + nsrcs = az_vals.size + n_freqs = freqs.size + + expected_data = np.zeros((2, 2, n_freqs, nsrcs), dtype=float) + za_grid, f_grid = np.meshgrid(za_vals, freqs) + c_ms = speed_of_light.to("m/s").value + xvals = diameter_m / 2.0 * np.sin(za_grid) * 2.0 * np.pi * f_grid / c_ms + airy_values = np.zeros_like(xvals) + nz = xvals != 0.0 + ze = xvals == 0.0 + airy_values[nz] = 2.0 * j1(xvals[nz]) / xvals[nz] + airy_values[ze] = 1.0 + for pol in range(2): + for feed in range(2): + expected_data[pol, feed, :, :] = airy_values / np.sqrt(2.0) + + np.testing.assert_allclose(beam_vals, expected_data) + + assert beam.__repr__() == f"AiryBeam(diameter={diameter_m})" + + +def test_airy_uv_beam_widths(xy_grid): + # Check that the width of the Airy disk beam in UV space corresponds with + # the dish diameter. + diameter_m = 25.0 + beam = AiryBeam(diameter=diameter_m) + + az_array, za_array, freqs = xy_grid + + wavelengths = speed_of_light.to("m/s").value / freqs + + beam_vals = beam.efield_eval(az_array=az_array, za_array=za_array, freq_array=freqs) + + ebeam = beam_vals[0, 0, :, :] + npix_side = int(np.sqrt(az_array.size)) + ebeam = ebeam.reshape(freqs.size, npix_side, npix_side) + beam_kern = np.fft.fft2(ebeam, axes=(1, 2)) + beam_kern = np.fft.fftshift(beam_kern, axes=(1, 2)) + for i, bk in enumerate(beam_kern): + # Cutoff at half a % of the maximum value in Fourier space. + thresh = np.max(np.abs(bk)) * 0.005 + points = np.sum(np.abs(bk) >= thresh) + upix = 1 / (2 * np.sin(np.max(za_array))) + area = np.sum(points) * upix**2 + kern_radius = np.sqrt(area / np.pi) + assert np.isclose(diameter_m / wavelengths[i], kern_radius, rtol=0.5) + + +@pytest.mark.parametrize("sigma_type", ["efield", "power"]) +def test_achromatic_gaussian_beam(az_za_deg_grid, sigma_type): + sigma_rad = np.deg2rad(5) + beam = GaussianBeam(sigma=sigma_rad, sigma_type=sigma_type) + + az_vals, za_vals, freqs = az_za_deg_grid + nsrcs = az_vals.size + n_freqs = freqs.size + + beam_vals = beam.efield_eval( + az_array=az_vals, za_array=za_vals, freq_array=np.array(freqs) + ) + + expected_data = np.zeros((2, 2, n_freqs, nsrcs), dtype=float) + + expand_za = np.repeat(za_vals[np.newaxis], n_freqs, axis=0) + if sigma_type == "power": + sigma_use = np.sqrt(2) * sigma_rad + else: + sigma_use = sigma_rad + + gaussian_vals = np.exp(-(expand_za**2) / (2 * sigma_use**2)) + + for pol in range(2): + for feed in range(2): + expected_data[pol, feed, :, :] = gaussian_vals / np.sqrt(2.0) + + np.testing.assert_allclose(beam_vals, expected_data) + + assert ( + beam.__repr__() == f"GaussianBeam(sigma={sigma_use.__repr__()}, " + f"sigma_type={sigma_type.__repr__()}, " + "diameter=None, spectral_index=0.0, reference_frequency=1.0)" + ) + + +def test_chromatic_gaussian(): + """ + Defining a gaussian beam with a spectral index and reference frequency. + Check that beam width follows prescribed power law. + """ + freqs = np.arange(120e6, 160e6, 4e6) + Npix = 1000 + alpha = -1.5 + sigma = np.radians(15.0) + + az = np.zeros(Npix) + za = np.linspace(0, np.pi / 2.0, Npix) + + beam = GaussianBeam(sigma=sigma, reference_frequency=freqs[0], spectral_index=alpha) + + # Get the widths at each frequency. + + vals = beam.efield_eval(az_array=az, za_array=za, freq_array=freqs) + # pick out a single polarization direction and feed + vals = vals[0, 0] + + # The beam peaks at 1/sqrt(2) in each pol. Find where it drops by a factor of 2 + half_power_val = 1 / (2.0 * np.sqrt(2.0)) + hwhm = za[np.argmin(np.abs(vals - half_power_val), axis=1)] + sig_f = sigma * (freqs / freqs[0]) ** alpha + np.testing.assert_allclose(sig_f, 2 * hwhm / 2.355, atol=1e-3) + + assert ( + beam.__repr__() + == f"GaussianBeam(sigma={sigma.__repr__()}, sigma_type='efield', " + f"diameter=None, spectral_index={alpha}, " + f"reference_frequency={freqs[0].__repr__()})" + ) + + +def test_diameter_to_sigma(az_za_deg_grid): + # The integrals of an Airy power beam and a Gaussian power beam, within + # the first Airy null, should be close if the Gaussian width is set to the + # Airy width. + diameter_m = 25.0 + abm = AiryBeam(diameter=diameter_m) + gbm = GaussianBeam(diameter=diameter_m) + + assert ( + gbm.__repr__() + == f"GaussianBeam(sigma=None, sigma_type='efield', diameter={diameter_m}, " + "spectral_index=0.0, reference_frequency=None)" + ) + + az_array, za_array, freqs = az_za_deg_grid + + wavelengths = speed_of_light.to("m/s").value / freqs + + airy_vals = abm.power_eval( + az_array=az_array.flatten(), za_array=za_array.flatten(), freq_array=freqs + ) + + gauss_vals = gbm.power_eval( + az_array=az_array.flatten(), za_array=za_array.flatten(), freq_array=freqs + ) + + # Remove pol/spw/feed axes. + airy_vals = airy_vals[0, 0] + gauss_vals = gauss_vals[0, 0] + + for fi in range(freqs.size): + null = 1.22 * wavelengths[fi] / diameter_m + inds = np.where(np.abs(za_array) < null) + + # Assert integral of power beams within the first Airy null are close + np.testing.assert_allclose( + np.sum(airy_vals[fi, inds]), np.sum(gauss_vals[fi, inds]), rtol=1e-2 + ) + + +def test_short_dipole_beam(az_za_deg_grid): + beam = ShortDipoleBeam() + + az_vals, za_vals, freqs = az_za_deg_grid + + nsrcs = az_vals.size + n_freqs = freqs.size + + efield_vals = beam.efield_eval(az_array=az_vals, za_array=za_vals, freq_array=freqs) + + expected_data = np.zeros((2, 2, n_freqs, nsrcs), dtype=float) + + expected_data[0, 0] = -np.sin(az_vals) + expected_data[0, 1] = np.cos(az_vals) + expected_data[1, 0] = np.cos(za_vals) * np.cos(az_vals) + expected_data[1, 1] = np.cos(za_vals) * np.sin(az_vals) + + np.testing.assert_allclose(efield_vals, expected_data) + + power_vals = beam.power_eval(az_array=az_vals, za_array=za_vals, freq_array=freqs) + expected_data = np.zeros((1, 4, n_freqs, nsrcs), dtype=float) + + expected_data[0, 0] = 1 - np.sin(za_vals) ** 2 * np.cos(az_vals) ** 2 + expected_data[0, 1] = 1 - np.sin(za_vals) ** 2 * np.sin(az_vals) ** 2 + expected_data[0, 2] = -(np.sin(za_vals) ** 2) * np.sin(2.0 * az_vals) / 2.0 + expected_data[0, 3] = -(np.sin(za_vals) ** 2) * np.sin(2.0 * az_vals) / 2.0 + + np.testing.assert_allclose(power_vals, expected_data, atol=1e-15, rtol=0) + + assert ( + beam.__repr__() == "ShortDipoleBeam(feed_array=array(['e', 'n'], dtype='180 to 0->360 + + return az_array, za_array, freqs + + +@pytest.mark.parametrize( + ["beam_obj", "kwargs"], + [ + [AiryBeam, {"diameter": 14.0}], + [GaussianBeam, {"diameter": 14.0}], + [UniformBeam, {"include_cross_pols": False}], + [ShortDipoleBeam, {}], + ], +) +@pytest.mark.parametrize("init_beam_type", ["efield", "power"]) +@pytest.mark.parametrize("final_beam_type", ["efield", "power"]) +@pytest.mark.parametrize("coord_sys", ["az_za", "healpix"]) +def test_beam_interface( + beam_obj, + kwargs, + init_beam_type, + final_beam_type, + az_za_coords, + xy_grid_coarse, + coord_sys, +): + analytic = beam_obj(**kwargs) + + nfreqs = 20 + freq_array = np.linspace(100e6, 150e6, nfreqs) + + if coord_sys == "healpix": + nside = 64 + if init_beam_type == "efield": + ordering = "ring" + else: + ordering = "nested" + healpix_pixel_array = np.arange(12 * nside**2, dtype=int) + + to_uvbeam_kwargs = { + "nside": nside, + "ordering": ordering, + "healpix_pixel_array": healpix_pixel_array, + } + + try: + from astropy_healpix import HEALPix + + except ImportError: + with pytest.raises( + ImportError, + match="astropy_healpix is not installed but is " + "required for healpix functionality. ", + ): + uvb = analytic.to_uvbeam( + beam_type=init_beam_type, freq_array=freq_array, **to_uvbeam_kwargs + ) + pytest.importorskip("astropy_healpix") + + hp_obj = HEALPix(nside=nside, order=ordering) + hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(healpix_pixel_array) + + za_array, az_array = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + hpx_lat.radian, hpx_lon.radian + ) + + # downselect places to check + above_horizon = np.nonzero(za_array <= (np.pi / 2.0)) + az_array = az_array[above_horizon] + za_array = za_array[above_horizon] + az_array = az_array[::10] + za_array = za_array[::10] + else: + az_array, za_array = az_za_coords + to_uvbeam_kwargs = {"axis1_array": az_array, "axis2_array": za_array} + + include_cross_pols = kwargs.get("include_cross_pols", True) + + uvb = analytic.to_uvbeam( + beam_type=init_beam_type, freq_array=freq_array, **to_uvbeam_kwargs + ) + bi_analytic = BeamInterface(analytic, final_beam_type) + + if final_beam_type != init_beam_type: + if final_beam_type == "efield": + with pytest.raises( + ValueError, + match="Input beam is a power UVBeam but beam_type is specified as " + "'efield'. It's not possible to convert a power beam to an " + "efield beam, either provide an efield UVBeam or do not " + "specify `beam_type`.", + ): + BeamInterface(uvb, final_beam_type) + return + + warn_type = UserWarning + msg = ( + "Input beam is an efield UVBeam but beam_type is specified as " + "'power'. Converting efield beam to power." + ) + else: + warn_type = None + msg = "" + + with check_warnings(warn_type, match=msg): + bi_uvbeam = BeamInterface( + uvb, final_beam_type, include_cross_pols=include_cross_pols + ) + + if coord_sys == "az_za": + az_za_grid = True + else: + az_za_grid = False + + analytic_data = bi_analytic.compute_response( + az_array=az_array, + za_array=za_array, + freq_array=freq_array, + az_za_grid=az_za_grid, + ) + + uvb_data = bi_uvbeam.compute_response( + az_array=az_array, + za_array=za_array, + freq_array=freq_array, + az_za_grid=az_za_grid, + ) + + np.testing.assert_allclose(analytic_data, uvb_data, rtol=0, atol=1e-14) + + # now on a grid that is not the same as where the beam was evaluated + # larger differences of course + az_vals, za_vals, freqs = xy_grid_coarse + analytic_data = bi_analytic.compute_response( + az_array=az_vals, za_array=za_vals, freq_array=freqs, az_za_grid=True + ) + uvb_data = bi_uvbeam.compute_response( + az_array=az_vals, za_array=za_vals, freq_array=freqs, az_za_grid=True + ) + + if not (coord_sys == "healpix" and isinstance(analytic, ShortDipoleBeam)): + np.testing.assert_allclose(analytic_data, uvb_data, rtol=0, atol=1e-1) + else: + # the comparison falls apart at zenith because there's no healpix + # pixel right at zenith and the dipole beam changes quickly there. + az_mesh, za_mesh = np.meshgrid(az_vals, za_vals) + az_mesh = az_mesh.flatten() + za_mesh = za_mesh.flatten() + wh_not_zenith = np.nonzero(za_mesh != 0) + np.testing.assert_allclose( + analytic_data[:, :, :, wh_not_zenith], + uvb_data[:, :, :, wh_not_zenith], + rtol=0, + atol=1e-1, + ) + + +def test_beam_interface_errors(): + with pytest.raises( + ValueError, match="beam must be a UVBeam or an AnalyticBeam instance." + ): + BeamInterface("foo", "power") + + +@pytest.mark.parametrize( + ["param", "value"], + [ + ["az_array", None], + ["za_array", None], + ["freq_array", None], + ["az_array", np.zeros((10, 10), dtype=float)], + ["za_array", np.zeros((10, 10), dtype=float)], + ["freq_array", np.zeros((10, 10), dtype=float)], + ], +) +def test_compute_response_errors(param, value): + orig_kwargs = { + "az_array": np.deg2rad(np.linspace(0, 360, 36, endpoint=False)), + "za_array": np.deg2rad(np.linspace(0, 90, 10)), + "freq_array": np.deg2rad(np.linspace(100, 200, 5)), + } + + compute_kwargs = copy.deepcopy(orig_kwargs) + compute_kwargs["az_za_grid"] = True + compute_kwargs[param] = value + + analytic = ShortDipoleBeam() + bi_analytic = BeamInterface(analytic, beam_type="efield") + + with pytest.raises( + ValueError, match=f"{param} must be a one-dimensional numpy array" + ): + bi_analytic.compute_response(**compute_kwargs) + + uvb = analytic.to_uvbeam( + beam_type="power", + freq_array=orig_kwargs["freq_array"], + axis1_array=orig_kwargs["az_array"], + axis2_array=orig_kwargs["za_array"], + ) + bi_uvb = BeamInterface(uvb) + + if param != "freq_array": + with pytest.raises( + ValueError, match=f"{param} must be a one-dimensional numpy array" + ): + bi_uvb.compute_response(**compute_kwargs) + elif value is not None: + with pytest.raises(ValueError, match="freq_array must be one-dimensional"): + bi_uvb.compute_response(**compute_kwargs) + + else: + # this shouldn't error + bi_uvb.compute_response(**compute_kwargs) diff --git a/tests/utils/test_coordinates.py b/tests/utils/test_coordinates.py index ab19b9c6c0..9607511d9d 100644 --- a/tests/utils/test_coordinates.py +++ b/tests/utils/test_coordinates.py @@ -750,6 +750,42 @@ def test_mwa_ecef_conversion(): np.testing.assert_allclose(rot_xyz.T, xyz) +def test_hpx_latlon_az_za(): + zenith_angle = np.deg2rad(np.linspace(0, 90, 10)) + azimuth = np.deg2rad(np.linspace(0, 360, 36, endpoint=False)) + az_mesh, za_mesh = np.meshgrid(azimuth, zenith_angle) + + hpx_lat = np.deg2rad(np.linspace(90, 0, 10)) + hpx_lon = np.deg2rad(np.linspace(0, 360, 36, endpoint=False)) + lon_mesh, lat_mesh = np.meshgrid(hpx_lon, hpx_lat) + + with pytest.raises( + ValueError, match="shapes of zenith_angle and azimuth values must match." + ): + utils.coordinates.zenithangle_azimuth_to_hpx_latlon(zenith_angle, azimuth) + + calc_lat, calc_lon = utils.coordinates.zenithangle_azimuth_to_hpx_latlon( + za_mesh, az_mesh + ) + + print(np.min(calc_lat), np.max(calc_lat)) + print(np.min(lat_mesh), np.max(lat_mesh)) + np.testing.assert_allclose(calc_lat, lat_mesh) + np.testing.assert_allclose(calc_lon, lon_mesh) + + with pytest.raises( + ValueError, match="shapes of hpx_lat and hpx_lon values must match." + ): + utils.coordinates.hpx_latlon_to_zenithangle_azimuth(hpx_lat, hpx_lon) + + calc_za, calc_az = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + lat_mesh, lon_mesh + ) + + np.testing.assert_allclose(calc_za, za_mesh) + np.testing.assert_allclose(calc_az, az_mesh) + + @pytest.mark.parametrize("err_state", ["err", "warn", "none"]) @pytest.mark.parametrize("tel_loc", ["Center", "Moon", "Earth", "Space"]) @pytest.mark.parametrize("check_frame", ["Moon", "Earth"]) diff --git a/tests/uvbeam/test_initializers.py b/tests/uvbeam/test_initializers.py index ae74bd8e4d..cc86b7f1bd 100644 --- a/tests/uvbeam/test_initializers.py +++ b/tests/uvbeam/test_initializers.py @@ -45,6 +45,11 @@ def uvb_efield_kw(): @pytest.fixture() def uvb_power_kw(): + return {"polarization_array": ["xx", "yy", "xy", "yx"]} + + +@pytest.fixture() +def uvb_power_nocross_kw(): return {"polarization_array": ["xx", "yy"]} @@ -75,25 +80,27 @@ def test_new_uvcal_simplest( uvb_healpix_kw, uvb_efield_kw, uvb_power_kw, + uvb_power_nocross_kw, coord_sys, beam_type, ): if coord_sys == "az_za": - kw_use = {**uvb_common_kw, **uvb_azza_kw} + kw_coord = {**uvb_common_kw, **uvb_azza_kw} else: - kw_use = {**uvb_common_kw, **uvb_healpix_kw} + kw_coord = {**uvb_common_kw, **uvb_healpix_kw} if beam_type == "efield": - kw_use = {**kw_use, **uvb_efield_kw} + kw_use = {**kw_coord, **uvb_efield_kw} else: - kw_use = {**kw_use, **uvb_power_kw} + kw_use = {**kw_coord, **uvb_power_kw} + kw_no_cross_use = {**kw_coord, **uvb_power_nocross_kw} uvb = UVBeam.new(**kw_use) assert uvb.Nfreqs == 10 if beam_type == "efield": assert uvb.Nfeeds == 2 else: - assert uvb.Npols == 2 + assert uvb.Npols == 4 if uvb.pixel_coordinate_system == "healpix": assert uvb.Npixels == 12 * uvb.nside**2 @@ -101,6 +108,10 @@ def test_new_uvcal_simplest( assert uvb.Naxes1 == 360 assert uvb.Naxes2 == 181 + if beam_type == "power": + uvb_no_cross = UVBeam.new(**kw_no_cross_use) + assert uvb_no_cross.Npols == 2 + def test_x_orientation(uvb_azza_efield_kw): uvb_azza_efield_kw["x_orientation"] = "e" @@ -199,6 +210,19 @@ def test_phased_array(phased_array_efield, phased_array_beam_2freq, rm_param): ) +def test_kwargs(uvb_azza_efield_kw): + uvb_azza_efield_kw["reference_impedance"] = 50.0 + + uvb = UVBeam.new(**uvb_azza_efield_kw) + + assert uvb.reference_impedance == 50.0 + + uvb_azza_efield_kw["foo"] = "bar" + + with pytest.raises(ValueError, match="Unrecognized keyword argument: foo"): + UVBeam.new(**uvb_azza_efield_kw) + + def test_no_feed_pol_error(uvb_common_kw): with pytest.raises( ValueError, @@ -250,6 +274,12 @@ def test_freq_array_errors(uvb_azza_efield_kw): UVBeam.new(**uvb_azza_efield_kw) +def test_axis_array_errors(uvb_azza_efield_kw): + uvb_azza_efield_kw["axis1_array"][0] += 0.1 + with pytest.raises(ValueError, match="axis1_array must be regularly spaced"): + UVBeam.new(**uvb_azza_efield_kw) + + def test_data_array_errors(uvb_azza_efield_kw): uvb_azza_efield_kw["data_array"] = (2, 2, 10, 360, 181) with pytest.raises(ValueError, match="data_array must be a numpy ndarray"): diff --git a/tests/uvbeam/test_uvbeam.py b/tests/uvbeam/test_uvbeam.py index 21925f1320..bc775b97c3 100644 --- a/tests/uvbeam/test_uvbeam.py +++ b/tests/uvbeam/test_uvbeam.py @@ -11,8 +11,8 @@ import numpy as np import pytest +import yaml from astropy import units -from astropy.coordinates import Angle from astropy.io import fits from pyuvdata import UVBeam, utils @@ -426,7 +426,9 @@ def test_efield_to_pstokes(cst_efield_2freq_cut, cst_efield_2freq_cut_healpix): # NOTE: So far, the following doesn't hold unless the beams are # peak_normalized again. # This seems to be the fault of interpolation - assert np.allclose(pstokes_beam.data_array, beam_return.data_array, atol=1e-2) + np.testing.assert_allclose( + pstokes_beam.data_array, beam_return.data_array, atol=1e-2 + ) def test_efield_to_pstokes_error(cst_power_2freq_cut): @@ -599,7 +601,9 @@ def test_efield_to_power_crosspol(cst_efield_2freq_cut, tmp_path): new_power_beam = efield_beam.efield_to_power( calc_cross_pols=False, keep_basis_vector=True, inplace=False ) - assert np.allclose(new_power_beam.data_array, np.abs(efield_beam.data_array) ** 2) + np.testing.assert_allclose( + new_power_beam.data_array, np.abs(efield_beam.data_array) ** 2 + ) def test_efield_to_power_errors(cst_efield_2freq_cut, cst_power_2freq_cut): @@ -872,12 +876,12 @@ def test_spatial_interpolation_samepoints( ) interp_data_array = interp_data_array.reshape(uvbeam.data_array.shape, order="F") - assert np.allclose(uvbeam.data_array, interp_data_array) + np.testing.assert_allclose(uvbeam.data_array, interp_data_array) if beam_type == "efield": interp_basis_vector = interp_basis_vector.reshape( uvbeam.basis_vector_array.shape, order="F" ) - assert np.allclose(uvbeam.basis_vector_array, interp_basis_vector) + np.testing.assert_allclose(uvbeam.basis_vector_array, interp_basis_vector) # test error with using an incompatible interpolation function with pytest.raises( @@ -967,7 +971,7 @@ def test_spatial_interpolation_samepoints( interp_data_array = interp_data_array.reshape( data_array_compare.shape, order="F" ) - assert np.allclose(data_array_compare, interp_data_array) + np.testing.assert_allclose(data_array_compare, interp_data_array) @pytest.mark.parametrize("beam_type", ["efield", "power"]) @@ -1065,7 +1069,7 @@ def test_spatial_interpolation_everyother( ) # slightly different interpolation, so not identical. - assert np.allclose(quartic_data_array, orig_data_array, atol=1e-10) + np.testing.assert_allclose(quartic_data_array, orig_data_array, atol=1e-10) assert not np.all(quartic_data_array == orig_data_array) select_data_array_orig, _ = uvbeam.interp( @@ -1082,7 +1086,7 @@ def test_spatial_interpolation_everyother( freq_interp_kind="linear", reuse_spline=True, ) - assert np.allclose(select_data_array_orig, select_data_array_reused) + np.testing.assert_allclose(select_data_array_orig, select_data_array_reused) del uvbeam.saved_interp_functions # test comparison of different interpolation functions @@ -1178,11 +1182,11 @@ def test_spatial_interpolation_errors(interpolation_function, cst_power_2freq_cu freq_array=freq_interp_vals, interpolation_function=interpolation_function, ) - # test errors if positions outside range + # test errors if positions outside range (zenith angle) with pytest.raises( ValueError, - match="at least one interpolation location " - "is outside of the UVBeam pixel coverage.", + match="at least one interpolation location is outside of the UVBeam " + "pixel coverage. The zenith angles values are outside UVBeam coverage.", ): uvbeam.interp( az_array=az_interp_vals, @@ -1190,6 +1194,17 @@ def test_spatial_interpolation_errors(interpolation_function, cst_power_2freq_cu interpolation_function=interpolation_function, ) + with pytest.raises( + ValueError, + match="at least one interpolation location is outside of the UVBeam " + "pixel coverage. The azimuth values are outside UVBeam coverage.", + ): + uvbeam.interp( + az_array=az_interp_vals + np.pi / 2, + za_array=za_interp_vals, + interpolation_function=interpolation_function, + ) + # test no errors only frequency interpolation _, _ = uvbeam.interp( freq_array=freq_interp_vals, @@ -1245,14 +1260,14 @@ def test_interp_longitude_branch_cut(beam_type, cst_efield_2freq, cst_power_2fre beam.Naxes_vec, npol_feed, beam.Nfreqs, 4, beam.Naxes2 ) - assert np.allclose( + np.testing.assert_allclose( interp_data_array[:, :, :, 0, :], interp_data_array[:, :, :, 1, :], rtol=beam._data_array.tols[0], atol=beam._data_array.tols[1], ) - assert np.allclose( + np.testing.assert_allclose( interp_data_array[:, :, :, 2, :], interp_data_array[:, :, :, 3, :], rtol=beam._data_array.tols[0], @@ -1317,8 +1332,9 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam # check that interpolating to existing points gives the same answer hp_obj = HEALPix(nside=hpx_efield_beam.nside) hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(hpx_efield_beam.pixel_array) - za_orig_vals = (Angle(np.pi / 2, units.radian) - hpx_lat).radian - az_orig_vals = hpx_lon.radian + za_orig_vals, az_orig_vals = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + hpx_lat.rad, hpx_lon.rad + ) az_orig_vals = az_orig_vals.ravel(order="C") za_orig_vals = za_orig_vals.ravel(order="C") @@ -1329,7 +1345,8 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam ) data_array_compare = hpx_efield_beam.data_array interp_data_array = interp_data_array.reshape(data_array_compare.shape, order="F") - assert np.allclose(data_array_compare, interp_data_array) + + np.testing.assert_allclose(data_array_compare, interp_data_array) # test error with using an incompatible interpolation function with pytest.raises( @@ -1418,7 +1435,7 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam # test no inputs equals same answer interp_data_array2, _ = hpx_efield_beam.interp() - assert np.allclose(interp_data_array, interp_data_array2) + np.testing.assert_allclose(interp_data_array, interp_data_array2) # test errors with specifying healpix_inds without healpix_nside hp_obj = HEALPix(nside=hpx_efield_beam.nside) @@ -1467,7 +1484,7 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam ) data_array_compare = power_beam.data_array interp_data_array = interp_data_array.reshape(data_array_compare.shape, order="F") - assert np.allclose(data_array_compare, interp_data_array) + np.testing.assert_allclose(data_array_compare, interp_data_array) # test that interp to every other point returns an object that matches a select pixel_inds = np.arange(0, power_beam.Npixels, 2) @@ -1485,20 +1502,20 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam interp_data_array2, _ = power_beam.interp( az_array=az_orig_vals, za_array=za_orig_vals ) - assert np.allclose(interp_data_array, interp_data_array2) + np.testing.assert_allclose(interp_data_array, interp_data_array2) # assert not feeding az_array gives same answer interp_data_array2, _ = power_beam.interp( az_array=az_orig_vals, za_array=za_orig_vals ) - assert np.allclose(interp_data_array, interp_data_array2) + np.testing.assert_allclose(interp_data_array, interp_data_array2) # test requesting polarization gives the same answer interp_data_array2, _ = power_beam.interp( az_array=az_orig_vals, za_array=za_orig_vals, polarizations=["yy"] ) - assert np.allclose( + np.testing.assert_allclose( interp_data_array[..., 1:2, :, :], interp_data_array2[..., :1, :, :] ) @@ -1510,7 +1527,7 @@ def test_healpix_interpolation(antenna_type, cst_efield_2freq, phased_array_beam ) data_array_compare = power_beam.data_array interp_data_array = interp_data_array.reshape(data_array_compare.shape, order="F") - assert np.allclose(data_array_compare, interp_data_array) + np.testing.assert_allclose(data_array_compare, interp_data_array) # assert polarization value error with pytest.raises( @@ -1548,8 +1565,9 @@ def test_find_healpix_indices(start, stop, phi_start, phi_end): pixels = np.arange(hp_obj.npix) hpx_lon, hpx_lat = hp_obj.healpix_to_lonlat(pixels) - hpx_theta = (Angle(np.pi / 2, units.radian) - hpx_lat).radian - hpx_phi = hpx_lon.radian + hpx_za, hpx_az = utils.coordinates.hpx_latlon_to_zenithangle_azimuth( + hpx_lat.rad, hpx_lon.rad + ) theta_vals1 = np.linspace(0, np.pi, 5, endpoint=True) theta_vals2 = np.linspace(start, stop, 5, endpoint=True) @@ -1559,16 +1577,16 @@ def test_find_healpix_indices(start, stop, phi_start, phi_end): inds_to_use1 = _uvbeam.find_healpix_indices( np.ascontiguousarray(theta_vals1, dtype=np.float64), np.ascontiguousarray(phi_vals, dtype=np.float64), - np.ascontiguousarray(hpx_theta, dtype=np.float64), - np.ascontiguousarray(hpx_phi, dtype=np.float64), + np.ascontiguousarray(hpx_za, dtype=np.float64), + np.ascontiguousarray(hpx_az, dtype=np.float64), np.float64(hp_obj.pixel_resolution.to_value(units.radian)), ) inds_to_use2 = _uvbeam.find_healpix_indices( np.ascontiguousarray(theta_vals2, dtype=np.float64), np.ascontiguousarray(phi_vals, dtype=np.float64), - np.ascontiguousarray(hpx_theta, dtype=np.float64), - np.ascontiguousarray(hpx_phi, dtype=np.float64), + np.ascontiguousarray(hpx_za, dtype=np.float64), + np.ascontiguousarray(hpx_az, dtype=np.float64), np.float64(hp_obj.pixel_resolution.to_value(units.radian)), ) @@ -2673,23 +2691,23 @@ def test_beam_area_healpix(cst_power_1freq_cut_healpix, cst_efield_1freq_cut_hea d_omega = hp_obj.pixel_area.to_value("steradian") npix = healpix_norm.Npixels healpix_norm.data_array = np.ones_like(healpix_norm.data_array) - assert np.allclose( + np.testing.assert_allclose( np.sum(healpix_norm.get_beam_area(pol="xx")), numfreqs * npix * d_omega ) healpix_norm.data_array = 2.0 * np.ones_like(healpix_norm.data_array) - assert np.allclose( + np.testing.assert_allclose( np.sum(healpix_norm.get_beam_sq_area(pol="xx")), numfreqs * 4.0 * npix * d_omega ) # check XX and YY beam areas work and match to within 5 sigfigs xx_area = healpix_norm.get_beam_area("XX") xx_area = healpix_norm.get_beam_area("xx") - assert np.allclose(xx_area, xx_area) + np.testing.assert_allclose(xx_area, xx_area) yy_area = healpix_norm.get_beam_area("YY") - assert np.allclose(yy_area / xx_area, np.ones(numfreqs)) + np.testing.assert_allclose(yy_area / xx_area, np.ones(numfreqs)) xx_area = healpix_norm.get_beam_sq_area("XX") yy_area = healpix_norm.get_beam_sq_area("YY") - assert np.allclose(yy_area / xx_area, np.ones(numfreqs)) + np.testing.assert_allclose(yy_area / xx_area, np.ones(numfreqs)) # Check that if pseudo-Stokes I (pI) is in the beam polarization_array it # just uses it @@ -2725,7 +2743,7 @@ def test_beam_area_healpix(cst_power_1freq_cut_healpix, cst_efield_1freq_cut_hea XY_area = healpix_norm_fullpol.get_beam_sq_area("XY") YX_area = healpix_norm_fullpol.get_beam_sq_area("YX") # check if XY beam area is equal to beam YX beam area - assert np.allclose(XY_area, YX_area) + np.testing.assert_allclose(XY_area, YX_area, rtol=0, atol=1e-9) # check if XY/YX beam area is less than XX/YY beam area assert np.all(np.less(XY_area, xx_area)) assert np.all(np.less(XY_area, yy_area)) @@ -2765,8 +2783,8 @@ def test_beam_area_healpix(cst_power_1freq_cut_healpix, cst_efield_1freq_cut_hea I_area = efield_beam.get_beam_area("I") pI_area = efield_beam.get_beam_area("pI") area1 = efield_beam.get_beam_area(1) - assert np.allclose(I_area, pI_area) - assert np.allclose(I_area, area1) + np.testing.assert_allclose(I_area, pI_area) + np.testing.assert_allclose(I_area, area1) # check efield beam type is accepted for pseudo-stokes and power for # linear polarizations @@ -2961,3 +2979,108 @@ def test_from_file(filename): assert uvb.check() assert uvb2.check() assert uvb == uvb2 + + +@pytest.mark.parametrize( + ["filename", "path_var", "file_list"], + [ + [cst_yaml_file, True, True], + [mwa_beam_file, False, False], + [casa_beamfits, False, True], + ], +) +def test_yaml_constructor(filename, path_var, file_list): + if path_var: + input_yaml = f""" + beam: !UVBeam + filename: {filename} + run_check: False + """ + else: + fname_use = filename[len(DATA_PATH) + 1 :] + input_yaml = f""" + beam: !UVBeam + filename: {fname_use} + path_variable: pyuvdata.data.DATA_PATH + run_check: False + """ + + beam_from_yaml = yaml.safe_load(input_yaml)["beam"] + + # don't run checks because of casa_beamfits, we'll do that later + uvb = UVBeam.from_file(filename, run_check=False) + # hera casa beam is missing some parameters but we just want to check + # that reading is going okay + if filename == casa_beamfits: + # fill in missing parameters + for _uvb in [uvb, beam_from_yaml]: + _uvb.data_normalization = "peak" + _uvb.feed_name = "casa_ideal" + _uvb.feed_version = "v0" + _uvb.model_name = "casa_airy" + _uvb.model_version = "v0" + + # this file is actually in an orthoslant projection RA/DEC at zenith at a + # particular time. + # For now pretend it's in a zenith orthoslant projection + _uvb.pixel_coordinate_system = "orthoslant_zenith" + # double check the files are valid + assert uvb.check() + assert beam_from_yaml.check() + assert uvb == beam_from_yaml + + err_msg = ( + "all entries in the filename parameter must be existing files to be " + "able to represent it in a yaml." + ) + with pytest.raises(ValueError, match=err_msg): + output_yaml = yaml.safe_dump({"beam": uvb}) + + if file_list: + uvb.filename = [filename] + else: + uvb.filename = filename + output_yaml = yaml.safe_dump({"beam": uvb}) + try: + new_beam_from_yaml = yaml.safe_load(output_yaml)["beam"] + + assert new_beam_from_yaml == beam_from_yaml + except ValueError: + # get here for the ill-defined casa beam. just test the null filename case + + uvb.filename = None + with pytest.raises( + ValueError, + match="beam must have a filename defined to be able to represent it " + "in a yaml.", + ): + output_yaml = yaml.safe_dump({"beam": uvb}) + + +def test_yaml_constructor_errors(): + fname_use = cst_yaml_file[len(DATA_PATH) + 1 :] + input_yaml = f""" + beam: !UVBeam + filename: {fname_use} + path_variable: DATA_PATH + run_check: False + """ + + with pytest.raises( + ValueError, + match="If 'path_variable' is specified, it should take the form of a " + "module.variable_name where the variable name can be imported " + "from the module.", + ): + yaml.safe_load(input_yaml)["beam"] + + input_yaml = """ + beam: !UVBeam + path_variable: DATA_PATH + run_check: False + """ + + with pytest.raises( + ValueError, match="yaml entries for UVBeam must specify a filename." + ): + yaml.safe_load(input_yaml)["beam"]