diff --git a/pipelines/merge.yaml b/pipelines/merge.yaml new file mode 100644 index 0000000..8615465 --- /dev/null +++ b/pipelines/merge.yaml @@ -0,0 +1,113 @@ +description: | + Merge and match MultiProFit outputs +parameters: + bands_match: ["g", "r", "i"] + # Bands to match aperture fluxes on as a fallback if model fit failed + bands_fallback: [] + include_psflux: True + model_prefix: "mpf_psgauss" + size_include: None +tasks: + merge_multiprofit: + class: lsst.meas.extensions.multiprofit.consolidate_astropy_table.ConsolidateAstropyTableTask + config: + connections.cat_output: objectTable_tract_multiprofit + python: | + from lsst.meas.extensions.multiprofit.consolidate_astropy_table import ( + InputConfig, MergeMultibandFluxes, + ) + config.inputs = { + "deepCoadd_psfs_multiprofit": InputConfig( + doc="PSF fit parameters", + column_id="id", + is_multiband=False, + ), + "objectTable_tract": InputConfig( + doc="Merged object table", + columns=["refExtendedness"], + is_multiband=True, + is_multipatch=True, + storageClass="DataFrame", + ), + "deepCoadd_psexpdev_multiprofit": InputConfig( + doc="Point Source + Exponential + DeVaucouleurs source fit", + action=MergeMultibandFluxes(name_model="psexpdev"), + column_id="id", + is_multiband=True, + ), + "deepCoadd_psgauss_multiprofit": InputConfig( + doc="Point Source + Gaussian source fit", + action=MergeMultibandFluxes(name_model="psgauss"), + column_id="id", + is_multiband=True, + ), + } + match_multiprofit: + class: lsst.pipe.tasks.match_tract_catalog.MatchTractCatalogTask + config: + connections.name_input_cat_target: "objectTable_tract_multiprofit" + python: | + # Target settings are likely common to all object tables + from lsst.pipe.tasks.match_tract_catalog_probabilistic import MatchTractCatalogProbabilisticTask + config.match_tract_catalog.retarget(MatchTractCatalogProbabilisticTask) + + print(f"{parameters.bands_match=}") + fluxes_ref = [ + f"flux_{band}" for bands in (parameters.bands_match, parameters.bands_fallback) for band in bands + ] + print(fluxes_ref) + config.match_tract_catalog.columns_ref_flux = fluxes_ref + config.match_tract_catalog.columns_ref_meas = ["ra", "dec"] + fluxes_ref + # TODO: Figure out why the list comp version not have parameters in scope + # fluxes + fluxes_meas = [] + for band in parameters.bands_match: + fluxes_meas.append(f"{parameters.model_prefix}_{band}_flux") + columns_meas = [ + f"{parameters.model_prefix}_cen_ra", f"{parameters.model_prefix}_cen_dec" + ] + fluxes_meas + config.match_tract_catalog.columns_target_meas = columns_meas + config.match_tract_catalog.columns_target_err = [f"{col}_err" for col in columns_meas] + config.match_tract_catalog.coord_format.column_target_coord1 = f"{parameters.model_prefix}_cen_ra" + config.match_tract_catalog.coord_format.column_target_coord2 = f"{parameters.model_prefix}_cen_dec" + + config.match_tract_catalog.mag_faintest_ref = 27.0 + config.match_tract_catalog.columns_ref_copy = [ "id", "truth_type" ] + config.match_tract_catalog.columns_ref_select_true = [ "is_unique_truth_entry" ] + config.match_tract_catalog.columns_target_copy = [ "objectId" ] + config.match_tract_catalog.columns_target_select_true = [] + config.match_tract_catalog.columns_target_select_false = [ + f"{parameters.model_prefix}_not_primary_flag", + ] + merge_matched_multiprofit: + class: lsst.pipe.tasks.diff_matched_tract_catalog.DiffMatchedTractCatalogTask + config: + connections.name_input_cat_target: "objectTable_tract_multiprofit" + + python: | + from lsst.pipe.tasks.diff_matched_tract_catalog import MatchedCatalogFluxesConfig + + columns_flux = {} + config.columns_target_copy = ["objectId", "patch"] + for band in parameters.bands_match: + columns_flux[band] = MatchedCatalogFluxesConfig( + column_ref_flux=f"flux_{band}", + columns_target_flux=[f"{parameters.model_prefix}_{band}_flux",], + columns_target_flux_err=[f"{parameters.model_prefix}_{band}_flux_err",], + ) + if parameters.include_psflux: + config.columns_target_copy.append(f"{parameters.model_prefix}_ps1_{band}_flux") + if parameters.size_include: + for ax in ("x", "y"): + config.columns_target_copy.append( + f"{parameters.model_prefix}_{parameters.size_include}_sigma_{ax}" + ) + config.columns_flux = columns_flux + config.coord_format.column_target_coord1 = f"{parameters.model_prefix}_cen_ra" + config.coord_format.column_target_coord2 = f"{parameters.model_prefix}_cen_dec" + config.columns_target_coord_err = [ + parameters.model_prefix + "_cen_ra_err", + parameters.model_prefix + "_cen_dec_err", + ] + config.columns_target_select_false = [parameters.model_prefix + "_not_primary_flag"] + config.columns_target_select_true = []