diff --git a/.gitignore b/.gitignore
index 03321b53b..a3392e04f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -110,6 +110,7 @@ venv.bak/

 # idea
 .idea/
+*.iml

 # Sphinx
 docs/_build/
diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py
index 4d0ad2d2d..62e7968a4 100644
--- a/codecarbon/emissions_tracker.py
+++ b/codecarbon/emissions_tracker.py
@@ -4,7 +4,6 @@
 """

 import dataclasses
-import os
 import platform
 import time
 import uuid
@@ -150,6 +149,7 @@ def __init__(
         logging_logger: Optional[LoggerOutput] = _sentinel,
         save_to_prometheus: Optional[bool] = _sentinel,
         prometheus_url: Optional[str] = _sentinel,
+        output_handlers: Optional[List[BaseOutput]] = _sentinel,
         gpu_ids: Optional[List] = _sentinel,
         emissions_endpoint: Optional[str] = _sentinel,
         experiment_id: Optional[str] = _sentinel,
@@ -235,6 +235,7 @@ def __init__(
         self._set_from_conf(logging_logger, "logging_logger")
         self._set_from_conf(save_to_prometheus, "save_to_prometheus", False, bool)
         self._set_from_conf(prometheus_url, "prometheus_url", "localhost:9091")
+        self._set_from_conf(output_handlers, "output_handlers", [])
         self._set_from_conf(tracking_mode, "tracking_mode", "machine")
         self._set_from_conf(on_csv_write, "on_csv_write", "append")
         self._set_from_conf(logger_preamble, "logger_preamble", "")
@@ -400,38 +401,35 @@ def _init_output_methods(self, api_key):
         """
         Prepare the different output methods
         """
-        self.persistence_objs: List[BaseOutput] = list()
-
         if self._save_to_file:
-            self.persistence_objs.append(
+            self._output_handlers.append(
                 FileOutput(
-                    os.path.join(self._output_dir, self._output_file),
+                    self._output_file,
+                    self._output_dir,
                     self._on_csv_write,
                 )
             )

         if self._save_to_logger:
-            self.persistence_objs.append(self._logging_logger)
+            self._output_handlers.append(self._logging_logger)

         if self._emissions_endpoint:
-            self.persistence_objs.append(HTTPOutput(self._emissions_endpoint))
+            self._output_handlers.append(HTTPOutput(self._emissions_endpoint))

         if self._save_to_api:
-            self._cc_api__out = CodeCarbonAPIOutput(
+            cc_api__out = CodeCarbonAPIOutput(
                 endpoint_url=self._api_endpoint,
                 experiment_id=self._experiment_id,
                 api_key=api_key,
                 conf=self._conf,
             )
-            self.run_id = self._cc_api__out.run_id
-            self.persistence_objs.append(self._cc_api__out)
-
+            self.run_id = cc_api__out.run_id
+            self._output_handlers.append(cc_api__out)
         else:
             self.run_id = uuid.uuid4()

         if self._save_to_prometheus:
-            self._cc_prometheus_out = PrometheusOutput(self._prometheus_url)
-            self.persistence_objs.append(self._cc_prometheus_out)
+            self._output_handlers.append(PrometheusOutput(self._prometheus_url))

     def service_shutdown(self, signum, frame):
         print("Caught signal %d" % signum)
@@ -477,7 +475,8 @@ def start_task(self, task_name=None) -> None:
         # Read initial energy for hardware
         for hardware in self._hardware:
             hardware.start()
-        _ = self._prepare_emissions_data(delta=True)
+        _ = self._prepare_emissions_data()
+        _ = self._compute_emissions_delta(_)

         self._tasks.update(
             {
@@ -497,13 +496,14 @@ def stop_task(self, task_name: str = None) -> float:
         task_name = task_name if task_name else self._active_task
         self._measure_power_and_energy()

-        emissions_data = self._prepare_emissions_data(delta=True)
+        emissions_data = self._prepare_emissions_data()
+        emissions_data_delta = self._compute_emissions_delta(emissions_data)
         task_duration = Time.from_seconds(
             time.time() - self._tasks[task_name].start_time
         )

-        task_emission_data = emissions_data
+        task_emission_data = emissions_data_delta
         task_emission_data.duration = task_duration.seconds

         self._tasks[task_name].emissions_data = task_emission_data
         self._tasks[task_name].is_active = False
@@ -526,7 +526,11 @@ def flush(self) -> Optional[float]:
         self._measure_power_and_energy()

         emissions_data = self._prepare_emissions_data()
-        self._persist_data(emissions_data)
+        emissions_data_delta = self._compute_emissions_delta(emissions_data)
+
+        self._persist_data(
+            total_emissions=emissions_data, delta_emissions=emissions_data_delta
+        )

         return emissions_data.emissions

@@ -554,29 +558,34 @@ def stop(self) -> Optional[float]:
         self._measure_power_and_energy()

         emissions_data = self._prepare_emissions_data()
+        emissions_data_delta = self._compute_emissions_delta(emissions_data)

-        self._persist_data(emissions_data, experiment_name=self._experiment_name)
+        self._persist_data(
+            total_emissions=emissions_data,
+            delta_emissions=emissions_data_delta,
+            experiment_name=self._experiment_name,
+        )

         self.final_emissions_data = emissions_data
         self.final_emissions = emissions_data.emissions
         return emissions_data.emissions

-    def _persist_data(self, emissions_data, experiment_name=None):
-        for persistence in self.persistence_objs:
-            if isinstance(persistence, CodeCarbonAPIOutput):
-                emissions_data = self._prepare_emissions_data(delta=True)
-
-            persistence.out(emissions_data)
-            if isinstance(persistence, FileOutput):
-                if len(self._tasks) > 0:
-                    task_emissions_data = []
-                    for task in self._tasks:
-                        task_emissions_data.append(self._tasks[task].out())
-                    persistence.task_out(
-                        task_emissions_data, experiment_name, self._output_dir
-                    )
+    def _persist_data(
+        self,
+        total_emissions: EmissionsData,
+        delta_emissions: EmissionsData,
+        experiment_name=None,
+    ):
+        task_emissions_data = []
+        for task in self._tasks:
+            task_emissions_data.append(self._tasks[task].out())

-    def _prepare_emissions_data(self, delta=False) -> EmissionsData:
+        for handler in self._output_handlers:
+            handler.out(total_emissions, delta_emissions)
+            if len(task_emissions_data) > 0:
+                handler.task_out(task_emissions_data, experiment_name)
+
+    def _prepare_emissions_data(self) -> EmissionsData:
         """
         :delta: If 'True', return only the delta comsumption since the last call.
         """
@@ -636,21 +645,23 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData:
             tracking_mode=self._conf.get("tracking_mode"),
             pue=self._pue,
         )
-        if delta:
-            if self._previous_emissions is None:
-                self._previous_emissions = total_emissions
-            else:
-                # Create a copy
-                delta_emissions = dataclasses.replace(total_emissions)
-                # Compute emissions rate from delta
-                delta_emissions.compute_delta_emission(self._previous_emissions)
-                # TODO : find a way to store _previous_emissions only when
-                # TODO : the API call succeeded
-                self._previous_emissions = total_emissions
-                total_emissions = delta_emissions
         logger.debug(total_emissions)
         return total_emissions

+    def _compute_emissions_delta(self, total_emissions: EmissionsData) -> EmissionsData:
+        delta_emissions: EmissionsData = total_emissions
+        if self._previous_emissions is None:
+            self._previous_emissions = total_emissions
+        else:
+            # Create a copy
+            delta_emissions = dataclasses.replace(total_emissions)
+            # Compute emissions rate from delta
+            delta_emissions.compute_delta_emission(self._previous_emissions)
+            # TODO : find a way to store _previous_emissions only when
+            # TODO : the API call succeeded
+            self._previous_emissions = total_emissions
+        return delta_emissions
+
     @abstractmethod
     def _get_geo_metadata(self) -> GeoMetadata:
         """
@@ -742,19 +753,19 @@ def _measure_power_and_energy(self) -> None:
         self._last_measured_time = time.time()
         self._measure_occurrence += 1
         if (
-            self._cc_api__out is not None or self._cc_prometheus_out is not None
-        ) and self._api_call_interval != -1:
-            if self._measure_occurrence >= self._api_call_interval:
-                emissions = self._prepare_emissions_data(delta=True)
-                logger.info(
-                    f"{emissions.emissions_rate * 1000:.6f} g.CO2eq/s mean an estimation of "
-                    + f"{emissions.emissions_rate * 3600 * 24 * 365:,} kg.CO2eq/year"
-                )
-                if self._cc_api__out:
-                    self._cc_api__out.out(emissions)
-                if self._cc_prometheus_out:
-                    self._cc_prometheus_out.out(emissions)
-                self._measure_occurrence = 0
+            self._api_call_interval != -1
+            and len(self._output_handlers) > 0
+            and self._measure_occurrence >= self._api_call_interval
+        ):
+            emissions = self._prepare_emissions_data()
+            emissions_delta = self._compute_emissions_delta(emissions)
+            logger.info(
+                f"{emissions_delta.emissions_rate * 1000:.6f} g.CO2eq/s mean an estimation of "
+                + f"{emissions_delta.emissions_rate * 3600 * 24 * 365:,} kg.CO2eq/year"
+            )
+            for handler in self._output_handlers:
+                handler.live_out(emissions, emissions_delta)
+            self._measure_occurrence = 0
         logger.debug(f"last_duration={last_duration}\n------------------------")

     def __enter__(self):
@@ -924,6 +935,7 @@ def track_emissions(
     save_to_logger: Optional[bool] = _sentinel,
     save_to_prometheus: Optional[bool] = _sentinel,
    prometheus_url: Optional[str] = _sentinel,
+    output_handlers: Optional[List[BaseOutput]] = _sentinel,
     logging_logger: Optional[LoggerOutput] = _sentinel,
     offline: Optional[bool] = _sentinel,
     emissions_endpoint: Optional[str] = _sentinel,
@@ -1002,6 +1014,7 @@ def wrapped_fn(*args, **kwargs):
                 save_to_logger=save_to_logger,
                 save_to_prometheus=save_to_prometheus,
                 prometheus_url=prometheus_url,
+                output_handlers=output_handlers,
                 logging_logger=logging_logger,
                 country_iso_code=country_iso_code,
                 region=region,
@@ -1023,6 +1036,7 @@ def wrapped_fn(*args, **kwargs):
                 save_to_logger=save_to_logger,
                 save_to_prometheus=save_to_prometheus,
                 prometheus_url=prometheus_url,
+                output_handlers=output_handlers,
                 logging_logger=logging_logger,
                 gpu_ids=gpu_ids,
                 log_level=log_level,
diff --git a/codecarbon/output_methods/base_output.py b/codecarbon/output_methods/base_output.py
index 6b7ea85e9..4b152c29b 100644
--- a/codecarbon/output_methods/base_output.py
+++ b/codecarbon/output_methods/base_output.py
@@ -1,15 +1,24 @@
-from abc import ABC, abstractmethod
+from typing import List

-from codecarbon.output_methods.emissions_data import EmissionsData
+from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData


-class BaseOutput(ABC):
+class BaseOutput:
     """
-    An abstract class that requires children to inherit a single method,
-    `out` which is used for persisting data. This could be by saving it to a file,
-    posting to Json Box, saving to a database, sending a slack message etc.
+    A base class defining the contract of an output strategy; an implementation can save emissions
+    data to a file, post it to Json Box, save it to a database, send a Slack message, etc.
+    Each method is responsible for a different part of the EmissionsData lifecycle:
+    - `out` is used by termination calls such as emissions_tracker.flush and emissions_tracker.stop
+    - `live_out` is used by live measurement events, e.g. the iterative update of Prometheus metrics
+    - `task_out` is used by termination calls such as emissions_tracker.flush and emissions_tracker.stop, but uses
+      emissions segregated by task
     """

-    @abstractmethod
-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
+        pass
+
+    def live_out(self, total: EmissionsData, delta: EmissionsData):
+        pass
+
+    def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
         pass
diff --git a/codecarbon/output_methods/file.py b/codecarbon/output_methods/file.py
index dba6dd91d..94b006d49 100644
--- a/codecarbon/output_methods/file.py
+++ b/codecarbon/output_methods/file.py
@@ -15,14 +15,18 @@ class FileOutput(BaseOutput):
     Saves experiment artifacts to a file
     """

-    def __init__(self, save_file_path: str, on_csv_write: str = "append"):
+    def __init__(
+        self, output_file_name: str, output_dir: str, on_csv_write: str = "append"
+    ):
         if on_csv_write not in {"append", "update"}:
             raise ValueError(
                 f"Unknown `on_csv_write` value: {on_csv_write}"
                 + " (should be one of 'append' or 'update'"
             )
+        self.output_file_name: str = output_file_name
+        self.output_dir: str = output_dir
         self.on_csv_write: str = on_csv_write
-        self.save_file_path: str = save_file_path
+        self.save_file_path = os.path.join(self.output_dir, self.output_file_name)
         logger.info(
             f"Saving emissions data to file {os.path.abspath(self.save_file_path)}"
         )
@@ -34,42 +38,42 @@ def has_valid_headers(self, data: EmissionsData):
         list_of_column_names = list(dict_from_csv.keys())
         return list(data.values.keys()) == list_of_column_names

-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
         file_exists: bool = os.path.isfile(self.save_file_path)
-        if file_exists and not self.has_valid_headers(data):
+        if file_exists and not self.has_valid_headers(total):
             logger.info("Backing up old emission file")
             backup(self.save_file_path)
             file_exists = False

         if not file_exists:
-            df = pd.DataFrame(columns=data.values.keys())
-            df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
+            df = pd.DataFrame(columns=total.values.keys())
+            df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
         elif self.on_csv_write == "append":
             df = pd.read_csv(self.save_file_path)
-            df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
+            df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
         else:
             df = pd.read_csv(self.save_file_path)
-            df_run = df.loc[df.run_id == data.run_id]
+            df_run = df.loc[df.run_id == total.run_id]
             if len(df_run) < 1:
-                df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
+                df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
             elif len(df_run) > 1:
                 logger.warning(
                     f"CSV contains more than 1 ({len(df_run)})"
-                    + f" rows with current run ID ({data.run_id})."
+                    + f" rows with current run ID ({total.run_id})."
                     + "Appending instead of updating."
                 )
-                df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
+                df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
             else:
-                df.at[df.run_id == data.run_id, data.values.keys()] = (
-                    data.values.values()
+                df.at[df.run_id == total.run_id, total.values.keys()] = (
+                    total.values.values()
                 )

         df.to_csv(self.save_file_path, index=False)

-    def task_out(self, data: List[TaskEmissionsData], experiment_name: str, output_dir):
+    def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
         run_id = data[0].run_id
         save_task_file_path = os.path.join(
-            output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
+            self.output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
         )
         df = pd.DataFrame(columns=data[0].values.keys())
         df = pd.concat(
diff --git a/codecarbon/output_methods/http.py b/codecarbon/output_methods/http.py
index 55580a381..bbe53a0f1 100644
--- a/codecarbon/output_methods/http.py
+++ b/codecarbon/output_methods/http.py
@@ -19,9 +19,9 @@ class HTTPOutput(BaseOutput):
     def __init__(self, endpoint_url: str):
         self.endpoint_url: str = endpoint_url

-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
         try:
-            payload = dataclasses.asdict(data)
+            payload = dataclasses.asdict(total)
             payload["user"] = getpass.getuser()
             resp = requests.post(self.endpoint_url, json=payload, timeout=10)
             if resp.status_code != 201:
@@ -50,8 +50,8 @@ def __init__(self, endpoint_url: str, experiment_id: str, api_key: str, conf):
         )
         self.run_id = self.api.run_id

-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
         try:
-            self.api.add_emission(dataclasses.asdict(data))
+            self.api.add_emission(dataclasses.asdict(delta))
         except Exception as e:
             logger.error(e, exc_info=True)
diff --git a/codecarbon/output_methods/logger.py b/codecarbon/output_methods/logger.py
index a40029d97..e0b7a792b 100644
--- a/codecarbon/output_methods/logger.py
+++ b/codecarbon/output_methods/logger.py
@@ -16,22 +16,28 @@ def __init__(self, logger, severity=logging.INFO):
         self.logger = logger
         self.logging_severity = severity

-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
         try:
-            payload = dataclasses.asdict(data)
+            payload = dataclasses.asdict(total)
             self.logger.log(self.logging_severity, msg=json.dumps(payload))
         except Exception as e:
             logger.error(e, exc_info=True)

+    def live_out(self, total: EmissionsData, delta: EmissionsData):
+        self.out(total, delta)
+

 class GoogleCloudLoggerOutput(LoggerOutput):
     """
     Send emissions data to GCP Cloud Logging
     """

-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
         try:
-            payload = dataclasses.asdict(data)
+            payload = dataclasses.asdict(total)
             self.logger.log_struct(payload, severity=self.logging_severity)
         except Exception as e:
             logger.error(e, exc_info=True)
+
+    def live_out(self, total: EmissionsData, delta: EmissionsData):
+        self.out(total, delta)
diff --git a/codecarbon/output_methods/metrics/prometheus/prometheus.py b/codecarbon/output_methods/metrics/prometheus/prometheus.py
index b47cc0e3f..70946ec51 100644
--- a/codecarbon/output_methods/metrics/prometheus/prometheus.py
+++ b/codecarbon/output_methods/metrics/prometheus/prometheus.py
@@ -30,12 +30,15 @@ class PrometheusOutput(BaseOutput):
     def __init__(self, prometheus_url: str):
         self.prometheus_url = prometheus_url

-    def out(self, data: EmissionsData):
+    def out(self, total: EmissionsData, delta: EmissionsData):
         try:
-            self.add_emission(dataclasses.asdict(data))
+            self.add_emission(dataclasses.asdict(delta))
         except Exception as e:
             logger.error(e, exc_info=True)

+    def live_out(self, total: EmissionsData, delta: EmissionsData):
+        self.out(total, delta)
+
     def _auth_handler(self, url, method, timeout, headers, data):
         username = os.getenv("PROMETHEUS_USERNAME")
         password = os.getenv("PROMETHEUS_PASSWORD")
diff --git a/tests/test_custom_handler.py b/tests/test_custom_handler.py
new file mode 100644
index 000000000..d46776bf0
--- /dev/null
+++ b/tests/test_custom_handler.py
@@ -0,0 +1,70 @@
+import time
+import unittest
+from typing import List
+
+from codecarbon.emissions_tracker import EmissionsTracker, track_emissions
+from codecarbon.output import BaseOutput, EmissionsData
+
+
+def heavy_computation(run_time_secs: int = 3):
+    end_time: float = time.time() + run_time_secs  # Run for `run_time_secs` seconds
+    while time.time() < end_time:
+        pass
+
+
+class CustomOutput(BaseOutput):
+    def __init__(self):
+        self.log: List[EmissionsData] = list()
+
+    def live_out(self, delta: EmissionsData, total: EmissionsData):
+        self.log.append(total)
+
+
+class TestCarbonCustomHandler(unittest.TestCase):
+    def setUp(self) -> None:
+        self.project_name = "project_TestCarbonCustomHandler"
+
+    def test_carbon_tracker_custom_handler(self):
+        handler_0 = CustomOutput()
+        handler_1 = CustomOutput()
+        tracker = EmissionsTracker(
+            project_name=self.project_name,
+            output_handlers=[handler_0, handler_1],
+            api_call_interval=1,
+        )
+        tracker.start()
+        heavy_computation(run_time_secs=1)
+        emissions = tracker.stop()
+
+        assert isinstance(emissions, float)
+        self.assertNotEqual(emissions, 0.0)
+        self.assertAlmostEqual(emissions, 6.262572537957655e-05, places=2)
+        self.verify_custom_handler_state(handler_0)
+        self.verify_custom_handler_state(handler_1)
+
+    def test_decorator_flush(self):
+        handler_0 = CustomOutput()
+        handler_1 = CustomOutput()
+
+        @track_emissions(
+            project_name=self.project_name,
+            save_to_logger=True,
+            output_handlers=[handler_0, handler_1],
+            api_call_interval=1,
+        )
+        def dummy_train_model():
+            heavy_computation(run_time_secs=1)
+            return 42
+
+        dummy_train_model()
+        self.verify_custom_handler_state(handler_0)
+        self.verify_custom_handler_state(handler_1)
+
+    def verify_custom_handler_state(
+        self, handler: CustomOutput, expected_lines=1
+    ) -> None:
+        assert len(handler.log) == expected_lines
+        results = handler.log[0]
+        self.assertEqual(results.project_name, self.project_name)
+        self.assertNotEqual(results.emissions, 0.0)
+        self.assertAlmostEqual(results.emissions, 6.262572537957655e-05, places=2)
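
Usage sketch: the new test above doubles as an example of the `output_handlers` parameter; any object implementing the BaseOutput contract receives the same callbacks as the built-in outputs. A minimal, hypothetical handler could look like the following (the handler class and project name are illustrative, and the `codecarbon.output` import path mirrors tests/test_custom_handler.py):

    from typing import List

    from codecarbon.emissions_tracker import EmissionsTracker
    from codecarbon.output import BaseOutput, EmissionsData


    class RecordingOutput(BaseOutput):
        """Illustrative handler that keeps every emissions snapshot it receives in memory."""

        def __init__(self):
            self.history: List[EmissionsData] = []

        def out(self, total: EmissionsData, delta: EmissionsData):
            # Called by tracker.flush() and tracker.stop() with cumulative and delta data.
            self.history.append(total)

        def live_out(self, total: EmissionsData, delta: EmissionsData):
            # Called every `api_call_interval` measurements while the tracker is running.
            self.history.append(total)


    handler = RecordingOutput()
    tracker = EmissionsTracker(
        project_name="my_project",  # illustrative name
        output_handlers=[handler],  # custom handlers are used in addition to any enabled built-in outputs
        api_call_interval=1,
    )
    tracker.start()
    # ... workload to measure ...
    tracker.stop()
    print(f"{len(handler.history)} snapshots collected")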