Merge pull request #81 from p-slash/tile-format
Redesign to read tile format
p-slash authored Oct 11, 2023
2 parents 8552d2c + a5cebb1 commit 7713b57
Showing 15 changed files with 425 additions and 327 deletions.
17 changes: 10 additions & 7 deletions docs/source/examples/simple_coadd_showcase.rst
@@ -20,6 +20,11 @@ to read all arms (B, R, Z), but will not read the resolution matrix.
is_mock = False
skip_resomat = True
# Setup reader function
readerFunction = qsonic.io.get_spectra_reader_function(
indir, arms, is_mock, skip_resomat,
read_true_continuum=False, is_tile=False)
First, we read the catalog. Since ``qsonic`` sorts the catalog by
HPXPIXEL, we can find the unique healpix values and split the catalog
into healpix groups. For example purposes, we are picking a single healpix and
@@ -28,19 +33,17 @@ reading all the quasar spectra in that file.
.. code:: python3
catalog = qsonic.catalog.read_quasar_catalog(fname)
# Group into unique pixels
unique_pix, s = np.unique(catalog['HPXPIXEL'], return_index=True)
split_catalog = np.split(catalog, s[1:])
# Pick one healpix to illustrate
hpx_cat = split_catalog[0]
healpix = hpx_cat['HPXPIXEL'][0]
spectra_by_hpx = qsonic.io.read_spectra_onehealpix(
hpx_cat, indir, arms, is_mock, skip_resomat
)
spectra_by_hpx = readerFunction(hpx_cat)
print(f"There are {len(spectra_by_hpx)} spectra in healpix {healpix}.")
10 changes: 6 additions & 4 deletions docs/source/quick_start.rst
@@ -39,11 +39,15 @@ Here's an example code snippet to use IO interface.
is_mock = False
skip_resomat = True
# Setup reader function
readerFunction = qsonic.io.get_spectra_reader_function(
indir, arms, is_mock, skip_resomat,
read_true_continuum=False, is_tile=False)
w1 = 3600.
w2 = 6000.
fw1 = 1050.
fw2 = 1180.
remove_nonforest_pixels = True # Less memory use
catalog = qsonic.catalog.read_quasar_catalog(fname)
@@ -56,9 +60,7 @@ Here's an example code snippet to use IO interface.
for hpx_cat in split_catalog:
healpix = hpx_cat['HPXPIXEL'][0]
spectra_by_hpx = qsonic.io.read_spectra_onehealpix(
hpx_cat, indir, arms, is_mock, skip_resomat
)
spectra_by_hpx = readerFunction(hpx_cat)
# Do stuff with spectra in this healpix
...
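
The grouping idiom in both snippets relies on ``np.unique`` returning the
first-occurrence index of each value in a sorted key column; ``np.split`` at
those indices then yields one sub-catalog per key. A self-contained
illustration:

.. code:: python3

    import numpy as np

    hpx = np.array([5, 5, 5, 9, 9, 12])  # already sorted, like the catalog
    unique_pix, s = np.unique(hpx, return_index=True)
    groups = np.split(hpx, s[1:])
    # unique_pix -> array([ 5,  9, 12])
    # groups -> [array([5, 5, 5]), array([9, 9]), array([12])]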
2 changes: 1 addition & 1 deletion py/qsonic/__init__.py
@@ -1,6 +1,6 @@
__author__ = 'Naim Goksel Karacayli'
__email__ = '[email protected]'
__version__ = '0.7.8'
__version__ = '0.8.0'


class QsonicException(Exception):
143 changes: 107 additions & 36 deletions py/qsonic/catalog.py
@@ -1,14 +1,16 @@
""" This module contains functions to read, validate and make necessary changes
to DESI quasar catalogs. Can be imported without a need for MPI. """

import logging
import warnings

import fitsio
from healpy import ang2pix
import numpy as np
from numpy.lib.recfunctions import rename_fields, append_fields

from qsonic.mpi_utils import logging_mpi, balance_load, mpi_fnc_bcast
from qsonic import QsonicException
from qsonic.mpi_utils import balance_load, mpi_fnc_bcast

_accepted_extnames = set(['QSO_CAT', 'ZCATALOG', 'METADATA'])
"""set: Accepted extensions for quasar catalog."""
@@ -22,25 +24,31 @@
set(['COADD_LASTNIGHT', 'LAST_NIGHT', 'LASTNIGHT'])
]
"""list(set): Required columns for real data analysis."""
_required_tile_columns = [set(['TILEID']), set(['PETAL_LOC'])]
"""list(set): Required columns for tile data analysis."""
_optional_columns = [
'HPXPIXEL', 'VMIN_CIV_450', 'VMAX_CIV_450', 'VMIN_CIV_2000',
'VMAX_CIV_2000'
]
"""list(str): Optional columns."""
_all_columns = [
col for reqset in (
_required_columns + _required_data_columns
_required_columns + _required_data_columns + _required_tile_columns
) for col in reqset
] + _optional_columns


def read_quasar_catalog(
filename, is_mock=False, keep_surveys=None,
zmin=0, zmax=100.0):
filename, is_mock=False, is_tile=False, keep_surveys=None,
zmin=0, zmax=100.0
):
""" Returns a quasar catalog object (ndarray).
It is sorted in the following order: HPXPIXEL, SURVEY (if applicable),
TARGETID. BAL info included if available. It is required for BAL masking.
It is sorted in the following order:
1. HPXPIXEL or TILEID
2. SURVEY or PETAL_LOC (if applicable),
3. TARGETID.
BAL info included if available. It is required for BAL masking.
If 'HPXPIXEL' column is not present, n_side is assumed 16 for mocks, 64 for
data.
@@ -50,6 +58,9 @@ def read_quasar_catalog(
Filename to catalog.
is_mock: bool, default: False
If the catalog is for mocks.
is_tile: bool, default: False
If the catalog is for tiles. Duplicate TARGETIDs are allowed as long as
they are in different TILEIDs.
keep_surveys: None or list(str), default: None
List of surveys to subselect. None keeps all.
zmin: float, default: 0
@@ -64,14 +75,15 @@
"""
n_side = 16 if is_mock else 64
catalog = _read(filename)
catalog = _validate_adjust_column_names(catalog, is_mock)
catalog = _prime_catalog(catalog, n_side, keep_surveys, zmin, zmax)
catalog = _validate_adjust_column_names(catalog, is_mock, is_tile)
catalog = _prime_catalog(
catalog, n_side, keep_surveys, zmin, zmax, is_tile)

return catalog
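
A minimal serial usage sketch for the signature above; the filename and survey
list are hypothetical:

    import qsonic.catalog

    catalog = qsonic.catalog.read_quasar_catalog(
        "QSO_cat.fits",  # hypothetical path
        is_mock=False, is_tile=False,
        keep_surveys=['main'], zmin=2.1, zmax=6.0)
    # The returned structured ndarray is sorted by HPXPIXEL, SURVEY, TARGETID
    print(catalog.dtype.names)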


def mpi_read_quasar_catalog(
filename, comm=None, mpi_rank=0, is_mock=False,
filename, comm=None, mpi_rank=0, is_mock=False, is_tile=False,
keep_surveys=None, zmin=0, zmax=100
):
""" Returns the same quasar catalog object on all MPI ranks.
@@ -91,6 +103,8 @@ def mpi_read_quasar_catalog(
Rank of the MPI process
is_mock: bool, default: False
If the catalog is for mocks.
is_tile: bool, default: False
If the catalog is for tiles. Duplicate TARGETIDs are allowed.
keep_surveys: None or list(str), default: None
List of surveys to subselect. None keeps all.
zmin: float, default: 0
@@ -111,23 +125,39 @@
catalog = mpi_fnc_bcast(
read_quasar_catalog,
comm, mpi_rank, "Error while reading catalog.",
filename, is_mock, keep_surveys, zmin, zmax)
filename, is_mock, is_tile, keep_surveys, zmin, zmax)

return catalog


def mpi_get_local_queue(catalog, mpi_rank, mpi_size):
""" Take a 'HPXPIXEL' sorted `catalog` and assign a list of catalogs to
mpi_rank
def mpi_get_local_queue(
filename, comm=None, mpi_rank=0, mpi_size=1, is_mock=False,
is_tile=False, keep_surveys=None, zmin=0, zmax=100.0
):
"""Reads catalog on master and scatter a list of catalogs to every
mpi_rank. If in tile format, sort key is 'TILEID'.
Arguments
----------
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
'HPXPIXEL' sorted catalog.
filename: str
Filename to catalog.
comm: MPI comm object or None, default: None
MPI comm object for scatter
mpi_rank: int
Rank of the MPI process
mpi_size: int
Size of MPI processes
is_mock: bool, default: False
If the catalog is for mocks.
is_tile: bool, default: False
If the catalog is for tiles. Split key will be 'TILEID'. Duplicate
TARGETIDs are allowed.
keep_surveys: None or list(str), default: None
List of surveys to subselect. None keeps all.
zmin: float, default: 0
Minimum quasar redshift
zmax: float, default: 100
Maximum quasar redshift
Returns
----------
@@ -136,16 +166,38 @@ def mpi_get_local_queue(catalog, mpi_rank, mpi_size):
"""
# We decide forest filename list
# Group into unique pixels
unique_pix, s = np.unique(catalog['HPXPIXEL'], return_index=True)
split_catalog = np.split(catalog, s[1:])
logging_mpi(
f"There are {unique_pix.size} healpixels."
" Don't use more MPI processes.", mpi_rank)

# Roughly equal number of spectra
logging_mpi("Load balancing.", mpi_rank)
# Returns a list of catalog (ndarray)
return balance_load(split_catalog, mpi_size, mpi_rank)
if mpi_rank == 0:
try:
catalog = read_quasar_catalog(
filename, is_mock, is_tile, keep_surveys, zmin, zmax)
status = True
except Exception as e:
logging.exception(e)
status = False
else:
status = False

status = comm.bcast(status)
if not status:
raise QsonicException("Error while reading catalog.")

if mpi_rank == 0:
split_key = "TILEID" if is_tile else "HPXPIXEL"
unique_pix, s = np.unique(catalog[split_key], return_index=True)
split_catalog = np.split(catalog, s[1:])
logging.info(
f"There are {unique_pix.size} keys ({split_key})."
" Don't use more MPI processes.")

# Roughly equal number of spectra
logging.info("Load balancing.")
# Returns a list of catalog (ndarray)
local_queue = balance_load(split_catalog, mpi_size)
else:
local_queue = None

return comm.scatter(local_queue)
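
A usage sketch of the new scatter-based queue, assuming mpi4py and a
hypothetical catalog path:

    from mpi4py import MPI

    import qsonic.catalog

    comm = MPI.COMM_WORLD
    mpi_rank = comm.Get_rank()
    mpi_size = comm.Get_size()

    # Rank 0 reads and splits the catalog; comm.scatter hands each rank
    # a load-balanced list of per-TILEID sub-catalogs.
    local_queue = qsonic.catalog.mpi_get_local_queue(
        "QSO_cat.fits",  # hypothetical path
        comm, mpi_rank, mpi_size, is_tile=True)

    for tile_cat in local_queue:
        ...  # read and process spectra for this TILEID group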


def _check_required_columns(required_cols, colnames):
@@ -171,7 +223,7 @@ def _check_required_columns(required_cols, colnames):
f"{', '.join(reqset)}!")


def _validate_adjust_column_names(catalog, is_mock):
def _validate_adjust_column_names(catalog, is_mock, is_tile):
""" Validate `catalog` for required columns in `_required_columns`.
'SURVEY' is also required for data. 'TARGET_{RA, DEC}' is transformed to
'RA' and 'DEC' in return.
@@ -182,6 +234,9 @@ def _validate_adjust_column_names(catalog, is_mock):
Catalog.
is_mock: bool
If the catalog is for mocks, does not perform 'SURVEY' check.
is_tile: bool
If the catalog is for tiles, duplicate TARGETIDs are allowed as long as
they are in different TILEIDs.
Returns
----------
@@ -193,12 +248,19 @@
_check_required_columns(_required_columns, colnames)
if not is_mock:
_check_required_columns(_required_data_columns, colnames)
if is_tile:
_check_required_columns(_required_tile_columns, colnames)

logging_mpi(f"There are {catalog.size} quasars in the catalog.", 0)
nqso = catalog.size
logging.info(f"There are {nqso} quasars in the catalog.")

if catalog.size != np.unique(catalog['TARGETID']).size:
if not is_tile and nqso != np.unique(catalog['TARGETID']).size:
raise Exception("There are duplicate TARGETIDs in catalog!")

if is_tile and nqso != np.unique(catalog[['TARGETID', 'TILEID']]).size:
raise Exception(
"There are duplicate TARGETID - TILEID combinations in catalog!")

# Adjust column names
colname_map = {}
for x in ['RA', 'DEC']:
@@ -228,7 +290,7 @@ def _read(filename):
catalog: :external+numpy:py:class:`ndarray <numpy.ndarray>`
Catalog. No checks are performed.
"""
logging_mpi(f'Reading catalogue from {filename}', 0)
logging.info(f'Reading catalogue from {filename}')
fitsfile = fitsio.FITS(filename)
extnames = [hdu.get_extname() for hdu in fitsfile]
cat_hdu = _accepted_extnames.intersection(extnames)
@@ -268,14 +330,17 @@ def _add_healpix(catalog, n_side, keep_columns):
if 'HPXPIXEL' not in keep_columns:
pixnum = ang2pix(
n_side, catalog['RA'], catalog['DEC'], lonlat=True, nest=True)
catalog = append_fields(catalog, 'HPXPIXEL', pixnum, dtypes=int)
catalog = append_fields(
catalog, 'HPXPIXEL', pixnum, dtypes=int, usemask=False)

return catalog
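
What _add_healpix computes, as a standalone sketch with a toy catalog and
n_side 64 (the default for data):

    import numpy as np
    from healpy import ang2pix
    from numpy.lib.recfunctions import append_fields

    cat = np.array(
        [(150.0, 2.2), (210.5, -1.0)],  # RA, DEC in degrees
        dtype=[('RA', float), ('DEC', float)])
    pixnum = ang2pix(64, cat['RA'], cat['DEC'], lonlat=True, nest=True)
    # usemask=False returns a plain ndarray instead of a masked array
    cat = append_fields(cat, 'HPXPIXEL', pixnum, dtypes=int, usemask=False)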


def _prime_catalog(catalog, n_side, keep_surveys, zmin, zmax):
def _prime_catalog(
catalog, n_side, keep_surveys, zmin, zmax, is_tile
):
""" Returns quasar catalog object. It is sorted in the following order:
HPXPIXEL, SURVEY (if applicable), TARGETID
the sort key (HPXPIXEL, or TILEID and PETAL_LOC for tiles), SURVEY (if applicable), TARGETID
Arguments
----------
@@ -289,6 +354,8 @@ def _prime_catalog(catalog, n_side, keep_surveys, zmin, zmax):
Minimum quasar redshift
zmax: float
Maximum quasar redshift
is_tile: bool
If the catalog is for tiles. Sort order will be TILEID, PETAL_LOC.
Returns
----------
@@ -299,19 +366,23 @@

# Redshift cuts
w = (catalog['Z'] >= zmin) & (catalog['Z'] <= zmax)
logging_mpi(f"There are {w.sum()} quasars in the redshift range.", 0)
logging.info(f"There are {w.sum()} quasars in the redshift range.")
catalog = catalog[w]

sort_order = ['HPXPIXEL', 'TARGETID']
if is_tile:
sort_order = ['TILEID', 'PETAL_LOC', 'TARGETID']
else:
sort_order = ["HPXPIXEL", 'TARGETID']

# Filter all the objects in the catalogue not belonging to the specified
# surveys.
if 'SURVEY' in colnames and keep_surveys is not None:
w = np.isin(catalog["SURVEY"], keep_surveys)
catalog = catalog[w]
if len(keep_surveys) > 1:
if len(keep_surveys) > 1 and not is_tile:
sort_order.insert(1, 'SURVEY')
logging_mpi(
f"There are {w.sum()} quasars in given surveys {keep_surveys}.", 0)
logging.info(
f"There are {w.sum()} quasars in given surveys {keep_surveys}.")

if catalog.size == 0:
raise Exception("Empty quasar catalogue.")
