diff --git a/doc/news/DM-4721.feature.rst b/doc/news/DM-4721.feature.rst new file mode 100644 index 00000000..f3386de0 --- /dev/null +++ b/doc/news/DM-4721.feature.rst @@ -0,0 +1,12 @@ +Update BaseMakeCalibrations to trigger cp_verify and don't wait for it to finish. + +- Refactor run_block to handle calibration and verification concurrently + using asyncio +- Added helper methods (process_images, process_verification, + process_calibration) to reduce code duplication +- Manage background tasks with a list, including timeout handling and + cancellation if not completed in time +- Add configuration option `background_task_timeout` to control + background task timeouts +- Added unit test for BaseMakeCalibrations in + `tests/test_base_make_calibrations.py` diff --git a/python/lsst/ts/externalscripts/base_make_calibrations.py b/python/lsst/ts/externalscripts/base_make_calibrations.py index 5790494f..3005deae 100644 --- a/python/lsst/ts/externalscripts/base_make_calibrations.py +++ b/python/lsst/ts/externalscripts/base_make_calibrations.py @@ -17,6 +17,7 @@ # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License +# along with this program. If not, see . __all__ = ["BaseMakeCalibrations"] @@ -72,6 +73,8 @@ def __init__(self, index, descr): "PTC", ] + self.background_tasks = [] + # Pipetask methods to get parameters for calibrations generation self.pipetask_parameters = dict( BIAS=self.get_pipetask_parameters_bias, @@ -347,6 +350,10 @@ def get_schema(cls): type: integer default: 120 descriptor: Timeout value, in seconds, for OODS. + background_task_timeout: + type: integer + default: 30 + descriptor: Timeout value, in seconds, for background tasks note: description: A descriptive note about the images being taken. type: string @@ -1403,6 +1410,7 @@ async def certify_calib(self, image_type, job_id_calib): process = await asyncio.create_subprocess_shell(cmd) stdout, stderr = await process.communicate() self.log.debug(f"Process returned: {process.returncode}") + if process.returncode != 0: self.log.debug(stdout) self.log.error(stderr) @@ -1547,6 +1555,7 @@ async def run_block(self): # Basic sets of calibrations first : biases, darks, and flats. # After the loop is done, do defects and PTC. for im_type in image_types: + # 1. Take images with the instrument, only for "BIAS, # "DARK", or "FLAT". if im_type == "BIAS": @@ -1557,8 +1566,8 @@ async def run_block(self): await self.checkpoint(f"Taking {self.config.n_flat} flats.") # TODO: Before taking flats with LATISS (and also - # with LSSTComCam), check that the telescope is in - # position to do so. See DM-31496, DM-31497. + # with LSSTComCam), check that the telescope is in + # position to do so. See DM-31496, DM-31497. exposure_ids_list = await self.take_images(im_type) # Discard the first N exposures taken (DM-36422) @@ -1570,100 +1579,9 @@ async def run_block(self): f"Images taken: {self.exposure_ids[im_type]}; type: {im_type}" ) - if self.config.generate_calibrations: - # 2. Call the calibration pipetask via the OCPS - # to make a combined - self.log.info( - "Generating calibration from the images taken " - "as part of this script." - ) - response_ocps_calib_pipetask = await self.call_pipetask(im_type) - job_id_calib = response_ocps_calib_pipetask["jobId"] - else: - self.log.info( - f"A combined {im_type} will not be generated from the " - "images taken as part of this script. 
Any needed input " - "calibrations by the verification pipetasks will be " - "sought in their input calibrations." - ) - job_id_calib = None - - # 3. Verify the combined calibration (implemented so far for bias, - # dark, and flat), and certify it if the verification - # tests pass and it was generated. - if self.config.do_verify: - try: - if self.config.generate_calibrations: - response_ocps_verify_pipetask = await self.verify_calib( - im_type, job_id_calib - ) - # Check that the task running cp_verify - # did not fail. - job_id_verify = response_ocps_verify_pipetask["jobId"] - # Check verification statistics - report_check_verify_stats = await self.check_verification_stats( - im_type, job_id_verify, job_id_calib - ) - # Inform the user about the results from - # running cp_verify. - # TODO: If verification failed, issue an - # alarm in the watcher: DM-33898. - await self.analyze_report_check_verify_stats( - im_type, - report_check_verify_stats, - job_id_verify, - job_id_calib, - ) - # If the verification tests passed, - # certify the combined calibrations. - if report_check_verify_stats["CERTIFY_CALIB"]: - await self.certify_calib(im_type, job_id_calib) - # If tests did not pass, end the loop, as - # certified calibrations are needed to cons - # construct subsequent calibrations - # (bias->dark->flat). - else: - break - else: - # If combined calibrations are not being generated - # from the individual images just taken, and if - # do_verify=True, the verification task - # will run the tests using calibrations in its - # input collections as reference. - # Note that there is no certification of combined - # calibrations here, because we are not generating - # them. - # job_id_calib should be None - assert job_id_calib is None, "'job_id_calib' is not 'None'." - response_ocps_verify_pipetask = await self.verify_calib( - im_type, job_id_calib - ) - job_id_verify = response_ocps_verify_pipetask["jobId"] - # Check verification statistics - report_check_verify_stats = await self.check_verification_stats( - im_type, job_id_verify, job_id_calib - ) - # Inform the user about the results from running - # cp_verify. - # TODO: If verification failed, issue an alarm - # in the watcher: DM-33898 - await self.analyze_report_check_verify_stats( - im_type, - report_check_verify_stats, - job_id_verify, - job_id_calib, - ) - except Exception: - self.log.exception("Error in do_verify. Ignoring...") - # do verify is False - else: - if self.config.generate_calibrations: - self.log.info( - "'do_verify' is set to 'False' and " - "'generate_calibrations' to 'True'. " - f"{im_type} will be automatically certified." - ) - await self.certify_calib(im_type, job_id_calib) + # Create a task that processes calibration and verification + task = asyncio.create_task(self.process_images(im_type)) + self.background_tasks.append(task) # After taking the basic images (biases, darks, and flats) do # defects and PTC if requested. @@ -1683,26 +1601,252 @@ async def run_block(self): if len(calib_types): for calib_type in calib_types: - try: - # Run the pipetask - response_ocps_calib_pipetask = await self.call_pipetask(calib_type) - job_id_calib = response_ocps_calib_pipetask["jobId"] - # Certify the calibrations in self.config.calib_collection - # The quick gain estimation does not need to be certified. - self.log.info( - f"Verification for {calib_type} is not implemented yet " - f"in this script. {calib_type} will be automatically certified." 
- ) - if calib_type != "GAIN": - await self.certify_calib(calib_type, job_id_calib) + task = asyncio.create_task(self.process_calibration(calib_type)) + self.background_tasks.append(task) - self.log.info(f"{calib_type} generation job ID: {job_id_calib}") + await self.checkpoint("Data-taking part completed.") - # Report the estimated gain from each pair of flats - if calib_type in ["GAIN", "PTC"]: - await self.report_gains_from_flat_pairs(job_id_calib) - except Exception: - self.log.exception(f"Error processing {calib_type}. Ignoring...") + await self.wait_for_background_tasks() + + async def process_images(self, im_type): + """ + Generate and optionally verify and certify calibrations for a + given image type. + + Parameters + ---------- + im_type : `str` + The type of image or calibration to process. One of + ["BIAS", "DARK", "FLAT"]. + + Raises + ------ + Exception + If an error occurs during verification, it is logged and + ignored. + + Notes + ----- + - If `generate_calibrations` is set to `True`, the method + will generate combined calibrations using the + `call_pipetask` method. + - If `do_verify` is `True`, the method will initiate + verification of the generated calibrations. + - If `do_verify` is `False` and calibrations are generated, + the calibration will be automatically certified without + verification. + - If an error occurs during verification, it is logged and + ignored. + """ + + if self.config.generate_calibrations: + self.log.info( + "Generating calibration from the images taken " + "as part of this script." + ) + response_ocps_calib_pipetask = await self.call_pipetask(im_type) + job_id_calib = response_ocps_calib_pipetask["jobId"] + else: + self.log.info( + f"A combined {im_type} will not be generated from the " + "images taken as part of this script. Any needed input " + "calibrations by the verification pipetasks will be " + "sought in their input calibrations." + ) + job_id_calib = None + + if self.config.do_verify: + try: + await self.process_verification(im_type, job_id_calib) + except Exception: + self.log.exception("Error in do_verify. Ignoring...") + else: + if self.config.generate_calibrations: + self.log.info( + "'do_verify' is set to 'False' and " + "'generate_calibrations' to 'True'. " + f"{im_type} will be automatically certified." + ) + + await self.certify_calib(im_type, job_id_calib) + + async def process_verification(self, im_type, job_id_calib): + """ + Verify and certify the generated calibration for a given + image type. + + Parameters + ---------- + im_type : `str` + The type of image or calibration to verify. One of + ["BIAS", "DARK", "FLAT"]. + + job_id_calib : `str` or `None` + The job ID returned by OCPS during the calibration + generation pipetask call. + If `None`, the verification will use reference + calibrations from input collections. + + Returns + ------- + report_check_verify_stats : `dict` + Dictionary containing the results of the verification + checks, including whether the calibration should be + certified, any statistical errors, failure thresholds, + and the raw verification statistics. + + Raises + ------ + Exception + Logs and handles any exceptions that occur during the + verification process. + + Notes + ----- + - Verification involves running the `verify_calib` method + to execute the verification pipetask. + - The method analyzes verification statistics to determine + if the calibration meets the certification criteria. 
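+
+        Examples
+        --------
+        A minimal sketch; the image type and job ID are illustrative,
+        and in practice this method is awaited from `process_images`::
+
+            report = await self.process_verification("BIAS", "job_calib_123")
+            if report is not None and report["CERTIFY_CALIB"]:
+                self.log.debug("BIAS calibration passed verification.")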
+ """ + try: + self.log.info(f"Starting verification for {im_type}.") + + response_ocps_verify_pipetask = await self.verify_calib( + im_type, job_id_calib + ) + # Check that the task running cp_verify + # did not fail. + job_id_verify = response_ocps_verify_pipetask["jobId"] + + report_check_verify_stats = await self.check_verification_stats( + im_type, job_id_verify, job_id_calib + ) + # Inform the user about the results from + # running cp_verify. + # TODO: If verification failed, issue an + # alarm in the watcher: DM-33898. + await self.analyze_report_check_verify_stats( + im_type, + report_check_verify_stats, + job_id_verify, + job_id_calib, + ) + + if job_id_calib is not None and report_check_verify_stats["CERTIFY_CALIB"]: + await self.certify_calib(im_type, job_id_calib) + + return report_check_verify_stats + # Note: Since we are not generating calibrations, we don't certify + except Exception as e: + self.log.exception(f"Error in processing verification for {im_type}: {e}") + + async def process_calibration(self, calib_type): + """ + Generate and certify a specific type of calibration. + + This method handles the generation of a specific calibration + type using the `call_pipetask` method. After generating the + calibration, it automatically certifies the calibration unless + the calibration type is "GAIN", which does not require + certification. Additionally, it reports the estimated gains + from flat pairs for applicable calibration types. + + Parameters + ---------- + calib_type : `str` + The type of calibration to process. Supported types are: + - "DEFECTS" + - "PTC" + - "GAIN" + + Raises + ------ + Exception + If an error occurs during the calibration generation or + certification process. + + Notes + ----- + - The "GAIN" calibration type does not require certification. + - Gain estimation from flat pairs is performed for "GAIN" and + "PTC" calibration types. + - Verification for calibrations is not implemented in this + method and thus not required. + """ + try: + self.log.info(f"Starting calibration processing for {calib_type}.") + response_ocps_calib_pipetask = await self.call_pipetask(calib_type) + job_id_calib = response_ocps_calib_pipetask["jobId"] + # Certify the calibrations in self.config.calib_collection + # The quick gain estimation does not need to be certified. + self.log.info( + f"Verification for {calib_type} is not implemented yet " + f"in this script. {calib_type} will be automatically certified." + ) + if calib_type != "GAIN": + await self.certify_calib(calib_type, job_id_calib) + + self.log.info(f"{calib_type} generation job ID: {job_id_calib}") + + # Report the estimated gain from each pair of flats + if calib_type in ["GAIN", "PTC"]: + await self.report_gains_from_flat_pairs(job_id_calib) + except Exception as e: + self.log.exception(f"Error processing {calib_type}: {e}") + + async def wait_for_background_tasks(self): + """ + Await the completion of all background calibration and + verification tasks. + + This method waits for all background tasks (calibration and + verification processes) to complete within a specified + timeout. If the tasks do not complete within the timeout + period, it attempts to cancel any remaining unfinished tasks. + + Raises + ------ + asyncio.TimeoutError + If the background tasks do not complete within the + allotted timeout. + Exception + Logs and handles any other exceptions that occur while + waiting for tasks. 
+
+        Notes
+        -----
+        - The timeout is calculated based on the
+          `background_task_timeout` configuration parameter multiplied
+          by the number of background tasks.
+        - Upon a timeout, the method logs a warning and cancels any
+          tasks that are still pending.
+        - After handling timeouts or exceptions, the list of
+          background tasks is cleared.
+        """
+        self.log.info("Waiting for background tasks to complete.")
+        try:
+            # Note that when asyncio.wait_for times out, it cancels the task
+            # it's waiting on. If the task being waited on is an asyncio.gather
+            # instance, it propagates the cancellation to all the tasks it has
+            # gathered.
+            await asyncio.wait_for(
+                asyncio.gather(*self.background_tasks, return_exceptions=True),
+                timeout=self.config.background_task_timeout
+                * len(self.background_tasks),
+            )
+            self.log.info("All background tasks have completed.")
+        except asyncio.TimeoutError:
+            self.log.warning("Background tasks did not complete before timeout.")
+            for task in self.background_tasks:
+                # all tasks should be done/cancelled at this point
+                if not task.done():
+                    # this code should never be reached.
+                    self.log.warning(f"Cancelling task {task}")
+                    task.cancel()
+        except Exception as e:
+            self.log.exception(f"Error in background tasks: {e}")
+        finally:
+            self.background_tasks = []
 
     @staticmethod
     def get_exposure_id(obsid):
diff --git a/tests/test_base_make_calibrations.py b/tests/test_base_make_calibrations.py
new file mode 100644
index 00000000..09a5f160
--- /dev/null
+++ b/tests/test_base_make_calibrations.py
@@ -0,0 +1,385 @@
+# This file is part of ts_externalscripts
+#
+# Developed for the LSST Telescope and Site Systems.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+ +import asyncio +import logging +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +import yaml +from lsst.ts import standardscripts +from lsst.ts.externalscripts.base_make_calibrations import BaseMakeCalibrations + +logger = logging.getLogger(__name__) +logger.propagate = True + + +class TestMakeCalibrations( + standardscripts.BaseScriptTestCase, unittest.IsolatedAsyncioTestCase +): + async def basic_make_script(self, index): + self.script = TestBaseMakeCalibrations(index=index) + return (self.script,) + + @unittest.mock.patch( + "lsst.ts.standardscripts.BaseBlockScript.obs_id", "202306060001" + ) + async def test_configure(self): + async with self.make_script(): + n_bias = 2 + n_dark = 2 + exp_times_dark = 10 + n_flat = 4 + exp_times_flat = [10, 10, 50, 50] + detectors = [0, 1, 2] + n_processes = 4 + program = "BLOCK-123" + reason = "SITCOM-321" + + self.script.get_obs_id = unittest.mock.AsyncMock( + side_effect=["202306060001"] + ) + + await self.configure_script( + n_bias=n_bias, + n_dark=n_dark, + n_flat=n_flat, + exp_times_dark=exp_times_dark, + exp_times_flat=exp_times_flat, + detectors=detectors, + n_processes=n_processes, + program=program, + reason=reason, + script_mode="BIAS_DARK_FLAT", + generate_calibrations=True, + do_verify=True, + ) + + assert self.script.config.n_bias == n_bias + assert self.script.config.n_dark == n_dark + assert self.script.config.n_flat == n_flat + assert self.script.config.exp_times_dark == exp_times_dark + assert self.script.config.exp_times_flat == exp_times_flat + assert self.script.config.n_processes == n_processes + assert self.script.config.detectors == detectors + assert self.script.program == program + assert self.script.reason == reason + + assert ( + # from the configure method in BaseBlockScript + self.script.checkpoint_message + == "TestBaseMakeCalibrations BLOCK-123 202306060001 SITCOM-321" + ) + + async def test_process_images_calib_no_verify(self): + async with self.make_script(): + await self.configure_script( + n_bias=2, + n_dark=2, + n_flat=4, + exp_times_dark=10, + exp_times_flat=[10, 10, 50, 50], + n_processes=4, + program="BLOCK-123", + reason="SITCOM-321", + script_mode="BIAS_DARK_FLAT", + generate_calibrations=True, + do_verify=False, + ) + + self.script.call_pipetask = AsyncMock( + return_value={"jobId": "job_calib_123"} + ) + self.script.process_verification = AsyncMock() + self.script.certify_calib = AsyncMock() + self.script.process_verification = AsyncMock() + + im_type = "BIAS" + await self.script.process_images(im_type) + self.script.call_pipetask.assert_called_with(im_type) + self.script.process_verification.assert_not_called() + self.script.certify_calib.assert_called_with(im_type, "job_calib_123") + + async def test_process_images_calib_and_verify(self): + async with self.make_script(): + await self.configure_script( + n_bias=2, + n_dark=2, + n_flat=4, + exp_times_dark=10, + exp_times_flat=[10, 10, 50, 50], + n_processes=4, + program="BLOCK-123", + reason="SITCOM-321", + script_mode="BIAS_DARK_FLAT", + generate_calibrations=True, + do_verify=True, + ) + + self.script.call_pipetask = AsyncMock( + return_value={"jobId": "job_calib_123"} + ) + self.script.certify_calib = AsyncMock() + self.script.verify_calib = AsyncMock( + return_value={"jobId": "verify_job123"} + ) + + # Wrap the real process_verification method + original_process_verification = self.script.process_verification + self.script.process_verification = AsyncMock( + wraps=original_process_verification + ) + 
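+            # Simulate a verification run that passes all checks, so
+            # process_images is expected to certify the calibration.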
self.script.check_verification_stats = AsyncMock( + return_value={"CERTIFY_CALIB": True} + ) + self.script.analyze_report_check_verify_stats = AsyncMock() + + im_type = "BIAS" + await self.script.process_images(im_type) + self.script.call_pipetask.assert_called_with(im_type) + self.script.process_verification.assert_called_with( + im_type, "job_calib_123" + ) + self.script.certify_calib.assert_called_with(im_type, "job_calib_123") + + async def test_wait_for_background_tasks(self): + async with self.make_script(): + + # Create a future that will not complete + async def long_running_task(): + await asyncio.sleep(100) # Sleep longer than the timeout + + mock_task = asyncio.create_task(long_running_task()) + self.script.background_tasks = [mock_task] + + await self.configure_script( + n_bias=2, + n_dark=2, + n_flat=4, + exp_times_dark=10, + exp_times_flat=[10, 10, 50, 50], + n_processes=4, + program="BLOCK-123", + reason="SITCOM-321", + script_mode="BIAS_DARK_FLAT", + generate_calibrations=True, + do_verify=True, + ) + + # Set a short timeout to force timeout + self.script.config.background_task_timeout = 0.1 + + with patch.object( + self.script.log, "warning", new=MagicMock() + ) as mock_log_warning: + await self.script.wait_for_background_tasks() + + assert mock_task.cancelled(), "Task was not cancelled" + + assert self.script.background_tasks == [] + + mock_log_warning.assert_called_with( + "Background tasks did not " "complete before timeout." + ) + + async def test_certify_calib_failure(self): + async with self.make_script(): + await self.configure_script( + n_bias=2, + n_dark=2, + n_flat=4, + exp_times_dark=10, + exp_times_flat=[10, 10, 50, 50], + n_processes=4, + program="BLOCK-123", + reason="SITCOM-321", + script_mode="BIAS_DARK_FLAT", + generate_calibrations=True, + do_verify=True, + ) + + self.script.certify_calib_failed = False + + self.script.call_pipetask = AsyncMock(return_value={"jobId": "job123"}) + + self.script.ocps.cmd_execute.set_start = AsyncMock( + return_value=MagicMock(result='{"job_id": "job123"}') + ) + + self.script.ocps.evt_job_result.next = AsyncMock( + return_value=MagicMock(result='{"jobId": "job123"}') + ) + + self.script.verify_calib = AsyncMock( + return_value={"jobId": "verify_job123"} + ) + + self.script.check_verification_stats = AsyncMock( + return_value={"CERTIFY_CALIB": False} + ) + + self.script.analyze_report_check_verify_stats = AsyncMock() + + with patch.object( + self.script.log, "exception", new=MagicMock() + ) as mock_log_exception: + await self.script.process_images("BIAS") + + # Verify that exception was not logged + # (since failure is handled gracefully) + mock_log_exception.assert_not_called() + + +class TestBaseMakeCalibrations(BaseMakeCalibrations): + def __init__(self, index=1): + super().__init__( + index=index, + descr="Test script", + ) + + @classmethod + def get_schema(cls): + url = "https://github.com/lsst-ts/" + path = ( + "ts_externalscripts/blob/main/python/lsst/ts/externalscripts/" + "/make_comcam_calibrations.py" + ) + schema = f""" + $schema: http://json-schema.org/draft-07/schema# + $id: {url}/{path} + title: MakeComCamCalibrations v1 + description: Configuration for making a LSSTComCam combined calibrations SAL Script. + type: object + properties: + detectors: + description: Detector IDs. If omitted, all 9 LSSTComCam detectors \ + will be used. 
+ type: array + items: + - type: integer + minContains: 0 + maxContains: 8 + minItems: 0 + maxItems: 9 + uniqueItems: true + default: [] + filter: + description: Filter name or ID; if omitted the filter is not changed. + anyOf: + - type: string + - type: integer + minimum: 1 + - type: "null" + default: null + input_collections_bias: + type: string + descriptor: Additional comma-separated input collections to pass to the bias pipetask. + default: "LSSTComCam/calib" + input_collections_verify_bias: + type: string + descriptor: Additional comma-separated input collections to pass to \ + the verify (bias) pipetask. + default: "LSSTComCam/calib" + input_collections_dark: + type: string + descriptor: Additional comma-separarted input collections to pass to the dark pipetask. + default: "LSSTComCam/calib" + input_collections_verify_dark: + type: string + descriptor: Additional comma-separated input collections to pass to \ + the verify (dark) pipetask. + default: "LSSTComCam/calib" + input_collections_flat: + type: string + descriptor: Additional comma-separated input collections to pass to the flat pipetask. + default: "LSSTComCam/calib" + input_collections_verify_flat: + type: string + descriptor: Additional comma-separated input collections to pass to \ + the verify (flat) pipetask. + default: "LSSTComCam/calib" + input_collections_defects: + type: string + descriptor: Additional comma-separated input collections to pass to the defects pipetask. + default: "LSSTComCam/calib" + input_collections_ptc: + type: string + descriptor: Additional comma-separated input collections to pass to the \ + Photon Transfer Curve pipetask. + default: "LSSTComCam/calib" + calib_collection: + type: string + descriptor: Calibration collection where combined calibrations will be certified into. + default: "LSSTComCam/calib/daily" + repo: + type: string + descriptor: Butler repository. + default: "/repo/LSSTComCam" + additionalProperties: false + """ + schema_dict = yaml.safe_load(schema) + base_schema_dict = super().get_schema() + + for properties in base_schema_dict["properties"]: + schema_dict["properties"][properties] = base_schema_dict["properties"][ + properties + ] + + return schema_dict + + @property + def ocps_group(self): + return MagicMock() + + @property + def ocps(self): + return MagicMock() + + @property + def camera(self): + return MagicMock() + + def get_instrument_configuration(self): + return {} + + @property + def instrument_name(self): + return "TestInstrument" + + @property + def pipeline_instrument(self): + return "TestPipelineInstrument" + + @property + def detectors(self): + return [0, 1, 2] + + @property + def n_detectors(self): + return 3 + + @property + def image_in_oods(self): + return MagicMock() + + +if __name__ == "__main__": + unittest.main()
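# A minimal, self-contained sketch of the concurrency pattern this change
# adopts in run_block/wait_for_background_tasks: calibration/verification work
# is started with asyncio.create_task so image taking is not blocked, and a
# single asyncio.gather wrapped in asyncio.wait_for bounds the total wait,
# scaling the timeout by the number of tasks. All names below (process_one,
# run_block_sketch, BACKGROUND_TASK_TIMEOUT) are illustrative only and are not
# part of the ts_externalscripts API.

import asyncio
import logging

# Mirrors the new background_task_timeout configuration default (30 s/task).
BACKGROUND_TASK_TIMEOUT = 30

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("background-task-sketch")


async def process_one(im_type: str) -> str:
    """Stand-in for process_images/process_calibration."""
    await asyncio.sleep(0.1)  # pretend to call the OCPS pipetask and certify
    return f"{im_type} done"


async def run_block_sketch() -> None:
    background_tasks = []

    # Kick off the work without awaiting it, as run_block does after each
    # set of images is taken.
    for im_type in ["BIAS", "DARK", "FLAT"]:
        background_tasks.append(asyncio.create_task(process_one(im_type)))

    # Equivalent of wait_for_background_tasks: one timeout for the whole
    # gather; on timeout, cancellation propagates to the gathered tasks.
    try:
        results = await asyncio.wait_for(
            asyncio.gather(*background_tasks, return_exceptions=True),
            timeout=BACKGROUND_TASK_TIMEOUT * len(background_tasks),
        )
        log.info("All background tasks have completed: %s", results)
    except asyncio.TimeoutError:
        log.warning("Background tasks did not complete before timeout.")
        for task in background_tasks:
            # Defensive cleanup for any task that somehow survived.
            if not task.done():
                task.cancel()
    finally:
        background_tasks.clear()


if __name__ == "__main__":
    asyncio.run(run_block_sketch())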