Skip to content

Commit

Permalink
Merge pull request #133 from mlcommons/refactor-converter
Browse files Browse the repository at this point in the history
Refactor converter for better maintainability and readability
srinivas212 authored Jul 16, 2024

Verified

This commit was signed with the committer’s verified signature.
jonathan-irvin Jonathan Irvin
2 parents 472c72a + 48de9fa commit 465e8d4
Showing 6 changed files with 501 additions and 539 deletions.
13 changes: 8 additions & 5 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
@@ -54,13 +54,16 @@ $ chakra_trace_link \
```

### Execution Trace Converter (chakra_converter)
Converts the merged execution traces into the Chakra schema.
Converts the execution traces produced by `chakra_trace_link` into the Chakra protobuf format, identifying and encoding the dependencies required for simulation along the way. The converter is designed for any downstream simulator that accepts Chakra execution traces in the protobuf format: it takes an input file in another format and generates the corresponding Chakra execution trace in protobuf.
```bash
$ chakra_converter \
--input_filename /path/to/chakra_host_device_trace.json \
--output_filename /path/to/chakra_trace \
--input_type <input_type>
$ chakra_converter PyTorch \
--input /path/to/chakra_host_device_trace.json \
--output /path/to/chakra_trace \
    [--simulate]
```
* --input: Path to the input file containing the merged Chakra host and device traces in JSON format.
* --output: Path to the output file where the converted Chakra trace will be saved in protobuf format.
* --simulate: (Optional) Enable simulation of operators after the conversion for validation and debugging purposes. This option allows simulation of traces without running them through a simulator. Users can validate the converter or simulator against actual measured values using tools like chrome://tracing or https://perfetto.dev/. Read the duration of the timeline and compare the total execution time against the final simulation time of a trace. Disabled by default because it takes a long time.

### Execution Trace Feeder (et_feeder)
The Execution Trace Feeder (et_feeder) is a C++ library designed to feed Chakra traces into any compatible C++ simulator. This library specifically provides dependency-free nodes to a simulator, which must import the feeder as a library. Currently, ASTRA-sim is the only simulator that supports this trace feeder. Below are the commands to run execution traces on ASTRA-sim:
118 changes: 83 additions & 35 deletions src/converter/converter.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import argparse
import logging
import sys
import traceback

from .pytorch_converter import PyTorchConverter
from .text_converter import TextConverter


def setup_logging(log_filename: str) -> None:
"""Set up logging to file and stream handlers."""
formatter = logging.Formatter("%(levelname)s [%(asctime)s] %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p")

file_handler = logging.FileHandler(log_filename, mode="w")
@@ -21,46 +20,95 @@ def setup_logging(log_filename: str) -> None:
logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, stream_handler])


def convert_text(args: argparse.Namespace) -> None:
    """Run the text-to-Chakra conversion described by the parsed CLI arguments.

    Reads the text-format model description at ``args.input`` and writes
    Chakra execution traces to ``args.output`` for ``args.num_npus`` NPUs
    over ``args.num_passes`` passes.
    """
    # Construct and run the converter in one expression; it carries no state
    # the caller needs afterwards.
    TextConverter(args.input, args.output, args.num_npus, args.num_passes).convert()


def convert_pytorch(args: argparse.Namespace) -> None:
    """Run the PyTorch-to-Chakra conversion described by the parsed CLI arguments.

    Converts the JSON host + device trace at ``args.input`` into a protobuf
    trace at ``args.output``; when ``args.simulate`` is set, the converted
    trace is additionally simulated for validation.
    """
    pytorch_converter = PyTorchConverter()
    pytorch_converter.convert(args.input, args.output, args.simulate)


def main() -> None:
parser = argparse.ArgumentParser(description="Execution Trace Converter")
parser.add_argument("--input_type", type=str, default=None, required=True, help="Input execution trace type")
parser.add_argument(
"--input_filename", type=str, default=None, required=True, help="Input execution trace filename"
"""Convert to Chakra execution trace in the protobuf format."""
parser = argparse.ArgumentParser(
description=(
"Chakra execution trace converter for simulators. This converter is designed for any downstream "
"simulators that take Chakra execution traces in the protobuf format. This converter takes an input file "
"in another format and generates a Chakra execution trace output in the protobuf format."
)
)

parser.add_argument("--log-filename", type=str, default="debug.log", help="Log filename")

subparsers = parser.add_subparsers(title="subcommands", description="Valid subcommands", help="Input type")

pytorch_parser = subparsers.add_parser(
"PyTorch",
help="Convert Chakra host + device execution trace in JSON to Chakra host + device execution trace in the "
"Chakra schema with protobuf format",
)
parser.add_argument(
"--output_filename", type=str, default=None, required=True, help="Output Chakra execution trace filename"
pytorch_parser.add_argument(
"--input", type=str, required=True, help="Input Chakra host + device traces in the JSON format"
)
parser.add_argument(
"--num_npus", type=int, default=None, required="Text" in sys.argv, help="Number of NPUs in a system"
pytorch_parser.add_argument(
"--output", type=str, required=True, help="Output Chakra host + device traces in the protobuf format"
)
parser.add_argument(
"--num_passes", type=int, default=None, required="Text" in sys.argv, help="Number of training passes"
pytorch_parser.add_argument(
"--simulate",
action="store_true",
help=(
"Enable simulation of operators after the conversion for validation and debugging purposes. This option "
"allows simulation of traces without running them through a simulator. Users can validate the converter "
"or simulator against actual measured values using tools like chrome://tracing or https://perfetto.dev/. "
"Read the duration of the timeline and compare the total execution time against the final simulation time "
"of a trace. Disabled by default because it takes a long time."
),
)
parser.add_argument("--simulate", action="store_true", help="Run simulate_execution if set")
parser.add_argument("--log_filename", type=str, default="debug.log", help="Log filename")
pytorch_parser.set_defaults(func=convert_pytorch)

text_parser = subparsers.add_parser(
"Text", help="Convert text-based model description to Chakra schema-based traces in the protobuf format"
)
text_parser.add_argument(
"--input",
type=str,
required=True,
help=(
"Input file in the text format that describes a model. This follows the text format used in ASTRA-sim: "
"https://github.com/astra-sim/astra-sim"
),
)
text_parser.add_argument(
"--output", type=str, required=True, help="Output Chakra execution trace filename in the protobuf format"
)
text_parser.add_argument(
"--num-npus",
type=int,
required=True,
help="Number of NPUs in a system. Determines the number of traces the converter generates",
)
text_parser.add_argument(
"--num-passes",
type=int,
required=True,
help=(
"Number of loops when generating traces based on the text input file. Increasing the number of passes "
"increases the number of training iterations for a given text input."
),
)
text_parser.set_defaults(func=convert_text)

args = parser.parse_args()

setup_logging(args.log_filename)
logging.debug(" ".join(sys.argv))

try:
if args.input_type == "Text":
converter = TextConverter(args.input_filename, args.output_filename, args.num_npus, args.num_passes)
converter.convert()
elif args.input_type == "PyTorch":
converter = PyTorchConverter(args.input_filename, args.output_filename, simulate=args.simulate)
converter.convert()
else:
supported_types = ["Text", "PyTorch"]
logging.error(
f"The input type '{args.input_type}' is not supported. "
f"Supported types are: {', '.join(supported_types)}."
)
sys.exit(1)
except Exception:
traceback.print_exc()
logging.debug(traceback.format_exc())
sys.exit(1)
if "func" in args:
setup_logging(args.log_filename)
args.func(args)
else:
parser.print_help()


if __name__ == "__main__":
760 changes: 356 additions & 404 deletions src/converter/pytorch_converter.py

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion src/converter/pytorch_node.py
Original file line number Diff line number Diff line change
@@ -88,7 +88,13 @@ def parse_data(self, node_data: Dict[str, Any]) -> None:
else:
raise ValueError(
f"Unsupported schema version '{self.schema}'. Please check if the schema version is in the list of "
f"supported versions: {self.SUPPORTED_VERSIONS}"
f"supported versions: {self.SUPPORTED_VERSIONS}. The schema version of the trace is not supported by "
f"the converter. The schema version is determined by the PyTorch version used to collect Chakra host "
f"execution traces. Please consider changing the PyTorch version you are using. For more details, you "
f"can follow the git history of the relevant file: "
f"https://github.com/pytorch/pytorch/blob/7cd48df2dae7e2194438b162968c47d1f05bf20e/torch/csrc/"
f"profiler/standalone/execution_trace_observer.cpp#L308. Check which PyTorch versions generate Chakra "
f"host traces that are supported by the converter."
)

def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
7 changes: 6 additions & 1 deletion src/converter/pytorch_tensor.py
Original file line number Diff line number Diff line change
@@ -5,9 +5,14 @@ class PyTorchTensor:
"""
Represents a tensor with its associated properties.
Attributes
Attributes:
tensor_data (List[int]): Data of the tensor including tensor_id, storage_id, offset, number of elements, and
size of each element in bytes.
Note:
For more details on the tensor data structure, refer to:
https://github.com/pytorch/pytorch/blob/7cd48df2dae7e2194438b162968c47d1f05bf20e/torch/csrc/profiler/
standalone/execution_trace_observer.cpp#L400
"""

def __init__(self, tensor_data: List[int]) -> None:
134 changes: 41 additions & 93 deletions tests/converter/test_pytorch_converter.py
Original file line number Diff line number Diff line change
@@ -73,39 +73,27 @@ def mock_chakra_node() -> ChakraNode:
return node


def test_initialization() -> None:
converter = PyTorchConverter("input.json", "output.json")
assert converter.input_filename == "input.json"
assert converter.output_filename == "output.json"


@patch("builtins.open", new_callable=mock_open)
def test_load_pytorch_execution_traces(mock_file: MagicMock, sample_pytorch_data: Dict) -> None:
def test_load_json_execution_traces(mock_file: MagicMock, sample_pytorch_data: Dict) -> None:
mock_file.return_value.read.return_value = json.dumps(sample_pytorch_data)
converter = PyTorchConverter("input.json", "output.json")
data = converter.load_pytorch_execution_traces()
converter = PyTorchConverter()
data = converter.load_json_execution_traces("input.json")
assert data == sample_pytorch_data
mock_file.assert_called_once_with("input.json", "r")


def test_parse_and_instantiate_nodes(sample_pytorch_data: Dict) -> None:
converter = PyTorchConverter("input.json", "output.json")
(
pytorch_schema,
pytorch_pid,
pytorch_time,
pytorch_start_ts,
pytorch_finish_ts,
pytorch_nodes,
) = converter._parse_and_instantiate_nodes(sample_pytorch_data)
assert pytorch_schema == "1.0.2-chakra.0.0.4"
assert pytorch_pid == 1234
assert pytorch_time == "2023-01-01 12:00:00"
assert pytorch_start_ts == 1000
assert pytorch_finish_ts == 2000
assert len(pytorch_nodes) == 2
assert pytorch_nodes[1].id == 1
assert pytorch_nodes[2].id == 2
def test_parse_json_trace(sample_pytorch_data: Dict) -> None:
converter = PyTorchConverter()
json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data)

assert json_metadata["schema"] == "1.0.2-chakra.0.0.4"
assert json_metadata["pid"] == 1234
assert json_metadata["time"] == "2023-01-01 12:00:00"
assert json_metadata["start_ts"] == 1000
assert json_metadata["finish_ts"] == 2000
assert len(json_node_map) == 2
assert json_node_map[1].id == 1
assert json_node_map[2].id == 2


def create_sample_graph(parent_id: int = 0, expected_child_id: int = 0) -> Dict[int, PyTorchNode]:
@@ -132,90 +120,50 @@ def create_sample_graph(parent_id: int = 0, expected_child_id: int = 0) -> Dict[

@pytest.mark.parametrize("parent_id, expected_child_id", [(1, 2), (None, None)])
def test_establish_parent_child_relationships(parent_id: int, expected_child_id: int) -> None:
converter = PyTorchConverter("input.json", "output.json")
pytorch_nodes = create_sample_graph(parent_id, expected_child_id)
converter = PyTorchConverter()
json_node_map = create_sample_graph(parent_id, expected_child_id)

pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, [])
json_node_map = converter.establish_parent_child_relationships(json_node_map, [])

if expected_child_id:
assert pytorch_nodes[parent_id].children[0].id == expected_child_id
assert json_node_map[parent_id].children[0].id == expected_child_id
else:
assert len(pytorch_nodes[1].children) == 0
assert len(json_node_map[1].children) == 0


def test_convert_nodes(sample_pytorch_data: Dict) -> None:
converter = PyTorchConverter("input.json", "output.json")
(
pytorch_schema,
pytorch_pid,
pytorch_time,
pytorch_start_ts,
pytorch_finish_ts,
pytorch_nodes,
) = converter._parse_and_instantiate_nodes(sample_pytorch_data)
pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, [])
def test_convert_json_to_protobuf_nodes(sample_pytorch_data: Dict) -> None:
converter = PyTorchConverter()
json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data)
json_node_map = converter.establish_parent_child_relationships(json_node_map, [])
chakra_nodes = {}
converter.convert_nodes(pytorch_nodes, chakra_nodes)
converter.convert_json_to_protobuf_nodes(json_node_map, chakra_nodes)
assert len(chakra_nodes) == 2
assert chakra_nodes[1].id == 1
assert chakra_nodes[2].id == 2


def test_convert_ctrl_dep_to_data_dep(sample_pytorch_data: Dict) -> None:
converter = PyTorchConverter("input.json", "output.json")
(
pytorch_schema,
pytorch_pid,
pytorch_time,
pytorch_start_ts,
pytorch_finish_ts,
pytorch_nodes,
) = converter._parse_and_instantiate_nodes(sample_pytorch_data)
pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, [])
converter = PyTorchConverter()
json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data)
json_node_map = converter.establish_parent_child_relationships(json_node_map, [])
chakra_nodes = {}
converter.convert_nodes(pytorch_nodes, chakra_nodes)
converter.convert_json_to_protobuf_nodes(json_node_map, chakra_nodes)
root_node = chakra_nodes[1]
converter.convert_ctrl_dep_to_data_dep(pytorch_nodes, chakra_nodes, root_node)
converter.convert_ctrl_dep_to_data_dep(json_node_map, chakra_nodes, root_node)
assert root_node.data_deps == []


@patch("builtins.open", new_callable=mock_open)
def test_write_chakra_et(mock_file: MagicMock, sample_pytorch_data: Dict) -> None:
converter = PyTorchConverter("input.json", "output.json")
converter.chakra_et = mock_file()
(
pytorch_schema,
pytorch_pid,
pytorch_time,
pytorch_start_ts,
pytorch_finish_ts,
pytorch_nodes,
) = converter._parse_and_instantiate_nodes(sample_pytorch_data)
pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, [])
converter = PyTorchConverter()
json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data)
json_node_map = converter.establish_parent_child_relationships(json_node_map, [])
chakra_nodes = {}
converter.convert_nodes(pytorch_nodes, chakra_nodes)
converter.write_chakra_et(
converter.chakra_et,
pytorch_schema,
pytorch_pid,
pytorch_time,
pytorch_start_ts,
pytorch_finish_ts,
chakra_nodes,
)
converter.convert_json_to_protobuf_nodes(json_node_map, chakra_nodes)
converter.write_protobuf_execution_trace("output.et", json_metadata, chakra_nodes)
assert mock_file().write.called


@patch("builtins.open", new_callable=mock_open)
def test_close_chakra_execution_trace(mock_file: MagicMock) -> None:
converter = PyTorchConverter("input.json", "output.json")
file_handle = mock_file()
file_handle.closed = False # Simulate an open file
converter.chakra_et = file_handle
converter.close_chakra_execution_trace(converter.chakra_et)
file_handle.close.assert_called_once()


@pytest.mark.parametrize(
"pytorch_node_data, expected_type",
[
@@ -225,13 +173,13 @@ def test_close_chakra_execution_trace(mock_file: MagicMock) -> None:
({"name": "other_op", "is_gpu_op": False}, COMP_NODE),
],
)
def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expected_type: int) -> None:
def test_get_protobuf_node_type_from_json_node(pytorch_node_data: Dict, expected_type: int) -> None:
# Create a mock PyTorchNode with the required attributes
pytorch_node = MagicMock(spec=PyTorchNode)
pytorch_node.name = pytorch_node_data["name"]
pytorch_node.is_gpu_op = MagicMock(return_value=pytorch_node_data["is_gpu_op"])

# Create a mock pytorch_nodes dictionary with actual PyTorchNode instances
# Create a mock json_node_map dictionary with actual PyTorchNode instances
mock_pytorch_node_data = {
"id": 0,
"name": "mock_node",
@@ -242,10 +190,10 @@ def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expecte
"attrs": [],
}
mock_pytorch_node = PyTorchNode("1.0.2-chakra.0.0.4", mock_pytorch_node_data)
pytorch_nodes = {0: mock_pytorch_node, 1: pytorch_node}
json_node_map = {0: mock_pytorch_node, 1: pytorch_node}

converter = PyTorchConverter("input.json", "output.json")
node_type = converter.get_chakra_node_type_from_pytorch_node(pytorch_nodes, pytorch_node)
converter = PyTorchConverter()
node_type = converter.get_protobuf_node_type_from_json_node(json_node_map, pytorch_node)
assert node_type == expected_type


@@ -260,6 +208,6 @@ def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expecte
],
)
def test_get_collective_comm_type(name: str, expected_comm_type: int) -> None:
converter = PyTorchConverter("input.json", "output.json")
converter = PyTorchConverter()
comm_type = converter.get_collective_comm_type(name)
assert comm_type == expected_comm_type

0 comments on commit 465e8d4

Please sign in to comment.