Skip to content

Commit

Permalink
feat: actual regions
Browse files Browse the repository at this point in the history
  • Loading branch information
JuanPedroGHM committed Aug 10, 2023
1 parent 60bd56e commit 3397ee2
Show file tree
Hide file tree
Showing 6 changed files with 550 additions and 280 deletions.
243 changes: 126 additions & 117 deletions perun/data_model/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import dataclasses
import enum
import time
from dataclasses import asdict
from typing import Any, Dict, List, Optional, Union

import numpy as np
Expand Down Expand Up @@ -43,6 +44,7 @@ class MetricType(str, enum.Enum):
GPU_ENERGY = "gpu_energy"
DRAM_ENERGY = "dram_energy"
OTHER_ENERGY = "other_energy"
N_RUNS = "n_runs"


class AggregateType(str, enum.Enum):
Expand Down Expand Up @@ -89,119 +91,6 @@ def copy(self):
)


class LocalRegions:
"""Stores local region data while an application is being monitored."""

def __init__(self) -> None:
self._regions: Dict[str, List[int]] = {}

def addEvent(self, region_name: str) -> None:
"""Mark a new event for the named region.
Parameters
----------
region_name : str
Region to mark the event from.
"""
if region_name not in self._regions:
self._regions[region_name] = []

self._regions[region_name].append(time.time_ns())


class Regions:
"""Stores region data from all MPI ranks.
For each marked region (decorated function), an numpy array with timestamps indicating function starts and ends.
"""

def __init__(self) -> None:
self._regions: Dict[str, Dict[int, np.ndarray]] = {}

def getRegion(self, region_name: str) -> Dict[int, np.ndarray]:
"""Get data from a named region.
Parameters
----------
region_name : str
Region id.
Returns
-------
Dict[int, np.ndarray]
Region data from all MPI ranks.
"""
return self._regions[region_name]

def getRegionFromRank(self, region_name: str, rank: int) -> np.ndarray:
"""Obtain region data from a particular MPI rank.
Parameters
----------
region_name : str
Region id.
rank : int
MPI rank.
Returns
-------
np.ndarray
Numpy array with all marked event timestamps.
"""
return self._regions[region_name][rank]

def toDict(self) -> Dict[str, Dict[int, np.ndarray]]:
"""Convert regions to a python dictionary.
Returns
-------
Dict[str, Dict[int, np.ndarray]]
Dictionary with region data.
"""
return self._regions

@classmethod
def fromLocalRegions(cls, local_regions: List[LocalRegions], start_time: int):
"""Create a Regions object from a list of local regions.
Parameters
----------
local_regions : List[LocalRegions]
Local region objects collected from multiple MPI ranks.
"""
regionObj = cls()
for rank, local_region in enumerate(local_regions):
for region_name, region in local_region._regions.items():
if region_name not in regionObj._regions:
regionObj._regions[region_name] = {}

t_s = np.array(region)
t_s -= start_time
t_s = t_s.astype("float32")
t_s *= 1e-9

regionObj._regions[region_name][rank] = t_s
return regionObj

@classmethod
def fromDict(cls, regions: Dict[str, Dict[int, np.ndarray]]):
"""Create Regions object from a dictionary.
Parameters
----------
regions : Dict[str, Dict[int, np.ndarray]]
Regions dictionary.
Returns
-------
Regions
Regions object.
"""
regionObj = cls()
regionObj._regions = regions
return regionObj


@dataclasses.dataclass
class Stats:
"""Collects statistics based on multiple metrics of the same type."""
Expand Down Expand Up @@ -309,6 +198,92 @@ def fromDict(cls, rawDataDict: Dict):
)


class LocalRegions:
"""Stores local region data while an application is being monitored."""

def __init__(self) -> None:
self._regions: Dict[str, List[int]] = {}

def addEvent(self, region_name: str) -> None:
"""Mark a new event for the named region.
Parameters
----------
region_name : str
Region to mark the event from.
"""
if region_name not in self._regions:
self._regions[region_name] = []

self._regions[region_name].append(time.time_ns())


@dataclasses.dataclass
class Region:
"""Stores region data from all MPI ranks.
For each marked region (decorated function), an numpy array with timestamps indicating function starts and ends.
"""

id: str = ""
world_size: int = 0
raw_data: Dict[int, np.ndarray] = dataclasses.field(default_factory=dict)
runs_per_rank: Optional[Stats] = None
runtime: Optional[Stats] = None
power: Optional[Stats] = None
energy: Optional[Stats] = None
processed: bool = False

def toDict(self) -> Dict[str, Any]:
"""Convert regions to a python dictionary.
Returns
-------
Dict[str, Dict[int, np.ndarray]]
Dictionary with region data.
"""
result = {
"id": self.id,
"world_size": self.world_size,
"raw_data": self.raw_data,
}

result["runs_per_rank"] = (
asdict(self.runs_per_rank) if self.runs_per_rank else None
)
result["runtime"] = asdict(self.runtime) if self.runtime else None
result["power"] = asdict(self.power) if self.power else None
result["energy"] = asdict(self.energy) if self.energy else None

return result

@classmethod
def fromDict(cls, regionDictionary: Dict[str, Any]):
"""Create Regions object from a dictionary.
Parameters
----------
regions : Dict[str, Dict[int, np.ndarray]]
Regions dictionary.
Returns
-------
Regions
Regions object.
"""
regionObj = Region()
regionObj.id = regionDictionary["id"]
regionObj.world_size = regionDictionary["world_size"]
regionObj.raw_data = regionDictionary["raw_data"]
regionObj.processed = regionDictionary["processed"]
if regionObj.processed:
regionObj.runs_per_rank = Stats.fromDict(regionDictionary["runs_per_rank"])
regionObj.runtime = Stats.fromDict(regionDictionary["runtime"])
regionObj.power = Stats.fromDict(regionDictionary["power"])
regionObj.energy = Stats.fromDict(regionDictionary["energy"])
return regionObj


