Skip to content

Commit

Permalink
output: adding output handler list to emission_tracker for extensible…
Browse files Browse the repository at this point in the history
… output management, rework BaseOutput with separate api contracts for live/cold/task calls (#459)

Co-authored-by: inimaz <[email protected]>
  • Loading branch information
eledhwen and inimaz authored Jun 12, 2024
1 parent a39fa58 commit d0ed60b
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 91 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ venv.bak/

# idea
.idea/
*.iml

# Sphinx
docs/_build/
Expand Down
130 changes: 72 additions & 58 deletions codecarbon/emissions_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"""

import dataclasses
import os
import platform
import time
import uuid
Expand Down Expand Up @@ -150,6 +149,7 @@ def __init__(
logging_logger: Optional[LoggerOutput] = _sentinel,
save_to_prometheus: Optional[bool] = _sentinel,
prometheus_url: Optional[str] = _sentinel,
output_handlers: Optional[List[BaseOutput]] = _sentinel,
gpu_ids: Optional[List] = _sentinel,
emissions_endpoint: Optional[str] = _sentinel,
experiment_id: Optional[str] = _sentinel,
Expand Down Expand Up @@ -235,6 +235,7 @@ def __init__(
self._set_from_conf(logging_logger, "logging_logger")
self._set_from_conf(save_to_prometheus, "save_to_prometheus", False, bool)
self._set_from_conf(prometheus_url, "prometheus_url", "localhost:9091")
self._set_from_conf(output_handlers, "output_handlers", [])
self._set_from_conf(tracking_mode, "tracking_mode", "machine")
self._set_from_conf(on_csv_write, "on_csv_write", "append")
self._set_from_conf(logger_preamble, "logger_preamble", "")
Expand Down Expand Up @@ -400,38 +401,35 @@ def _init_output_methods(self, api_key):
"""
Prepare the different output methods
"""
self.persistence_objs: List[BaseOutput] = list()

if self._save_to_file:
self.persistence_objs.append(
self._output_handlers.append(
FileOutput(
os.path.join(self._output_dir, self._output_file),
self._output_file,
self._output_dir,
self._on_csv_write,
)
)

if self._save_to_logger:
self.persistence_objs.append(self._logging_logger)
self._output_handlers.append(self._logging_logger)

if self._emissions_endpoint:
self.persistence_objs.append(HTTPOutput(self._emissions_endpoint))
self._output_handlers.append(HTTPOutput(self._emissions_endpoint))

if self._save_to_api:
self._cc_api__out = CodeCarbonAPIOutput(
cc_api__out = CodeCarbonAPIOutput(
endpoint_url=self._api_endpoint,
experiment_id=self._experiment_id,
api_key=api_key,
conf=self._conf,
)
self.run_id = self._cc_api__out.run_id
self.persistence_objs.append(self._cc_api__out)

self.run_id = cc_api__out.run_id
self._output_handlers.append(cc_api__out)
else:
self.run_id = uuid.uuid4()

if self._save_to_prometheus:
self._cc_prometheus_out = PrometheusOutput(self._prometheus_url)
self.persistence_objs.append(self._cc_prometheus_out)
self._output_handlers.append(PrometheusOutput(self._prometheus_url))

def service_shutdown(self, signum, frame):
print("Caught signal %d" % signum)
Expand Down Expand Up @@ -477,7 +475,8 @@ def start_task(self, task_name=None) -> None:
# Read initial energy for hardware
for hardware in self._hardware:
hardware.start()
_ = self._prepare_emissions_data(delta=True)
_ = self._prepare_emissions_data()
_ = self._compute_emissions_delta(_)

self._tasks.update(
{
Expand All @@ -497,13 +496,14 @@ def stop_task(self, task_name: str = None) -> float:
task_name = task_name if task_name else self._active_task
self._measure_power_and_energy()

emissions_data = self._prepare_emissions_data(delta=True)
emissions_data = self._prepare_emissions_data()
emissions_data_delta = self._compute_emissions_delta(emissions_data)

task_duration = Time.from_seconds(
time.time() - self._tasks[task_name].start_time
)

task_emission_data = emissions_data
task_emission_data = emissions_data_delta
task_emission_data.duration = task_duration.seconds
self._tasks[task_name].emissions_data = task_emission_data
self._tasks[task_name].is_active = False
Expand All @@ -526,7 +526,11 @@ def flush(self) -> Optional[float]:
self._measure_power_and_energy()

emissions_data = self._prepare_emissions_data()
self._persist_data(emissions_data)
emissions_data_delta = self._compute_emissions_delta(emissions_data)

self._persist_data(
total_emissions=emissions_data, delta_emissions=emissions_data_delta
)

return emissions_data.emissions

Expand Down Expand Up @@ -554,29 +558,34 @@ def stop(self) -> Optional[float]:
self._measure_power_and_energy()

emissions_data = self._prepare_emissions_data()
emissions_data_delta = self._compute_emissions_delta(emissions_data)

self._persist_data(emissions_data, experiment_name=self._experiment_name)
self._persist_data(
total_emissions=emissions_data,
delta_emissions=emissions_data_delta,
experiment_name=self._experiment_name,
)

self.final_emissions_data = emissions_data
self.final_emissions = emissions_data.emissions
return emissions_data.emissions

def _persist_data(self, emissions_data, experiment_name=None):
for persistence in self.persistence_objs:
if isinstance(persistence, CodeCarbonAPIOutput):
emissions_data = self._prepare_emissions_data(delta=True)

persistence.out(emissions_data)
if isinstance(persistence, FileOutput):
if len(self._tasks) > 0:
task_emissions_data = []
for task in self._tasks:
task_emissions_data.append(self._tasks[task].out())
persistence.task_out(
task_emissions_data, experiment_name, self._output_dir
)
def _persist_data(
self,
total_emissions: EmissionsData,
delta_emissions: EmissionsData,
experiment_name=None,
):
task_emissions_data = []
for task in self._tasks:
task_emissions_data.append(self._tasks[task].out())

def _prepare_emissions_data(self, delta=False) -> EmissionsData:
for handler in self._output_handlers:
handler.out(total_emissions, delta_emissions)
if len(task_emissions_data) > 0:
handler.task_out(task_emissions_data, experiment_name)

def _prepare_emissions_data(self) -> EmissionsData:
"""
:delta: If 'True', return only the delta comsumption since the last call.
"""
Expand Down Expand Up @@ -636,21 +645,23 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData:
tracking_mode=self._conf.get("tracking_mode"),
pue=self._pue,
)
if delta:
if self._previous_emissions is None:
self._previous_emissions = total_emissions
else:
# Create a copy
delta_emissions = dataclasses.replace(total_emissions)
# Compute emissions rate from delta
delta_emissions.compute_delta_emission(self._previous_emissions)
# TODO : find a way to store _previous_emissions only when
# TODO : the API call succeeded
self._previous_emissions = total_emissions
total_emissions = delta_emissions
logger.debug(total_emissions)
return total_emissions

def _compute_emissions_delta(self, total_emissions: EmissionsData) -> EmissionsData:
delta_emissions: EmissionsData = total_emissions
if self._previous_emissions is None:
self._previous_emissions = total_emissions
else:
# Create a copy
delta_emissions = dataclasses.replace(total_emissions)
# Compute emissions rate from delta
delta_emissions.compute_delta_emission(self._previous_emissions)
# TODO : find a way to store _previous_emissions only when
# TODO : the API call succeeded
self._previous_emissions = total_emissions
return delta_emissions

@abstractmethod
def _get_geo_metadata(self) -> GeoMetadata:
"""
Expand Down Expand Up @@ -742,19 +753,19 @@ def _measure_power_and_energy(self) -> None:
self._last_measured_time = time.time()
self._measure_occurrence += 1
if (
self._cc_api__out is not None or self._cc_prometheus_out is not None
) and self._api_call_interval != -1:
if self._measure_occurrence >= self._api_call_interval:
emissions = self._prepare_emissions_data(delta=True)
logger.info(
f"{emissions.emissions_rate * 1000:.6f} g.CO2eq/s mean an estimation of "
+ f"{emissions.emissions_rate * 3600 * 24 * 365:,} kg.CO2eq/year"
)
if self._cc_api__out:
self._cc_api__out.out(emissions)
if self._cc_prometheus_out:
self._cc_prometheus_out.out(emissions)
self._measure_occurrence = 0
self._api_call_interval != -1
and len(self._output_handlers) > 0
and self._measure_occurrence >= self._api_call_interval
):
emissions = self._prepare_emissions_data()
emissions_delta = self._compute_emissions_delta(emissions)
logger.info(
f"{emissions_delta.emissions_rate * 1000:.6f} g.CO2eq/s mean an estimation of "
+ f"{emissions_delta.emissions_rate * 3600 * 24 * 365:,} kg.CO2eq/year"
)
for handler in self._output_handlers:
handler.live_out(emissions, emissions_delta)
self._measure_occurrence = 0
logger.debug(f"last_duration={last_duration}\n------------------------")

