feat(output): unifying execution of 'live' output handlers #459

Merged
merged 2 commits on Jun 12, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -110,6 +110,7 @@ venv.bak/

# idea
.idea/
*.iml

# Sphinx
docs/_build/
130 changes: 72 additions & 58 deletions codecarbon/emissions_tracker.py
@@ -4,7 +4,6 @@
"""

import dataclasses
import os
import platform
import time
import uuid
@@ -150,6 +149,7 @@ def __init__(
logging_logger: Optional[LoggerOutput] = _sentinel,
save_to_prometheus: Optional[bool] = _sentinel,
prometheus_url: Optional[str] = _sentinel,
output_handlers: Optional[List[BaseOutput]] = _sentinel,
gpu_ids: Optional[List] = _sentinel,
emissions_endpoint: Optional[str] = _sentinel,
experiment_id: Optional[str] = _sentinel,
@@ -235,6 +235,7 @@ def __init__(
self._set_from_conf(logging_logger, "logging_logger")
self._set_from_conf(save_to_prometheus, "save_to_prometheus", False, bool)
self._set_from_conf(prometheus_url, "prometheus_url", "localhost:9091")
self._set_from_conf(output_handlers, "output_handlers", [])
self._set_from_conf(tracking_mode, "tracking_mode", "machine")
self._set_from_conf(on_csv_write, "on_csv_write", "append")
self._set_from_conf(logger_preamble, "logger_preamble", "")
@@ -400,38 +401,35 @@ def _init_output_methods(self, api_key):
"""
Prepare the different output methods
"""
self.persistence_objs: List[BaseOutput] = list()

if self._save_to_file:
self.persistence_objs.append(
self._output_handlers.append(
FileOutput(
os.path.join(self._output_dir, self._output_file),
self._output_file,
self._output_dir,
self._on_csv_write,
)
)

if self._save_to_logger:
self.persistence_objs.append(self._logging_logger)
self._output_handlers.append(self._logging_logger)

if self._emissions_endpoint:
self.persistence_objs.append(HTTPOutput(self._emissions_endpoint))
self._output_handlers.append(HTTPOutput(self._emissions_endpoint))

if self._save_to_api:
self._cc_api__out = CodeCarbonAPIOutput(
cc_api__out = CodeCarbonAPIOutput(
endpoint_url=self._api_endpoint,
experiment_id=self._experiment_id,
api_key=api_key,
conf=self._conf,
)
self.run_id = self._cc_api__out.run_id
self.persistence_objs.append(self._cc_api__out)

self.run_id = cc_api__out.run_id
self._output_handlers.append(cc_api__out)
else:
self.run_id = uuid.uuid4()

if self._save_to_prometheus:
self._cc_prometheus_out = PrometheusOutput(self._prometheus_url)
self.persistence_objs.append(self._cc_prometheus_out)
self._output_handlers.append(PrometheusOutput(self._prometheus_url))

def service_shutdown(self, signum, frame):
print("Caught signal %d" % signum)
@@ -477,7 +475,8 @@ def start_task(self, task_name=None) -> None:
# Read initial energy for hardware
for hardware in self._hardware:
hardware.start()
_ = self._prepare_emissions_data(delta=True)
_ = self._prepare_emissions_data()
_ = self._compute_emissions_delta(_)

self._tasks.update(
{
@@ -497,13 +496,14 @@ def stop_task(self, task_name: str = None) -> float:
task_name = task_name if task_name else self._active_task
self._measure_power_and_energy()

emissions_data = self._prepare_emissions_data(delta=True)
emissions_data = self._prepare_emissions_data()
emissions_data_delta = self._compute_emissions_delta(emissions_data)

task_duration = Time.from_seconds(
time.time() - self._tasks[task_name].start_time
)

task_emission_data = emissions_data
task_emission_data = emissions_data_delta
task_emission_data.duration = task_duration.seconds
self._tasks[task_name].emissions_data = task_emission_data
self._tasks[task_name].is_active = False
@@ -526,7 +526,11 @@ def flush(self) -> Optional[float]:
self._measure_power_and_energy()

emissions_data = self._prepare_emissions_data()
self._persist_data(emissions_data)
emissions_data_delta = self._compute_emissions_delta(emissions_data)

self._persist_data(
total_emissions=emissions_data, delta_emissions=emissions_data_delta
)

return emissions_data.emissions

@@ -554,29 +558,34 @@ def stop(self) -> Optional[float]:
self._measure_power_and_energy()

emissions_data = self._prepare_emissions_data()
emissions_data_delta = self._compute_emissions_delta(emissions_data)

self._persist_data(emissions_data, experiment_name=self._experiment_name)
self._persist_data(
total_emissions=emissions_data,
delta_emissions=emissions_data_delta,
experiment_name=self._experiment_name,
)

self.final_emissions_data = emissions_data
self.final_emissions = emissions_data.emissions
return emissions_data.emissions

def _persist_data(self, emissions_data, experiment_name=None):
for persistence in self.persistence_objs:
if isinstance(persistence, CodeCarbonAPIOutput):
emissions_data = self._prepare_emissions_data(delta=True)

persistence.out(emissions_data)
if isinstance(persistence, FileOutput):
if len(self._tasks) > 0:
task_emissions_data = []
for task in self._tasks:
task_emissions_data.append(self._tasks[task].out())
persistence.task_out(
task_emissions_data, experiment_name, self._output_dir
)
def _persist_data(
self,
total_emissions: EmissionsData,
delta_emissions: EmissionsData,
experiment_name=None,
):
task_emissions_data = []
for task in self._tasks:
task_emissions_data.append(self._tasks[task].out())

def _prepare_emissions_data(self, delta=False) -> EmissionsData:
for handler in self._output_handlers:
handler.out(total_emissions, delta_emissions)
if len(task_emissions_data) > 0:
handler.task_out(task_emissions_data, experiment_name)

def _prepare_emissions_data(self) -> EmissionsData:
"""
:delta: If 'True', return only the delta consumption since the last call.
"""
@@ -636,21 +645,23 @@ def _prepare_emissions_data(self, delta=False) -> EmissionsData:
tracking_mode=self._conf.get("tracking_mode"),
pue=self._pue,
)
if delta:
if self._previous_emissions is None:
self._previous_emissions = total_emissions
else:
# Create a copy
delta_emissions = dataclasses.replace(total_emissions)
# Compute emissions rate from delta
delta_emissions.compute_delta_emission(self._previous_emissions)
# TODO : find a way to store _previous_emissions only when
# TODO : the API call succeeded
self._previous_emissions = total_emissions
total_emissions = delta_emissions
logger.debug(total_emissions)
return total_emissions

def _compute_emissions_delta(self, total_emissions: EmissionsData) -> EmissionsData:
delta_emissions: EmissionsData = total_emissions
if self._previous_emissions is None:
self._previous_emissions = total_emissions
else:
# Create a copy
delta_emissions = dataclasses.replace(total_emissions)
# Compute emissions rate from delta
delta_emissions.compute_delta_emission(self._previous_emissions)
# TODO : find a way to store _previous_emissions only when
# TODO : the API call succeeded
self._previous_emissions = total_emissions
return delta_emissions

@abstractmethod
def _get_geo_metadata(self) -> GeoMetadata:
"""
@@ -742,19 +753,19 @@ def _measure_power_and_energy(self) -> None:
self._last_measured_time = time.time()
self._measure_occurrence += 1
if (
self._cc_api__out is not None or self._cc_prometheus_out is not None
) and self._api_call_interval != -1:
if self._measure_occurrence >= self._api_call_interval:
emissions = self._prepare_emissions_data(delta=True)
logger.info(
f"{emissions.emissions_rate * 1000:.6f} g.CO2eq/s mean an estimation of "
+ f"{emissions.emissions_rate * 3600 * 24 * 365:,} kg.CO2eq/year"
)
if self._cc_api__out:
self._cc_api__out.out(emissions)
if self._cc_prometheus_out:
self._cc_prometheus_out.out(emissions)
self._measure_occurrence = 0
self._api_call_interval != -1
and len(self._output_handlers) > 0
and self._measure_occurrence >= self._api_call_interval
):
emissions = self._prepare_emissions_data()
emissions_delta = self._compute_emissions_delta(emissions)
logger.info(
f"{emissions_delta.emissions_rate * 1000:.6f} g.CO2eq/s mean an estimation of "
+ f"{emissions_delta.emissions_rate * 3600 * 24 * 365:,} kg.CO2eq/year"
)
for handler in self._output_handlers:
handler.live_out(emissions, emissions_delta)
self._measure_occurrence = 0
logger.debug(f"last_duration={last_duration}\n------------------------")

def __enter__(self):
@@ -924,6 +935,7 @@ def track_emissions(
save_to_logger: Optional[bool] = _sentinel,
save_to_prometheus: Optional[bool] = _sentinel,
prometheus_url: Optional[str] = _sentinel,
output_handlers: Optional[List[BaseOutput]] = _sentinel,
logging_logger: Optional[LoggerOutput] = _sentinel,
offline: Optional[bool] = _sentinel,
emissions_endpoint: Optional[str] = _sentinel,
@@ -1002,6 +1014,7 @@ def wrapped_fn(*args, **kwargs):
save_to_logger=save_to_logger,
save_to_prometheus=save_to_prometheus,
prometheus_url=prometheus_url,
output_handlers=output_handlers,
logging_logger=logging_logger,
country_iso_code=country_iso_code,
region=region,
@@ -1023,6 +1036,7 @@ def wrapped_fn(*args, **kwargs):
save_to_logger=save_to_logger,
save_to_prometheus=save_to_prometheus,
prometheus_url=prometheus_url,
output_handlers=output_handlers,
logging_logger=logging_logger,
gpu_ids=gpu_ids,
log_level=log_level,
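With these changes, the tracker forwards emissions to every configured handler instead of to dedicated API/Prometheus attributes, and callers can register their own handlers through the new `output_handlers` argument. A minimal usage sketch, not part of the diff: `PrintOutput` is a hypothetical BaseOutput subclass (sketched after the base_output.py changes below), while `api_call_interval` is the existing tracker setting referenced in `_measure_power_and_energy` above.

# Sketch only (assumes the hypothetical PrintOutput handler defined below)
from codecarbon import EmissionsTracker

tracker = EmissionsTracker(
    output_handlers=[PrintOutput()],  # custom handlers are appended to the built-in ones
    api_call_interval=4,              # live_out() fires once the measure count reaches this interval
)
tracker.start()
# ... workload to measure ...
tracker.stop()  # every handler receives out(total, delta); task_out(...) is called when tasks were tracked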
25 changes: 17 additions & 8 deletions codecarbon/output_methods/base_output.py
@@ -1,15 +1,24 @@
from abc import ABC, abstractmethod
from typing import List

from codecarbon.output_methods.emissions_data import EmissionsData
from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData


class BaseOutput(ABC):
class BaseOutput:
"""
An abstract class that requires children to inherit a single method,
`out` which is used for persisting data. This could be by saving it to a file,
posting to Json Box, saving to a database, sending a slack message etc.
An abstract class defining the possible contracts for an output strategy. A strategy implementation can save emissions
data to a file, post it to Json Box, save it to a database, send a Slack message, etc.
Each method is responsible for a different part of the EmissionsData lifecycle:
- `out` is used by termination calls such as emissions_tracker.flush and emissions_tracker.stop
- `live_out` is used by live measurement events, e.g. the iterative update of Prometheus metrics
- `task_out` is used by termination calls such as emissions_tracker.flush and emissions_tracker.stop, but uses
emissions segregated by task
"""

@abstractmethod
def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
pass

def live_out(self, total: EmissionsData, delta: EmissionsData):
pass

def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
pass
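
To illustrate the contract described in the docstring above, here is a sketch of a custom handler; it is not part of the PR, and the class name and print-based behaviour are made up. The imports match those used in base_output.py.

from typing import List

from codecarbon.output_methods.base_output import BaseOutput
from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData


class PrintOutput(BaseOutput):
    """Hypothetical handler that prints emissions instead of persisting them."""

    def out(self, total: EmissionsData, delta: EmissionsData):
        # Termination events: flush() and stop()
        print(f"total: {total.emissions} kg CO2eq, last delta: {delta.emissions} kg CO2eq")

    def live_out(self, total: EmissionsData, delta: EmissionsData):
        # Live measurement events, raised every `api_call_interval` measurements
        print(f"live delta: {delta.emissions} kg CO2eq")

    def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
        # Per-task emissions, emitted on flush()/stop() when tasks were tracked
        print(f"{experiment_name}: {len(data)} task(s) measured")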
34 changes: 19 additions & 15 deletions codecarbon/output_methods/file.py
@@ -15,14 +15,18 @@ class FileOutput(BaseOutput):
Saves experiment artifacts to a file
"""

def __init__(self, save_file_path: str, on_csv_write: str = "append"):
def __init__(
self, output_file_name: str, output_dir: str, on_csv_write: str = "append"
):
if on_csv_write not in {"append", "update"}:
raise ValueError(
f"Unknown `on_csv_write` value: {on_csv_write}"
+ " (should be one of 'append' or 'update'"
)
self.output_file_name: str = output_file_name
self.output_dir: str = output_dir
self.on_csv_write: str = on_csv_write
self.save_file_path: str = save_file_path
self.save_file_path = os.path.join(self.output_dir, self.output_file_name)
logger.info(
f"Saving emissions data to file {os.path.abspath(self.save_file_path)}"
)
@@ -34,42 +38,42 @@ def has_valid_headers(self, data: EmissionsData):
list_of_column_names = list(dict_from_csv.keys())
return list(data.values.keys()) == list_of_column_names

def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
file_exists: bool = os.path.isfile(self.save_file_path)
if file_exists and not self.has_valid_headers(data):
if file_exists and not self.has_valid_headers(total):
logger.info("Backing up old emission file")
backup(self.save_file_path)
file_exists = False

if not file_exists:
df = pd.DataFrame(columns=data.values.keys())
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.DataFrame(columns=total.values.keys())
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
elif self.on_csv_write == "append":
df = pd.read_csv(self.save_file_path)
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
else:
df = pd.read_csv(self.save_file_path)
df_run = df.loc[df.run_id == data.run_id]
df_run = df.loc[df.run_id == total.run_id]
if len(df_run) < 1:
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
elif len(df_run) > 1:
logger.warning(
f"CSV contains more than 1 ({len(df_run)})"
+ f" rows with current run ID ({data.run_id})."
+ f" rows with current run ID ({total.run_id})."
+ "Appending instead of updating."
)
df = pd.concat([df, pd.DataFrame.from_records([dict(data.values)])])
df = pd.concat([df, pd.DataFrame.from_records([dict(total.values)])])
else:
df.at[df.run_id == data.run_id, data.values.keys()] = (
data.values.values()
df.at[df.run_id == total.run_id, total.values.keys()] = (
total.values.values()
)

df.to_csv(self.save_file_path, index=False)

def task_out(self, data: List[TaskEmissionsData], experiment_name: str, output_dir):
def task_out(self, data: List[TaskEmissionsData], experiment_name: str):
run_id = data[0].run_id
save_task_file_path = os.path.join(
output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
self.output_dir, "emissions_" + experiment_name + "_" + run_id + ".csv"
)
df = pd.DataFrame(columns=data[0].values.keys())
df = pd.concat(
8 changes: 4 additions & 4 deletions codecarbon/output_methods/http.py
@@ -19,9 +19,9 @@ class HTTPOutput(BaseOutput):
def __init__(self, endpoint_url: str):
self.endpoint_url: str = endpoint_url

def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
try:
payload = dataclasses.asdict(data)
payload = dataclasses.asdict(total)
payload["user"] = getpass.getuser()
resp = requests.post(self.endpoint_url, json=payload, timeout=10)
if resp.status_code != 201:
@@ -50,8 +50,8 @@ def __init__(self, endpoint_url: str, experiment_id: str, api_key: str, conf):
)
self.run_id = self.api.run_id

def out(self, data: EmissionsData):
def out(self, total: EmissionsData, delta: EmissionsData):
try:
self.api.add_emission(dataclasses.asdict(data))
self.api.add_emission(dataclasses.asdict(delta))
except Exception as e:
logger.error(e, exc_info=True)