class DataNode:
"""Recursive data structure that contains all the information of a monitored application."""

Expand All @@ -321,7 +296,7 @@ def __init__(
metrics: Optional[Dict[MetricType, Union[Metric, Stats]]] = None,
deviceType: Optional[DeviceType] = None,
raw_data: Optional[RawData] = None,
regions: Optional[Regions] = None,
regions: Optional[Dict[str, Region]] = None,
processed: bool = False,
) -> None:
"""Perun DataNode.
Expand Down Expand Up @@ -354,9 +329,35 @@ def __init__(
)
self.deviceType: Optional[DeviceType] = deviceType
self.raw_data: Optional[RawData] = raw_data
self.regions: Optional[Regions] = regions
self.regions: Optional[Dict[str, Region]] = regions
self.processed = processed

def addRegionData(self, localRegions: List[LocalRegions], start_time: int):
"""Add region information to to data node.
Parameters
----------
localRegions : List[LocalRegions]
Gathered local regions from all MPI ranks
start_time : int
'Official' start time of the run.
"""
self.regions = {}
world_size = len(localRegions)
for rank, l_region in enumerate(localRegions):
for region_name, data in l_region._regions.items():
if region_name not in self.regions:
r = Region()
r.id = region_name
r.world_size = world_size
self.regions[region_name] = r

t_s = np.array(data)
t_s -= start_time
t_s = t_s.astype("float32")
t_s *= 1e-9
self.regions[region_name].raw_data[rank] = t_s

def toDict(self, include_raw_data: bool = True) -> Dict:
"""Transform object to dictionary."""
resultsDict = {
Expand All @@ -367,7 +368,12 @@ def toDict(self, include_raw_data: bool = True) -> Dict:
type.value: dataclasses.asdict(metric)
for type, metric in self.metrics.items()
},
"regions": self.regions.toDict() if self.regions else None,
"regions": {
region_name: region.toDict()
for region_name, region in self.regions.items()
}
if self.regions
else None,
"deviceType": self.deviceType,
"processed": self.processed,
}
Expand Down Expand Up @@ -423,6 +429,9 @@ def fromDict(cls, resultsDict: Dict):
newResults.raw_data = RawData.fromDict(resultsDict["raw_data"])

if "regions" in resultsDict:
newResults.regions = Regions.fromDict(resultsDict["regions"])
newResults.regions = {
region_name: Region.fromDict(region_dict)
for region_name, region_dict in resultsDict["regions"].items()
}

return newResults
54 changes: 40 additions & 14 deletions perun/io/hdf5.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
MetricType,
NodeType,
RawData,
Regions,
Region,
Stats,
)
from perun.data_model.measurement_type import Magnitude, Unit
Expand Down Expand Up @@ -221,21 +221,47 @@ def _readRawData(group: h5py.Group) -> RawData:
)


def _addRegions(h5Group: h5py.Group, regions: Regions):
def _addRegions(h5Group: h5py.Group, regions: Dict[str, Region]):
regions_group: h5py.Group = h5Group.create_group("regions")
for region_name, ranks in regions._regions.items():
region_group = regions_group.create_group(region_name)
for rank, events in ranks.items():
region_group.create_dataset(f"{rank}", data=events)
for region in regions.values():
_addRegion(regions_group, region)


def _addRegion(h5Group: h5py.Group, region: Region):
region_group = h5Group.create_group(region.id)
_addMetric(region_group, region.energy) # type: ignore
_addMetric(region_group, region.power) # type: ignore
_addMetric(region_group, region.runs_per_rank) # type: ignore
_addMetric(region_group, region.runtime) # type: ignore
region_group.attrs["id"] = region.id
region_group.attrs["processed"] = region.processed
region_group.attrs["world_size"] = region.world_size
raw_data_group = region_group.create_group("raw_data")
for rank, data in region.raw_data.items():
raw_data_group.create_dataset(str(rank), data=data)


def _readRegions(group: h5py.Group) -> Dict[str, Region]:
regionsDict: Dict[str, Region] = {}
for key, region_group in group.items():
regionsDict[key] = _readRegion(region_group)
return regionsDict


def _readRegions(group: h5py.Group) -> Regions:
regionsDict: Dict[str, Dict[int, np.ndarray]] = {}
for key, region_group in group.items():
if key not in regionsDict:
regionsDict[key] = {}
def _readRegion(group: h5py.Group) -> Region:
regionObj = Region()
regionObj.id = group.attrs["id"]
regionObj.processed = group.attrs["processed"]
regionObj.world_size = group.attrs["world_size"]

regionObj.energy = _readMetric(group["ENERGY"]) # type: ignore
regionObj.power = _readMetric(group["POWER"]) # type: ignore
regionObj.runtime = _readMetric(group["RUNTIME"]) # type: ignore
regionObj.runs_per_rank = _readMetric(group["N_RUNS"]) # type: ignore

for rank, dataset in region_group.items():
regionsDict[key][rank] = dataset[:]
raw_data_group = group["raw_data"]
regionObj.raw_data = {}
for key, data in raw_data_group.items():
regionObj.raw_data[int(key)] = data[:]

return Regions.fromDict(regionsDict)
return regionObj
Loading

0 comments on commit 3397ee2

Please sign in to comment.