def __enter__(self):
Expand Down Expand Up @@ -924,6 +935,7 @@ def track_emissions(
save_to_logger: Optional[bool] = _sentinel,
save_to_prometheus: Optional[bool] = _sentinel,
prometheus_url: Optional[str] = _sentinel,
output_handlers: Optional[List[BaseOutput]] = _sentinel,
logging_logger: Optional[LoggerOutput] = _sentinel,
offline: Optional[bool] = _sentinel,
emissions_endpoint: Optional[str] = _sentinel,
Expand Down Expand Up @@ -1002,6 +1014,7 @@ def wrapped_fn(*args, **kwargs):
save_to_logger=save_to_logger,
save_to_prometheus=save_to_prometheus,
prometheus_url=prometheus_url,
output_handlers=output_handlers,
logging_logger=logging_logger,
country_iso_code=country_iso_code,
region=region,
Expand All @@ -1023,6 +1036,7 @@ def wrapped_fn(*args, **kwargs):
save_to_logger=save_to_logger,
save_to_prometheus=save_to_prometheus,
prometheus_url=prometheus_url,
output_handlers=output_handlers,
logging_logger=logging_logger,
gpu_ids=gpu_ids,
log_level=log_level,
Expand Down
25 changes: 17 additions & 8 deletions codecarbon/output_methods/base_output.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
from abc import ABC, abstractmethod
from typing import List

from codecarbon.output_methods.emissions_data import EmissionsData
from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData


class BaseOutput(ABC):
class BaseOutput:
"""
An abstract class that requires children to inherit a single method,
`out` which is used for persisting data. This could be by saving it to a file,
posting to Json Box, saving to a database, sending a slack message etc.
An abstract class defining possible contracts for an output strategy, a strategy implementation can save emissions
data to a file, posting to Json Box, saving to a database, sending a Slack message etc.
Each method is responsible for a different part of the EmissionsData lifecycle:
- `out` is used by termination calls such as emissions_tracker.flush and emissions_tracker.stop
- `live_out` is used by live measurement events, e.g. the iterative update of prometheus metrics
- `task_out` is used by terminate calls such as emissions_tracker.flush and emissions_tracker.stop, but uses
emissions segregated by task
"""

@abstractmethod
def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
pass

def live_out(self, total: EmissionsData, delta: EmissionsData):
pass

def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
pass
34 changes: 19 additions & 15 deletions codecarbon/output_methods/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ class FileOutput(BaseOutput):
Saves experiment artifacts to a file
"""

def __init__(self, save_file_path: str, on_csv_write: str = "append"):
def __init__(
self, output_file_name: str, output_dir: str, on_csv_write: str = "append"
):
if on_csv_write not in {"append", "update"}:
raise ValueError(
f"Unknown `on_csv_write` value: {on_csv_write}"
+ " (should be one of 'append' or 'update'"
)
self.output_file_name: str = output_file_name
self.output_dir: str = output_dir
self.on_csv_write: str = on_csv_write
self.save_file_path: str = save_file_path
self.save_file_path = os.path.join(self.output_dir, self.output_file_name)
logger.info(
f"Saving emissions data to file {os.path.abspath(self.save_file_path)}"
)
Expand All @@ -34,42 +38,42 @@ def has_valid_headers(self, data: EmissionsData):
list_of_column_names = list(dict_from_csv.keys())
return list(data.values.keys()) == list_of_column_names

def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
file_exists: bool = os.path.isfile(self.save_file_path)
if file_exists and not self.has_valid_headers(data):
if file_exists and not self.has_valid_headers(total):
logger.info("Backing up old emission file")
backup(self.save_file_path)
file_exists = False

if not file_exists:
df = pd.DataFrame(columns=data.values.keys())
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.DataFrame(columns=total.values.keys())
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
elif self.on_csv_write == "append":
df = pd.read_csv(self.save_file_path)
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
else:
df = pd.read_csv(self.save_file_path)
df_run = df.loc[df.run_id == data.run_id]
df_run = df.loc[df.run_id == total.run_id]
if len(df_run) < 1:
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
elif len(df_run) > 1:
logger.warning(
f"CSV contains more than 1 ({len(df_run)})"
+ f" rows with current run ID ({data.run_id})."
+ f" rows with current run ID ({total.run_id})."
+ "Appending instead of updating."
)
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
else:
df.at[df.run_id == data.run_id, data.values.keys()] = (
data.values.values()
df.at[df.run_id == total.run_id, total.values.keys()] = (
total.values.values()
)

df.to_csv(self.save_file_path, index=False)

def task_out(self, data: List[TaskEmissionsData], experiment_name: str, output_dir):
def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
run_id = data[0].run_id
save_task_file_path = os.path.join(
output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
self.output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
)
df = pd.DataFrame(columns=data[0].values.keys())
df = pd.concat(
Expand Down
8 changes: 4 additions & 4 deletions codecarbon/output_methods/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ class HTTPOutput(BaseOutput):
def __init__(self, endpoint_url: str):
self.endpoint_url: str = endpoint_url

def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
try:
payload = dataclasses.asdict(data)
payload = dataclasses.asdict(total)
payload["user"] = getpass.getuser()
resp = requests.post(self.endpoint_url, json=payload, timeout=10)
if resp.status_code != 201:
Expand Down Expand Up @@ -50,8 +50,8 @@ def __init__(self, endpoint_url: str, experiment_id: str, api_key: str, conf):
)
self.run_id = self.api.run_id

def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
try:
self.api.add_emission(dataclasses.asdict(data))
self.api.add_emission(dataclasses.asdict(delta))
except Exception as e:
logger.error(e, exc_info=True)
Loading

0 comments on commit d0ed60b

Please sign in to comment.