diff --git a/pipelines/coaddQualityExtended.yaml b/pipelines/coaddQualityExtended.yaml new file mode 100644 index 0000000..2b11afb --- /dev/null +++ b/pipelines/coaddQualityExtended.yaml @@ -0,0 +1,45 @@ +description: | + Tier2 atools and metrics to assess coadd quality +tasks: + analyzeObjectTableExtended: + class: lsst.analysis.tools.tasks.ObjectTableTractAnalysisTask + config: + connections.data: objectTable_tract_multiprofit + connections.outputName: objectTableMultiprofitExtended + # set plots to run + atools.serReffVsMag: SizeMagnitudePlot + atools.serReffVsMag.fluxes_default.cmodel_err: model_ser + atools.serReffVsMag.sizes_default.shape_slot: sizes_ser + atools.serReffVsMag.config_moments: moments_ser + atools.serReffVsMag.size_type: "determinantRadius" + atools.serReffVsMag.mag_x: "cmodel_err" + atools.serReffVsMag.size_y: "shape_slot" + atools.serReffVsMag.is_covariance: False + atools.serReffVsMag.applyContext: CoaddContext + atools.serReffVsMag.prep.selectors.flagSelector.selectWhenFalse: flags_ser + atools.serReffVsMag.prep.selectors.flagSelector.selectWhenTrue: [] + atools.serReffVsMag.produce.plot.xLims: (17, 29) + atools.serReffVsMag.produce.plot.yLims: (-4, 3) + python: | + from lsst.analysis.tools.atools import * + from lsst.analysis.tools.contexts import * + from lsst.analysis.tools.atools.genericBuild import FluxConfig, MomentsConfig, SizeConfig + + models_mag = {} + flags_model = {} + for name_model, label_model in (("ser", "MPF Ser"), ("expdev", "MPF Exp+Dev")): + models_mag[name_model] = FluxConfig( + key_flux = f"mpf_{name_model}_ser_{{band}}_flux", + key_flux_error = f"mpf_{name_model}_ser_{{band}}_flux_err", + name_flux = label_model, + ) + flags_model[name_model] = [ + 'mpf_ser_fixedcen_unknown_flag', + 'mpf_ser_fixedcen_is_parent_flag', + 'mpf_ser_fixedcen_not_primary_flag', + 'mpf_ser_fixedcen_psf_fit_flag', + ] + flags_ser = flags_model["ser"] + model_ser = models_mag["ser"] + sizes_ser = SizeConfig(key_size="mpf_ser_ser_{suffix}", 
name_size="Sersic $R_{eff}$") + moments_ser = MomentsConfig(xx="reff_x", yy="reff_y", xy="rho") diff --git a/pipelines/fit.yaml b/pipelines/fit.yaml index 4b03af5..8e85dca 100644 --- a/pipelines/fit.yaml +++ b/pipelines/fit.yaml @@ -7,120 +7,255 @@ tasks: python: | from lsst.meas.extensions.multiprofit.fit_coadd_psf import MultiProFitPsfTask config.fit_coadd_psf.retarget(MultiProFitPsfTask) + config.fit_coadd_psf.config_fit.eval_residual = False fit_src_psgauss_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_psgauss_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from lsst.multiprofit.componentconfig import ( + GaussianComponentConfig, ParameterConfig, SersicComponentConfig, SersicIndexParameterConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.n_pointsources = 1 - config.fit_coadd_multiband.sersics = { - "gauss": SersicConfig( - sersicindex=SersicIndexConfig(fixed=True, value_initial=0.5), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_gauss={ + "ps": GaussianComponentConfig( + size_x=ParameterConfig(value_initial=0., fixed=True), + size_y=ParameterConfig(value_initial=0., fixed=True), + rho=ParameterConfig(value_initial=0., fixed=True), + ), + }, + # It could be in components_gauss, but this keeps the size field + # conveniently named reff for consistency (and uses a Gaussian + # component under the hood anyway). 
+ components_sersic={ + "gauss": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=0.5, fixed=True), + ), + }, + )})} + ) fit_src_psexp_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_psexp_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from lsst.multiprofit.componentconfig import ( + GaussianComponentConfig, ParameterConfig, SersicComponentConfig, SersicIndexParameterConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.n_pointsources = 1 - config.fit_coadd_multiband.sersics = { - "exp": SersicConfig( - sersicindex=SersicIndexConfig(fixed=True, value_initial=1.0), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_gauss={ + "ps": GaussianComponentConfig( + size_x=ParameterConfig(value_initial=0., fixed=True), + size_y=ParameterConfig(value_initial=0., fixed=True), + rho=ParameterConfig(value_initial=0., fixed=True), + ) + }, + components_sersic={ + "exp": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=1., fixed=True), + ), + }, + )})} + ) fit_src_exp_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_exp_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from 
lsst.multiprofit.componentconfig import ( + GaussianComponentConfig, ParameterConfig, SersicComponentConfig, SersicIndexParameterConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.sersics = { - "exp": SersicConfig( - sersicindex=SersicIndexConfig(fixed=True, value_initial=1.0), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_sersic={ + "exp": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=1., fixed=True), + ), + }, + )})} + ) fit_src_exp_fixedcen_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_exp_fixedcen_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from lsst.multiprofit.componentconfig import ( + CentroidConfig, ParameterConfig, SersicComponentConfig, SersicIndexParameterConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.fit_cen_x = False - config.fit_coadd_multiband.fit_cen_y = False - config.fit_coadd_multiband.sersics = { - "exp": SersicConfig( - sersicindex=SersicIndexConfig(fixed=True, value_initial=1.0), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + centroids={"default": CentroidConfig( + 
x=ParameterConfig(fixed=True), + y=ParameterConfig(fixed=True), + )}, + components_sersic={ + "exp": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=1., fixed=True), + ), + }, + )})} + ) + fit_src_ser_fixedcen_multiprofit: + class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask + config: + connections.cat_output: deepCoadd_ser_fixedcen_multiprofit + python: | + from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask + from lsst.multiprofit.componentconfig import ( + CentroidConfig, ParameterConfig, SersicComponentConfig + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig + config.fit_coadd_multiband.retarget(MultiProFitSourceTask) + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + centroids={"default": CentroidConfig( + x=ParameterConfig(fixed=True), + y=ParameterConfig(fixed=True), + )}, + components_sersic={ + "ser": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + ), + }, + )})} + ) + fit_src_expdev_multiprofit: + class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask + config: + connections.cat_output: deepCoadd_expdev_multiprofit + python: | + from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask + from lsst.multiprofit.componentconfig import ( + GaussianComponentConfig, ParameterConfig, SersicComponentConfig, SersicIndexParameterConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig + config.fit_coadd_multiband.retarget(MultiProFitSourceTask) + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_sersic={ + "exp": 
SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=1., fixed=True), + ), + "dev": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=4., fixed=True), + ), + }, + )})} + ) fit_src_ser_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_ser_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from lsst.multiprofit.componentconfig import (ParameterConfig, SersicComponentConfig) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.n_pointsources = 0 - config.fit_coadd_multiband.sersics = { - "ser": SersicConfig( - sersicindex=SersicIndexConfig(fixed=False, value_initial=1.0), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_sersic={ + "ser": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + ), + }, + )})} + ) fit_src_psser_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_psser_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from lsst.multiprofit.componentconfig import ( + GaussianComponentConfig, ParameterConfig, SersicComponentConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, 
SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.n_pointsources = 1 - config.fit_coadd_multiband.sersics = { - "ser": SersicConfig( - sersicindex=SersicIndexConfig(fixed=False, value_initial=1.63), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_gauss={ + "ps": GaussianComponentConfig( + size_x=ParameterConfig(value_initial=0., fixed=True), + size_y=ParameterConfig(value_initial=0., fixed=True), + rho=ParameterConfig(value_initial=0., fixed=True), + ) + }, + components_sersic={ + "ser": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + ), + }, + )})} + ) fit_src_psexpdev_multiprofit: class: lsst.pipe.tasks.fit_coadd_multiband.CoaddMultibandFitTask config: connections.cat_output: deepCoadd_psexpdev_multiprofit python: | from lsst.meas.extensions.multiprofit.fit_coadd_multiband import MultiProFitSourceTask - from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig + from lsst.multiprofit.componentconfig import ( + GaussianComponentConfig, ParameterConfig, SersicComponentConfig, SersicIndexParameterConfig, + ) + from lsst.multiprofit.modelconfig import ModelConfig + from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig config.fit_coadd_multiband.retarget(MultiProFitSourceTask) - config.fit_coadd_multiband.n_pointsources = 1 - config.fit_coadd_multiband.sersics = { - "exp": SersicConfig( - sersicindex=SersicIndexConfig(fixed=True, value_initial=1.0), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ), - "dev": SersicConfig( - sersicindex=SersicIndexConfig(fixed=True, value_initial=4.0), - prior_axrat_stddev=0.8, - prior_size_stddev=0.3, - ) - } + config.fit_coadd_multiband.config_model = ModelConfig( + sources={"": SourceConfig(component_groups={"": ComponentGroupConfig( + components_gauss={ + 
"ps": GaussianComponentConfig( + size_x=ParameterConfig(value_initial=0., fixed=True), + size_y=ParameterConfig(value_initial=0., fixed=True), + rho=ParameterConfig(value_initial=0., fixed=True), + ) + }, + components_sersic={ + "exp": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=1., fixed=True), + ), + "dev": SersicComponentConfig( + prior_axrat_stddev=0.8, + prior_size_stddev=0.3, + sersic_index=SersicIndexParameterConfig(value_initial=4., fixed=True), + ), + }, + )})} + ) diff --git a/pipelines/merge.yaml b/pipelines/merge.yaml index 4a0ea2e..f841d9f 100644 --- a/pipelines/merge.yaml +++ b/pipelines/merge.yaml @@ -4,7 +4,13 @@ parameters: bands_match: ["g", "r", "i"] # Bands to match aperture fluxes on as a fallback if model fit failed bands_fallback: [] - include_psflux: True + include_psflux: False + merge_expdev: False + merge_psexpdev: False + merge_psgauss: False + merge_psser: False + merge_ser: False + merge_ser_fixedcen: False model_prefix: "mpf_psgauss" size_include: None tasks: @@ -16,38 +22,64 @@ tasks: from lsst.meas.extensions.multiprofit.consolidate_astropy_table import ( InputConfig, MergeMultibandFluxes, ) - config.inputs = { + inputs = { "deepCoadd_psfs_multiprofit": InputConfig( doc="PSF fit parameters", column_id="id", - is_multiband=False, ), "objectTable_tract": InputConfig( doc="Merged object table", - columns=["refExtendedness"], + # objectId is needed for inner join + columns=["objectId", "refExtendedness", "detect_isPatchInner"], is_multiband=True, is_multipatch=True, + join_column="objectId", storageClass="DataFrame", ), - "deepCoadd_psgauss_multiprofit": InputConfig( - doc="Point Source + Gaussian source fit", - action=MergeMultibandFluxes(name_model="psgauss"), + } + if parameters.merge_ser_fixedcen: + inputs["deepCoadd_ser_fixedcen_multiprofit"] = InputConfig( + doc="Sersic fixed centroid source fit", + 
action=MergeMultibandFluxes(name_model="ser_fixedcen"), column_id="id", is_multiband=True, - ), - "deepCoadd_psser_multiprofit": InputConfig( - doc="Point Source + Sersic source fit", - action=MergeMultibandFluxes(name_model="psser"), + ) + if parameters.merge_expdev: + inputs["deepCoadd_expdev_multiprofit"] = InputConfig( + doc="Exponential + DeVaucouleurs source fit", + action=MergeMultibandFluxes(name_model="expdev"), column_id="id", is_multiband=True, - ), - "deepCoadd_psexpdev_multiprofit": InputConfig( + ) + if parameters.merge_ser: + inputs["deepCoadd_ser_multiprofit"] = InputConfig( + doc="Sersic source fit", + action=MergeMultibandFluxes(name_model="ser"), + column_id="id", + is_multiband=True, + ) + if parameters.merge_psexpdev: + inputs["deepCoadd_psexpdev_multiprofit"] = InputConfig( doc="Point Source + Exponential + DeVaucouleurs source fit", action=MergeMultibandFluxes(name_model="psexpdev"), column_id="id", is_multiband=True, - ), - } + ) + if parameters.merge_psgauss: + inputs["deepCoadd_psgauss_multiprofit"] = InputConfig( + doc="Point Source + Gaussian source fit", + action=MergeMultibandFluxes(name_model="psgauss"), + column_id="id", + is_multiband=True, + ) + if parameters.merge_psser: + inputs["deepCoadd_psser_multiprofit"] = InputConfig( + doc="Point Source + Sersic source fit", + action=MergeMultibandFluxes(name_model="psser"), + column_id="id", + is_multiband=True, + ) + config.inputs = inputs match_multiprofit: class: lsst.pipe.tasks.match_tract_catalog.MatchTractCatalogTask config: @@ -56,19 +88,17 @@ tasks: # Target settings are likely common to all object tables from lsst.pipe.tasks.match_tract_catalog_probabilistic import MatchTractCatalogProbabilisticTask config.match_tract_catalog.retarget(MatchTractCatalogProbabilisticTask) - - print(f"{parameters.bands_match=}") + fluxes_ref = [ f"flux_{band}" for bands in (parameters.bands_match, parameters.bands_fallback) for band in bands ] print(fluxes_ref) 
config.match_tract_catalog.columns_ref_flux = fluxes_ref config.match_tract_catalog.columns_ref_meas = ["ra", "dec"] + fluxes_ref - # TODO: Figure out why the list comp version not have parameters in scope - # fluxes + # TODO: Figure out why the list comp version does not have fluxes in scope fluxes_meas = [] for band in parameters.bands_match: - fluxes_meas.append(f"{parameters.model_prefix}_{band}_flux") + fluxes_meas.append(f"{parameters.model_prefix}_{band}_flux") columns_meas = [ f"{parameters.model_prefix}_cen_ra", f"{parameters.model_prefix}_cen_dec" ] + fluxes_meas @@ -76,11 +106,11 @@ tasks: config.match_tract_catalog.columns_target_err = [f"{col}_err" for col in columns_meas] config.match_tract_catalog.coord_format.column_target_coord1 = f"{parameters.model_prefix}_cen_ra" config.match_tract_catalog.coord_format.column_target_coord2 = f"{parameters.model_prefix}_cen_dec" - + config.match_tract_catalog.mag_faintest_ref = 27.0 - config.match_tract_catalog.columns_ref_copy = [ "id", "truth_type" ] - config.match_tract_catalog.columns_ref_select_true = [ "is_unique_truth_entry" ] - config.match_tract_catalog.columns_target_copy = [ "objectId" ] + config.match_tract_catalog.columns_ref_copy = ["id", "truth_type"] + config.match_tract_catalog.columns_ref_select_true = ["is_unique_truth_entry"] + config.match_tract_catalog.columns_target_copy = ["objectId"] config.match_tract_catalog.columns_target_select_true = [] config.match_tract_catalog.columns_target_select_false = [ f"{parameters.model_prefix}_not_primary_flag", @@ -92,9 +122,13 @@ tasks: python: | from lsst.pipe.tasks.diff_matched_tract_catalog import MatchedCatalogFluxesConfig - + columns_flux = {} - config.columns_target_copy = ["objectId", "patch"] + config.columns_target_copy = [ + "objectId", "patch", + f"{parameters.model_prefix}_cen_x", f"{parameters.model_prefix}_cen_y", + f"{parameters.model_prefix}_cen_x_err", f"{parameters.model_prefix}_cen_y_err", + ] for band in parameters.bands_match: 
columns_flux[band] = MatchedCatalogFluxesConfig( column_ref_flux=f"flux_{band}", @@ -102,7 +136,7 @@ tasks: columns_target_flux_err=[f"{parameters.model_prefix}_{band}_flux_err",], ) if parameters.include_psflux: - config.columns_target_copy.append(f"{parameters.model_prefix}_ps1_{band}_flux") + config.columns_target_copy.append(f"{parameters.model_prefix}_ps_{band}_flux") if parameters.size_include: for ax in ("x", "y"): config.columns_target_copy.append( diff --git a/python/lsst/meas/extensions/multiprofit/consolidate_astropy_table.py b/python/lsst/meas/extensions/multiprofit/consolidate_astropy_table.py index 7c29ae5..5d26655 100644 --- a/python/lsst/meas/extensions/multiprofit/consolidate_astropy_table.py +++ b/python/lsst/meas/extensions/multiprofit/consolidate_astropy_table.py @@ -103,8 +103,27 @@ class InputConfig(pexConfig.Config): column_id = pexConfig.Field[str](doc="ID column to merge", optional=False, default="objectId") is_multiband = pexConfig.Field[bool](doc="Whether the dataset is multiband or not", default=False) is_multipatch = pexConfig.Field[bool](doc="Whether the dataset is multipatch or not", default=False) + join_column = pexConfig.Field[str]( + doc="Column to join on if unequal length instead of stacking", default=None, optional=True + ) storageClass = pexConfig.Field[str](doc="Storage class for DatasetType", default="ArrowAstropy") + def get_connection(self, name: str) -> connectionTypes.Input: + dimensions = ["skymap", "tract"] + if not self.is_multipatch: + dimensions.append("patch") + if not self.is_multiband: + dimensions.append("band") + connection = connectionTypes.Input( + doc=self.doc, + name=name, + storageClass=self.storageClass, + dimensions=dimensions, + multiple=not (self.is_multiband and self.is_multipatch), + deferLoad=self.columns is not None, + ) + return connection + class ConsolidateAstropyTableConfigBase(pexConfig.Config): """Config for ConsolidateAstropyTableTask.""" @@ -133,23 +152,11 @@ class 
ConsolidateAstropyTableConnections( def __init__(self, *, config: ConsolidateAstropyTableConfigBase): for name, config_input in config.inputs.items(): - dimensions = ["skymap", "tract"] - if not config_input.is_multipatch: - dimensions.append("patch") - if not config_input.is_multiband: - dimensions.append("band") - connection = connectionTypes.Input( - doc=config_input.doc, - name=name, - storageClass=config_input.storageClass, - dimensions=dimensions, - multiple=not config_input.is_multiband, - deferLoad=config_input.columns is not None, - ) if hasattr(self, name): raise ValueError( f"{config_input=} {name=} is invalid, due to being an existing attribute" f" of {self=}" ) + connection = config_input.get_connection(name) setattr(self, name, connection) @@ -160,6 +167,16 @@ class ConsolidateAstropyTableConfig( ): """PipelineTaskConfig for ConsolidateAstropyTableTask.""" + join_type = pexConfig.ChoiceField[str]( + doc="Type of join to perform in the final hstack", + allowed={ + "inner": "Inner join", + "outer": "Outer join", + }, + default="inner", + optional=False, + ) + class ConsolidateAstropyTableTask(pipeBase.PipelineTask): """Write patch-merged astropy tables to a tract-level astropy table.""" @@ -195,7 +212,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): columns = tuple(data_in.columns) if inputConfig.storageClass == "DataFrame": - data_in = apTab.Table.from_pandas(data_in) + data_in = apTab.Table.from_pandas(data_in.reset_index(drop=False)) elif inputConfig.storageClass == "ArrowAstropy": data_in.meta = {name: data_in.meta} @@ -264,6 +281,19 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): for patch in (patches_ref if not config_input.is_multipatch else patches_null) ] data[name] = tables[0] if (len(tables) == 1) else apTab.vstack(tables, join_type="exact") - table = apTab.hstack([data[name] for name in self.config.inputs]) + # This will break if all tables have config.join_column + # ... but that seems unlikely. 
+ table = apTab.hstack( + [data[name] for name, config in self.config.inputs.items() if config.join_column is None], + join_type=self.config.join_type, + ) + for name, config in self.config.inputs.items(): + if config.join_column: + table = apTab.join( + table, + data[name], + join_type=self.config.join_type, + keys=config.join_column, + ) butlerQC.put(pipeBase.Struct(cat_output=table), outputRefs) diff --git a/python/lsst/meas/extensions/multiprofit/errors.py b/python/lsst/meas/extensions/multiprofit/errors.py new file mode 100644 index 0000000..cf0e72e --- /dev/null +++ b/python/lsst/meas/extensions/multiprofit/errors.py @@ -0,0 +1,38 @@ +# This file is part of meas_extensions_multiprofit. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. 
+ +from lsst.multiprofit.errors import CatalogError + + +class IsParentError(CatalogError): + """RuntimeError for objects that are parents and shouldn't be fit.""" + + @classmethod + def column_name(cls) -> str: + return "is_parent_flag" + + +class NotPrimaryError(CatalogError): + """RuntimeError for objects that are not primary and shouldn't be fit.""" + + @classmethod + def column_name(cls) -> str: + return "not_primary_flag" diff --git a/python/lsst/meas/extensions/multiprofit/fit_coadd_multiband.py b/python/lsst/meas/extensions/multiprofit/fit_coadd_multiband.py index 9b114e6..83710ce 100644 --- a/python/lsst/meas/extensions/multiprofit/fit_coadd_multiband.py +++ b/python/lsst/meas/extensions/multiprofit/fit_coadd_multiband.py @@ -19,34 +19,69 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. +import logging import math -from typing import Any, Mapping, Sequence, Type +from typing import Any, Mapping, Sequence import gauss2d as g2 import gauss2d.fit as g2f -import lsst.geom as geom import lsst.pex.config as pexConfig import lsst.pipe.base as pipeBase import lsst.pipe.tasks.fit_coadd_multiband as fitMB import lsst.utils.timer as utilsTimer import numpy as np import pydantic +from astropy.table import Table from lsst.daf.butler.formatters.parquet import astropy_to_arrow from lsst.multiprofit.config import set_config_from_dict -from lsst.multiprofit.fit_psf import CatalogPsfFitterConfig, PsfRebuildFitFlagError +from lsst.multiprofit.errors import PsfRebuildFitFlagError +from lsst.multiprofit.fit_psf import CatalogPsfFitterConfig, CatalogPsfFitterConfigData from lsst.multiprofit.fit_source import ( CatalogExposureSourcesABC, CatalogSourceFitterABC, CatalogSourceFitterConfig, + CatalogSourceFitterConfigData, ) from lsst.multiprofit.utils import get_params_uniq from pydantic.dataclasses import dataclass +from .errors import IsParentError, NotPrimaryError from .utils import get_spanned_image -class
NotPrimaryError(RuntimeError): - """RuntimeError for sources that are not primary and shouldn't be fit.""" +class MultiProFitSourceConfig(CatalogSourceFitterConfig, fitMB.CoaddMultibandFitSubConfig): + """Configuration for the MultiProFit profile fitter.""" + + bands_fit = pexConfig.ListField( + dtype=str, + default=[], + doc="list of bandpass filters to fit", + listCheck=lambda x: len(set(x)) == len(x), + ) + mask_names_zero = pexConfig.ListField[str]( + doc="Mask bits to mask out", + default=["BAD", "EDGE", "SAT", "NO_DATA"], + ) + psf_sigma_subtract = pexConfig.Field[float]( + doc="PSF x/y sigma value to subtract in quadrature from best-fit values", + default=0.1, + check=lambda x: np.isfinite(x) and (x >= 0), + ) + prefix_column = pexConfig.Field[str](default="mpf_", doc="Column name prefix") + + def bands_read_only(self) -> set[str]: + # TODO: Re-implement determination of prior-only bands once + # data-driven priors are re-implemented (DM-4xxxx) + return set() + + def setDefaults(self): + super().setDefaults() + self.flag_errors = { + IsParentError.column_name(): "IsParentError", + NotPrimaryError.column_name(): "NotPrimaryError", + PsfRebuildFitFlagError.column_name(): "PsfRebuildFitFlagError", + } + self.centroid_pixel_offset = -0.5 @dataclass(frozen=True, kw_only=True, config=fitMB.CatalogExposureConfig) @@ -54,19 +89,34 @@ class CatalogExposurePsfs(fitMB.CatalogExposureInputs, CatalogExposureSourcesABC """Input data from lsst pipelines, parsed for MultiProFit.""" channel: g2f.Channel = pydantic.Field(title="Channel for the image's band") - config_fit: CatalogSourceFitterConfig = pydantic.Field(title="Channel for the image's band") + config_fit: MultiProFitSourceConfig = pydantic.Field(title="Config for fitting options") - def get_psfmodel(self, source): + def get_psf_model(self, source): match = np.argwhere( - (self.table_psf_fits[self.config_fit_psf.column_id] == source[self.config_fit.column_id]) + 
(self.table_psf_fits[self.psf_model_data.config.column_id] == source[self.config_fit.column_id]) )[0][0] - return self.config_fit_psf.rebuild_psfmodel(self.table_psf_fits[match]) - - def get_source_observation(self, source) -> g2f.Observation: - if (not source["detect_isPrimary"]) or source["merge_peak_sky"]: - raise NotPrimaryError(f"source {source[self.config_fit.column_id]} has invalid flags for fit") + psf_model = self.psf_model_data.psf_model + self.psf_model_data.init_psf_model(self.table_psf_fits[match]) + + sigma_subtract = self.config_fit.psf_sigma_subtract + if sigma_subtract > 0: + sigma_subtract_sq = sigma_subtract * sigma_subtract + for param in self.psf_model_data.parameters.values(): + if isinstance( + param, + g2f.SigmaXParameterD | g2f.SigmaYParameterD | g2f.ReffXParameterD | g2f.ReffYParameterD, + ): + param.value = math.sqrt(param.value**2 - sigma_subtract_sq) + return psf_model + + def get_source_observation(self, source, **kwargs) -> g2f.Observation: + if not kwargs.get("skip_flags"): + if (not source["detect_isPrimary"]) or source["merge_peak_sky"]: + raise NotPrimaryError(f"source {source[self.config_fit.column_id]} has invalid flags for fit") footprint = source.getFootprint() bbox = footprint.getBBox() + if not (bbox.getArea() > 0): + return None bitmask = 0 mask = self.exposure.mask[bbox] spans = footprint.spans.asArray() @@ -75,58 +125,60 @@ def get_source_observation(self, source) -> g2f.Observation: bitmask |= bitval mask = ((mask.array & bitmask) != 0) & (spans != 0) mask = ~mask + + is_deblended_child = source["parent"] != 0 + img, _, sigma_inv = get_spanned_image( exposure=self.exposure, + footprint=footprint if is_deblended_child else None, bbox=bbox, spans=spans, get_sig_inv=True, ) + x_min_bbox, y_min_bbox = bbox.beginX, bbox.beginY + # Crop to tighter box for deblended model if edges are unusable + # ... 
this rarely ever seems to happen though + if is_deblended_child: + coords = np.argwhere(np.isfinite(img) & (sigma_inv > 0) & np.isfinite(sigma_inv)) + x_min, y_min = coords.min(axis=0) + x_max, y_max = coords.max(axis=0) + x_max += 1 + y_max += 1 + + if (x_min > 0) or (y_min > 0) or (x_max < img.shape[0]) or (y_max < img.shape[1]): + # Ensure the nominal centroid is still inside the box + # ... although it's a bad sign if that row/column is all bad + x_cen = source["slot_Centroid_x"] - x_min_bbox + y_cen = source["slot_Centroid_y"] - y_min_bbox + x_min = min(x_min, int(np.floor(x_cen))) + x_max = max(x_max, int(np.ceil(x_cen))) + y_min = min(y_min, int(np.floor(y_cen))) + y_max = max(y_max, int(np.ceil(y_cen))) + x_min_bbox += x_min + y_min_bbox += y_min + img = img[x_min:x_max, y_min:y_max] + sigma_inv = sigma_inv[x_min:x_max, y_min:y_max] + mask = mask[x_min:x_max, y_min:y_max] + sigma_inv[~mask] = 0 + coordsys = g2.CoordinateSystem(1.0, 1.0, x_min_bbox, y_min_bbox) + obs = g2f.Observation( - image=g2.ImageD(img), - sigma_inv=g2.ImageD(sigma_inv), - mask_inv=g2.ImageB(mask), + image=g2.ImageD(img, coordsys), + sigma_inv=g2.ImageD(sigma_inv, coordsys), + mask_inv=g2.ImageB(mask, coordsys), channel=self.channel, ) return obs def __post_init__(self): - # Regular standard library dataclasses require this hideous workaround - # due to https://github.com/python/cpython/issues/83315 : super( - # fitMB.CatalogExposureInputs, self).__thisclass__.__post_init__(self) - # ... but pydantic dataclasses do not seem to, and also don't pass self - super().__post_init__() + config_dict = self.table_psf_fits.meta["config"] + # TODO: Can/should this be the derived type (MultiProFitPsfConfig)? 
config = CatalogPsfFitterConfig() - set_config_from_dict(config, self.table_psf_fits.meta["config"]) - object.__setattr__(self, "config_fit_psf", config) - - -class MultiProFitSourceConfig(CatalogSourceFitterConfig, fitMB.CoaddMultibandFitSubConfig): - """Configuration for the MultiProFit profile fitter.""" - - bands_fit = pexConfig.ListField( - dtype=str, - default=[], - doc="list of bandpass filters to fit", - listCheck=lambda x: len(set(x)) == len(x), - ) - mask_names_zero = pexConfig.ListField[str]( - default=["BAD", "EDGE", "SAT", "NO_DATA"], doc="Mask bits to mask out" - ) - prefix_column = pexConfig.Field[str](default="mpf_", doc="Column name prefix") - - def bands_read_only(self) -> set[str]: - # TODO: Re-implement determination of prior-only bands once priors - # are re-implemented (DM-3xxxx) - return set() - - def setDefaults(self): - super().setDefaults() - self.flag_errors = { - "not_primary_flag": "NotPrimaryError", - "psf_fit_flag": "PsfRebuildFitFlagError", - } + set_config_from_dict(config, config_dict) + config_data = CatalogPsfFitterConfigData(config=config) + object.__setattr__(self, "psf_model_data", config_data) class MultiProFitSourceTask(CatalogSourceFitterABC, fitMB.CoaddMultibandFitSubTask): @@ -152,135 +204,170 @@ class MultiProFitSourceTask(CatalogSourceFitterABC, fitMB.CoaddMultibandFitSubTa def __init__(self, **kwargs: Any): errors_expected = {} if "errors_expected" not in kwargs else kwargs.pop("errors_expected") - if NotPrimaryError not in errors_expected: - errors_expected[NotPrimaryError] = "not_primary_flag" - if PsfRebuildFitFlagError not in errors_expected: - errors_expected[PsfRebuildFitFlagError] = "psf_fit_flag" + for error_catalog in (IsParentError, NotPrimaryError, PsfRebuildFitFlagError): + if error_catalog not in errors_expected: + errors_expected[error_catalog] = error_catalog.column_name() CatalogSourceFitterABC.__init__(self, errors_expected=errors_expected) fitMB.CoaddMultibandFitSubTask.__init__(self, **kwargs) - 
@staticmethod - def _init_component( - component: g2f.Component, - values_init: dict[Type[g2f.ParameterD], float] = None, - limits_init: dict[Type[g2f.ParameterD], g2f.LimitsD] = None, + def copy_centroid_errors( + self, + columns_cenx_err_copy: tuple[str], + columns_ceny_err_copy: tuple[str], + results: Table, + catalog_multi: Sequence, + catexps: list[CatalogExposureSourcesABC], + config_data: CatalogSourceFitterConfigData, ): - """Initialize component parameter values. - - Parameters - ---------- - component : `gauss2d.fit.Component` - The component to initialize. - values_init - Initial values to set per parameter type. - limits_init - Initial limits to set per parameter type. - """ - # These aren't necessarily all free - should set cen_x, y - # even if they're fixed, for example - params_init = get_params_uniq(component) - for param in params_init: - type_param = type(param) - if (value := values_init.get(type_param)) is not None: - if param.limits.check(value): - param.value = value - if (limits := limits_init.get(type_param)) is not None: - value = value if value is not None else param.value - if not limits.check(value): - param.value = (limits.max + limits.min) / 2.0 - param.limits = limits - - def get_model_cens(self, source: Mapping[str, Any]): - cenx_img, ceny_img = self.catexps[0].exposure.wcs.skyToPixel( - geom.SpherePoint(source["coord_ra"], source["coord_dec"]) - ) - bbox = source.getFootprint().getBBox() - begin_x, begin_y = bbox.beginX, bbox.beginY - # multiprofit bottom left corner coords are 0, 0, not -0.5, -0.5 - cen_x = cenx_img - begin_x + 0.5 - cen_y = ceny_img - begin_y + 0.5 - return cen_x, cen_y + for column in columns_cenx_err_copy: + results[column] = catalog_multi["slot_Centroid_xErr"] + for column in columns_ceny_err_copy: + results[column] = catalog_multi["slot_Centroid_yErr"] def get_model_radec(self, source: Mapping[str, Any], cen_x: float, cen_y: float): - bbox = source.getFootprint().getBBox() - begin_x, begin_y = bbox.beginX, 
bbox.beginY - # multiprofit bottom left corner coords are 0, 0, not -0.5, -0.5 - cen_x_img = cen_x + begin_x - 0.5 - cen_y_img = cen_y + begin_y - 0.5 - ra, dec = self.catexps[0].exposure.wcs.pixelToSky(cen_x_img, cen_y_img) + # no extra conversions are needed here - cen_x, cen_y are in catalog + # coordinates already + ra, dec = self.catexps[0].exposure.wcs.pixelToSky(cen_x, cen_y) return ra.asDegrees(), dec.asDegrees() def initialize_model( - self, model: g2f.Model, source: Mapping[str, Any], limits_x: g2f.LimitsD, limits_y: g2f.LimitsD + self, + model: g2f.Model, + source: Mapping[str, Any], + catexps: list[CatalogExposureSourcesABC], + values_init: Mapping[g2f.ParameterD, float] | None = None, + centroid_pixel_offset: float = 0, + **kwargs, ): - comps = model.sources[0].components - sig_x = math.sqrt(source["base_SdssShape_xx"]) - sig_y = math.sqrt(source["base_SdssShape_yy"]) - # There is a sign convention difference - rho = np.clip(-source["base_SdssShape_xy"] / (sig_x * sig_y), -0.5, 0.5) + if values_init is None: + values_init = {} + set_flux_limits = kwargs.pop("set_flux_limits") if "set_flux_limits" in kwargs else True + if kwargs: + raise ValueError(f"Unexpected {kwargs=}") + sig_x = math.sqrt(source["slot_Shape_xx"]) + sig_y = math.sqrt(source["slot_Shape_yy"]) + # TODO: Verify if there's a sign difference here + rho = np.clip(source["slot_Shape_xy"] / (sig_x * sig_y), -0.5, 0.5) if not np.isfinite(rho): sig_x, sig_y, rho = 0.5, 0.5, 0 - if not source["base_SdssShape_flag"]: - flux = source["base_SdssShape_instFlux"] - else: - flux = source["base_GaussianFlux_instFlux"] - if not (flux > 0): - flux = source["base_PsfFlux_instFlux"] - if not (flux > 0): - flux = 1 - n_psfs = self.config.n_pointsources - n_extended = len(self.config.sersics) - observation = model.data[0] - x_max = float(observation.image.n_cols) - y_max = float(observation.image.n_rows) - limits_x.max = x_max - limits_y.max = y_max + + # Make restrictive centroid limits (intersection, not 
union) + x_min, y_min, x_max, y_max = -np.Inf, -np.Inf, np.Inf, np.Inf + + fluxes_init = [] + fluxes_limits = [] + + n_observations = len(model.data) + n_components = len(model.sources[0].components) + + for idx_obs, observation in enumerate(model.data): + coordsys = observation.image.coordsys + catexp = catexps[idx_obs] + band = catexp.band + + x_min = max(x_min, coordsys.x_min) + y_min = max(y_min, coordsys.y_min) + x_max = min(x_max, coordsys.x_min + float(observation.image.n_cols)) + y_max = min(y_max, coordsys.y_min + float(observation.image.n_rows)) + + flux_total = np.nansum(observation.image.data[observation.mask_inv.data]) + + column_ref = f"merge_measurement_{band}" + if column_ref in source.schema.getNames() and source[column_ref]: + row = source + else: + row = catexp.catalog.find(source["id"]) + + if not row["base_SdssShape_flag"]: + flux_init = row["base_SdssShape_instFlux"] + else: + flux_init = row["slot_GaussianFlux_instFlux"] + if not (flux_init > 0): + flux_init = row["slot_PsfFlux_instFlux"] + + calib = catexp.exposure.photoCalib + flux_init = calib.instFluxToNanojansky(flux_init) if (flux_init > 0) else max(flux_total, 1.0) + if set_flux_limits: + flux_max = 10 * max((flux_init, flux_total)) + flux_min = min(1e-12, flux_max / 1000) + else: + flux_min, flux_max = 0, np.Inf + fluxes_init.append(flux_init / n_components) + fluxes_limits.append((flux_min, flux_max)) + try: - cenx, ceny = self.get_model_cens(source) + cen_x, cen_y = ( + source["slot_Centroid_x"] - centroid_pixel_offset, + source["slot_Centroid_y"] - centroid_pixel_offset, + ) # TODO: determine which exceptions can occur above except Exception: - cenx = observation.image.n_cols / 2.0 - ceny = observation.image.n_rows / 2.0 - flux = flux / (n_psfs + n_extended) - values_init = { - g2f.IntegralParameterD: flux, - g2f.CentroidXParameterD: cenx, - g2f.CentroidYParameterD: ceny, - g2f.ReffXParameterD: sig_x, - g2f.ReffYParameterD: sig_y, - g2f.SigmaXParameterD: sig_x, - 
g2f.SigmaYParameterD: sig_y, - g2f.RhoParameterD: -rho, - } - # Do not initialize PSF size/rho: they'll all stay zero - params_psf_init = (g2f.IntegralParameterD, g2f.CentroidXParameterD, g2f.CentroidYParameterD) - values_init_psf = {key: values_init[key] for key in params_psf_init} - size_major = g2.EllipseMajor(g2.Ellipse(sigma_x=sig_x, sigma_y=sig_y, rho=rho)).r_major + # TODO: Add bbox coords or remove + cen_x = observation.image.n_cols / 2.0 + cen_y = observation.image.n_rows / 2.0 + # An R_eff larger than the box size is problematic. This should also # stop unreasonable size proposals; a log10 transform isn't enough. # TODO: Try logit for r_eff? - flux_max = 5 * max([np.sum(np.abs(datum.image.data)) for datum in model.data]) - flux_min = 1 / flux_max - limits_flux = g2f.LimitsD(flux_min, flux_max, "unreliable flux limits") - - limits_init = { - g2f.IntegralParameterD: limits_flux, - g2f.ReffXParameterD: g2f.LimitsD(1e-5, x_max), - g2f.ReffYParameterD: g2f.LimitsD(1e-5, y_max), - g2f.SigmaXParameterD: g2f.LimitsD(1e-5, x_max), - g2f.SigmaYParameterD: g2f.LimitsD(1e-5, y_max), + size_major = g2.EllipseMajor(g2.Ellipse(sigma_x=sig_x, sigma_y=sig_y, rho=rho)).r_major + limits_size = max(5.0 * size_major, 2.0 * np.hypot(x_max - x_min, y_max - y_min)) + limits_xy = (1e-5, limits_size) + params_limits_init = { + g2f.CentroidXParameterD: (cen_x, (x_min, x_max)), + g2f.CentroidYParameterD: (cen_y, (y_min, y_max)), + g2f.ReffXParameterD: (sig_x, limits_xy), + g2f.ReffYParameterD: (sig_y, limits_xy), + g2f.SigmaXParameterD: (sig_x, limits_xy), + g2f.SigmaYParameterD: (sig_y, limits_xy), + g2f.RhoParameterD: (rho, None), + # TODO: get guess from configs? 
+ g2f.SersicMixComponentIndexParameterD: (1.0, None), } - limits_init_psf = {g2f.IntegralParameterD: limits_init[g2f.IntegralParameterD]} - - for comp in comps[:n_psfs]: - self._init_component(comp, values_init=values_init_psf, limits_init=limits_init_psf) - for comp, config_comp in zip(comps[n_psfs:], self.config.sersics.values()): - if config_comp.sersicindex.fixed: - if g2f.SersicIndexParameterD in values_init: - del values_init[g2f.SersicMixComponentIndexParameterD] + + # TODO: There ought to be a better way to not get the PSF centroids + # (those are part of model.data's fixed parameters) + params_init = ( + tuple( + ( + param + for param in get_params_uniq(model.sources[0]) + if param.free + or ( + isinstance(param, g2f.CentroidXParameterD) + or isinstance(param, g2f.CentroidYParameterD) + ) + ) + ) + if (len(model.sources) == 1) + else tuple( + { + param: None + for source in model.sources + for param in get_params_uniq(source) + if param.free + or ( + isinstance(param, g2f.CentroidXParameterD) + or isinstance(param, g2f.CentroidYParameterD) + ) + }.keys() + ) + ) + + idx_obs = 0 + for param in params_init: + if param.linear: + value_init = fluxes_init[idx_obs] + limits_new = fluxes_limits[idx_obs] + idx_obs += 1 + if idx_obs == n_observations: + idx_obs = 0 else: - values_init[g2f.SersicMixComponentIndexParameterD] = config_comp.sersicindex.value_initial - self._init_component(comp, values_init=values_init, limits_init=limits_init) + value_init, limits_new = params_limits_init.get(type(param), (values_init.get(param), None)) + if limits_new: + param.limits = g2f.LimitsD(limits_new[0], limits_new[1]) + if value_init is not None: + param.value = value_init + for prior in model.priors: if isinstance(prior, g2f.GaussianPrior): # TODO: Add centroid prior @@ -288,6 +375,29 @@ def initialize_model( elif isinstance(prior, g2f.ShapePrior): prior.prior_size.mean_parameter.value = size_major + def make_CatalogExposurePsfs(self, catexp: fitMB.CatalogExposureInputs) -> 
CatalogExposurePsfs: + catexp_psf = CatalogExposurePsfs( + # dataclasses.asdict(catexp)_makes a recursive deep copy. + # That must be avoided. + **{key: getattr(catexp, key) for key in catexp.__dataclass_fields__.keys()}, + channel=g2f.Channel.get(catexp.band), + config_fit=self.config, + ) + return catexp_psf + + def validate_fit_inputs( + self, + catalog_multi: Sequence, + catexps: list[CatalogExposurePsfs], + configdata: CatalogSourceFitterConfigData = None, + logger: logging.Logger = None, + **kwargs: Any, + ) -> None: + errors = [] + for idx, catexp in enumerate(catexps): + if not isinstance(catexp, CatalogExposurePsfs): + errors.append(f"catexps[{idx=} {type(catexp)=} !isinstance(CatalogExposurePsfs)") + @utilsTimer.timeMethod def run( self, @@ -312,18 +422,17 @@ def run( A table with fit parameters for the PSF model at the location of each source. """ - catexps_conv = [None] * len(catexps) + n_catexps = len(catexps) + catexps_conv: list[CatalogExposurePsfs] = [None] * n_catexps + channels: list[g2f.Channel] = [None] * n_catexps for idx, catexp in enumerate(catexps): - if isinstance(catexp, CatalogExposurePsfs): - catexps_conv[idx] = catexp - else: - catexps_conv[idx] = CatalogExposurePsfs( - # dataclasses.asdict(catexp)_makes a recursive deep copy. - # That must be avoided. 
- **{key: getattr(catexp, key) for key in catexp.__dataclass_fields__.keys()}, - channel=g2f.Channel.get(catexp.band), - config_fit=self.config, - ) + if not isinstance(catexp, CatalogExposurePsfs): + catexp = self.make_CatalogExposurePsfs(catexp) + catexps_conv[idx] = catexp + channels[idx] = catexp.channel self.catexps = catexps - catalog = self.fit(catalog_multi=catalog_multi, catexps=catexps_conv, config=self.config, **kwargs) + config_data = CatalogSourceFitterConfigData(channels=channels, config=self.config) + catalog = self.fit( + catalog_multi=catalog_multi, catexps=catexps_conv, config_data=config_data, **kwargs + ) return pipeBase.Struct(output=astropy_to_arrow(catalog)) diff --git a/python/lsst/meas/extensions/multiprofit/fit_coadd_psf.py b/python/lsst/meas/extensions/multiprofit/fit_coadd_psf.py index d3dca19..a84b49a 100644 --- a/python/lsst/meas/extensions/multiprofit/fit_coadd_psf.py +++ b/python/lsst/meas/extensions/multiprofit/fit_coadd_psf.py @@ -24,19 +24,16 @@ import lsst.pipe.tasks.fit_coadd_psf as fitCP import lsst.utils.timer as utilsTimer from lsst.daf.butler.formatters.parquet import astropy_to_arrow -from lsst.multiprofit.fit_psf import CatalogExposurePsfABC, CatalogPsfFitter, CatalogPsfFitterConfig +from lsst.multiprofit.fit_psf import CatalogPsfFitter, CatalogPsfFitterConfig, CatalogPsfFitterConfigData from lsst.pex.exceptions import InvalidParameterError -from pydantic.dataclasses import dataclass - -@dataclass(frozen=True, kw_only=True, config=fitCP.CatalogExposureConfig) -class CatalogExposure(fitCP.CatalogExposurePsf, CatalogExposurePsfABC): - """A CatalogExposure for PSF fitting.""" +from .errors import IsParentError class MultiProFitPsfConfig(CatalogPsfFitterConfig, fitCP.CoaddPsfFitSubConfig): """Configuration for the MultiProFit Gaussian mixture PSF fitter.""" + fit_parents = pexConfig.Field[bool](default=False, doc="Whether to fit parent object PSFs") prefix_column = pexConfig.Field[str](default="mpf_psf_", doc="Column name 
prefix") def setDefaults(self): @@ -72,17 +69,29 @@ def __init__(self, **kwargs): CatalogPsfFitter.__init__(self, errors_expected=errors_expected) fitCP.CoaddPsfFitSubTask.__init__(self, **kwargs) + def check_source(self, source, config): + if ( + config + and hasattr(config, "fit_parents") + and not config.fit_parents + and (source["parent"] == 0) + and (source["deblend_nChild"] > 1) + ): + raise IsParentError( + f"{source['id']=} is a parent with nChild={source['deblend_nChild']}" f" and will be skipped" + ) + @utilsTimer.timeMethod def run( self, - catexp: CatalogExposure, + catexp: fitCP.CatalogExposurePsf, **kwargs, ) -> pipeBase.Struct: """Run the MultiProFit PSF task on a catalog-exposure pair. Parameters ---------- - catexp : `CatalogExposure` + catexp An exposure to fit a model PSF at the position of all sources in the corresponding catalog. **kwargs @@ -90,9 +99,25 @@ def run( Returns ------- - catalog : `astropy.Table` + catalog A table with fit parameters for the PSF model at the location of each source. 
""" - catalog = self.fit(catexp=catexp, config=self.config, **kwargs) + is_parent_name = IsParentError.column_name() + if not self.config.fit_parents: + if is_parent_name not in self.errors_expected: + self.errors_expected[IsParentError] = is_parent_name + if "IsParentError" not in self.config.flag_errors.values(): + self.config._frozen = False + self.config.flag_errors[is_parent_name] = "IsParentError" + self.config._frozen = True + elif is_parent_name in self.errors_expected: + del self.errors_expected[is_parent_name] + if is_parent_name in self.config.flag_errors.keys(): + self.config._frozen = False + del self.config.flag_errors[is_parent_name] + self.config._frozen = True + + config_data = CatalogPsfFitterConfigData(config=self.config) + catalog = self.fit(catexp=catexp, config_data=config_data, **kwargs) return pipeBase.Struct(output=astropy_to_arrow(catalog)) diff --git a/python/lsst/meas/extensions/multiprofit/utils.py b/python/lsst/meas/extensions/multiprofit/utils.py index f364fd5..fbc7e07 100644 --- a/python/lsst/meas/extensions/multiprofit/utils.py +++ b/python/lsst/meas/extensions/multiprofit/utils.py @@ -46,6 +46,7 @@ def get_spanned_image( bbox: geom.Box2I | None = None, spans: np.ndarray | None = None, get_sig_inv: bool = False, + calibrate: bool = True, ) -> tuple[np.ndarray, geom.Box2I, np.ndarray]: """Get an image masked by its spanset. @@ -64,6 +65,8 @@ def get_spanned_image( Defaults to the footprint's spans. get_sig_inv Whether to get the inverse variance and return its square root. + calibrate + Whether to calibrate the image; set to False if already calibrated. 
Returns ------- @@ -82,14 +85,42 @@ def get_spanned_image( return None, bbox if spans is None: spans = footprint.getSpans().asArray() - sig_inv = None - img = afwImage.Image(bbox, dtype="D") - maskedIm = exposure.photoCalib.calibrateImage(exposure.maskedImage.subset(bbox)) - img.array[spans] = maskedIm.image.array[spans] - if get_sig_inv: - sig_inv = afwImage.Image(bbox, dtype="D").array - sig_inv[spans] = 1 / np.sqrt(maskedIm.variance.array[spans]) - return img.array, bbox, sig_inv + sig_inv = afwImage.ImageF(bbox) if get_sig_inv else None + img = afwImage.ImageF(bbox) + img.array[:] = np.nan + if footprint is None: + maskedIm = exposure.maskedImage.subset(bbox) + if not calibrate: + img = maskedIm.image.array + sig_inv.array[spans] = 1 / np.sqrt(maskedIm.variance.array[spans]) + else: + img.array[spans] = footprint.getImageArray() + if get_sig_inv: + # footprint.getVarianceArray() returns zeros + variance = exposure.variance[bbox] + if not calibrate: + sig_inv.array[spans] = 1 / np.sqrt(variance.array[spans]) + if calibrate: + # Have to calibrate with the original image + maskedIm = afwImage.MaskedImageF( + image=exposure.image[bbox], + variance=variance if get_sig_inv else None, + ) + if calibrate: + maskedIm = exposure.photoCalib.calibrateImage(maskedIm) + if footprint is None: + img = maskedIm.image.array + else: + # Apply the calibration to the deblended footprint + # ... 
hopefully it's multiplicative enough + img.array[spans] *= maskedIm.image.array[spans] / exposure.image[bbox].array[spans] + img = img.array + if get_sig_inv: + sig_inv.array[spans] = 1 / np.sqrt(maskedIm.variance.array[spans]) + # Should not happen but does with footprints having nans + sig_inv.array[~(sig_inv.array >= 0)] = 0 + + return np.array(img, dtype="float64"), bbox, np.array(sig_inv.array, dtype="float64") def join_and_filter(separator: str, items: Iterable[str], exclusion: str | None = None) -> str: diff --git a/tests/test_fit_coadd.py b/tests/test_fit_coadd.py index 2b038ce..2ed2000 100644 --- a/tests/test_fit_coadd.py +++ b/tests/test_fit_coadd.py @@ -19,80 +19,197 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from lsst.afw.image import ExposureF -from lsst.afw.table import SourceCatalog -from lsst.daf.butler.formatters.parquet import arrow_to_astropy -import lsst.meas.extensions.multiprofit.fit_coadd_multiband as fitCMB -import lsst.meas.extensions.multiprofit.fit_coadd_psf as fitCP +import os import gauss2d.fit as g2f -from lsst.multiprofit.componentconfig import SersicConfig, SersicIndexConfig +import lsst.meas.extensions.multiprofit.fit_coadd_multiband as fitCMB +import lsst.meas.extensions.multiprofit.fit_coadd_psf as fitCP import numpy as np -import os import pytest +from lsst.afw.image import ExposureF +from lsst.afw.table import SourceCatalog +from lsst.daf.butler.formatters.parquet import arrow_to_astropy +from lsst.multiprofit.componentconfig import ( + CentroidConfig, + GaussianComponentConfig, + ParameterConfig, + SersicComponentConfig, + SersicIndexParameterConfig, +) +from lsst.multiprofit.modelconfig import ModelConfig +from lsst.multiprofit.sourceconfig import ComponentGroupConfig, SourceConfig +from lsst.pipe.tasks.fit_coadd_psf import CatalogExposurePsf ROOT = os.environ.get("TESTDATA_CI_IMSIM_MINI", None) has_files = (ROOT is not None) and os.path.isdir(ROOT) 
filename_cat = os.path.join(ROOT, "data", "deepCoadd_meas_0_24_r_2k_ci_imsim.fits") if has_files else None filename_exp = os.path.join(ROOT, "data", "deepCoadd_calexp_0_24_r_2k_ci_imsim.fits") if has_files else None -band = 'r' +band = "r" channel = g2f.Channel.get(band) -dataId = {'band': band} +dataId = {"band": band} +do_exp_fixedcen = False +include_ps = False n_test = 5 -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def catalog(): if not has_files: return None catalog = SourceCatalog.readFits(filename_cat) - good = (catalog['detect_isPrimary'] == 1) & (catalog['merge_peak_sky'] == 0) + good = (catalog["detect_isPrimary"] == 1) & (catalog["merge_peak_sky"] == 0) good[np.where(good)[0][n_test:]] = False return catalog[good] -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def exposure(): if not has_files: return None return ExposureF.readFits(filename_exp) -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def psf_fit_config(): return fitCP.MultiProFitPsfConfig() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def psf_fit_results(catalog, exposure, psf_fit_config): if not has_files: return None - catexp = fitCP.CatalogExposure(dataId=dataId, catalog=catalog, exposure=exposure) + catexp = CatalogExposurePsf(dataId=dataId, catalog=catalog, exposure=exposure) task = fitCP.MultiProFitPsfTask(config=psf_fit_config) results = task.run(catexp).output return arrow_to_astropy(results) -@pytest.fixture(scope='module') -def source_fit_config(): +@pytest.fixture(scope="module") +def source_fit_exp_fixedcen_config(): + config = fitCMB.MultiProFitSourceConfig( + bands_fit=(band,), + config_model=ModelConfig( + sources={ + "": SourceConfig( + component_groups={ + "": ComponentGroupConfig( + centroids={ + "default": CentroidConfig( + x=ParameterConfig(fixed=True), + y=ParameterConfig(fixed=True), + ) + }, + components_sersic={ + "exp": SersicComponentConfig( + 
sersic_index=SersicIndexParameterConfig(value_initial=1.0, fixed=True), + ) + }, + ), + } + ), + }, + ), + ) + config.validate() + return config + + +@pytest.fixture(scope="module") +def source_fit_ser_config(): config = fitCMB.MultiProFitSourceConfig( - n_pointsources=1, - sersics={'gauss': SersicConfig(sersicindex=SersicIndexConfig(fixed=True, value_initial=0.5))}, + bands_fit=(band,), + config_model=ModelConfig( + sources={ + "": SourceConfig( + component_groups={ + "": ComponentGroupConfig( + components_gauss={ + "ps": GaussianComponentConfig( + size_x=ParameterConfig(value_initial=0.0, fixed=True), + size_y=ParameterConfig(value_initial=0.0, fixed=True), + rho=ParameterConfig(value_initial=0.0, fixed=True), + ) + } + if include_ps + else {}, + components_sersic={ + "ser": SersicComponentConfig( + sersic_index=SersicIndexParameterConfig(value_initial=1.0), + ) + }, + ), + } + ), + }, + ), ) config.validate() return config -@pytest.fixture(scope='module') -def source_fit_results(catalog, exposure, psf_fit_results, psf_fit_config, source_fit_config): +@pytest.fixture(scope="module") +def source_fit_ser_config(): + config = fitCMB.MultiProFitSourceConfig( + bands_fit=(band,), + config_model=ModelConfig( + sources={ + "": SourceConfig( + component_groups={ + "": ComponentGroupConfig( + components_sersic={ + "ser": SersicComponentConfig( + sersic_index=SersicIndexParameterConfig(value_initial=1.0), + ) + }, + ), + } + ), + }, + ), + ) + config.validate() + return config + + +@pytest.fixture(scope="module") +def source_fit_exp_fixedcen_results( + catalog, + exposure, + psf_fit_results, + psf_fit_config, + source_fit_exp_fixedcen_config, +): if not has_files: return None + if not do_exp_fixedcen: + return None catexp = fitCMB.CatalogExposurePsfs( - dataId=dataId, catalog=catalog, exposure=exposure, table_psf_fits=psf_fit_results, - channel=channel, config_fit=source_fit_config, + dataId=dataId, + catalog=catalog, + exposure=exposure, + table_psf_fits=psf_fit_results, 
+ channel=channel, + config_fit=source_fit_exp_fixedcen_config, ) - task = fitCMB.MultiProFitSourceTask(config=source_fit_config) + task = fitCMB.MultiProFitSourceTask(config=source_fit_exp_fixedcen_config) + results = task.run(catalog_multi=catalog, catexps=[catexp]) + return results.output.to_pandas() + + +@pytest.fixture(scope="module") +def source_fit_ser_results(catalog, exposure, psf_fit_results, psf_fit_config, source_fit_ser_config): + if not has_files: + return None + catexp = fitCMB.CatalogExposurePsfs( + dataId=dataId, + catalog=catalog, + exposure=exposure, + table_psf_fits=psf_fit_results, + channel=channel, + config_fit=source_fit_ser_config, + ) + task = fitCMB.MultiProFitSourceTask(config=source_fit_ser_config) results = task.run(catalog_multi=catalog, catexps=[catexp]) return results.output.to_pandas() @@ -105,9 +222,10 @@ def test_psf_fits(psf_fit_results): # TODO: Determine what checks can be done against previous values -def test_source_fits(source_fit_results): - if source_fit_results is not None: - assert len(source_fit_results) == n_test - good = source_fit_results[~source_fit_results['mpf_unknown_flag']] - assert all(good.values.flat > -np.Inf) - # TODO: Determine what checks can be done against previous values +def test_source_fits(source_fit_exp_fixedcen_results, source_fit_ser_results): + for results in (source_fit_exp_fixedcen_results, source_fit_ser_results): + if results is not None: + assert len(results) == n_test + good = results[~results["mpf_unknown_flag"]] + assert all(good.values.flat > -np.Inf) + # TODO: Determine what checks can be done against previous values