diff --git a/USER_GUIDE.md b/USER_GUIDE.md index f04d6dbf..569f47f9 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -54,13 +54,16 @@ $ chakra_trace_link \ ``` ### Execution Trace Converter (chakra_converter) -Converts the merged execution traces into the Chakra schema. +Converts the execution traces from `chakra_trace_link` into traces in the protobuf format. It is responsible for identifying and encoding dependencies for simulation as well. The converter is designed for any downstream simulators that take Chakra execution traces in the protobuf format. It takes an input file in another format and generates a Chakra execution trace output in the protobuf format. ```bash -$ chakra_converter \ - --input_filename /path/to/chakra_host_device_trace.json \ - --output_filename /path/to/chakra_trace \ - --input_type +$ chakra_converter PyTorch \ + --input /path/to/chakra_host_device_trace.json \ + --output /path/to/chakra_trace \ + [--simulate] \ ``` +* --input: Path to the input file containing the merged Chakra host and device traces in JSON format. +* --output: Path to the output file where the converted Chakra trace will be saved in protobuf format. +* --simulate: (Optional) Enable simulation of operators after the conversion for validation and debugging purposes. This option allows simulation of traces without running them through a simulator. Users can validate the converter or simulator against actual measured values using tools like chrome://tracing or https://perfetto.dev/. Read the duration of the timeline and compare the total execution time against the final simulation time of a trace. Disabled by default because it takes a long time. ### Execution Trace Feeder (et_feeder) The Execution Trace Feeder (et_feeder) is a C++ library designed to feed Chakra traces into any compatible C++ simulator. This library specifically provides dependency-free nodes to a simulator, which must import the feeder as a library. 
Currently, ASTRA-sim is the only simulator that supports this trace feeder. Below are the commands to run execution traces on ASTRA-sim: diff --git a/src/converter/converter.py b/src/converter/converter.py index 9aecf507..0e3b85b7 100644 --- a/src/converter/converter.py +++ b/src/converter/converter.py @@ -1,13 +1,12 @@ import argparse import logging -import sys -import traceback from .pytorch_converter import PyTorchConverter from .text_converter import TextConverter def setup_logging(log_filename: str) -> None: + """Set up logging to file and stream handlers.""" formatter = logging.Formatter("%(levelname)s [%(asctime)s] %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p") file_handler = logging.FileHandler(log_filename, mode="w") @@ -21,46 +20,95 @@ def setup_logging(log_filename: str) -> None: logging.basicConfig(level=logging.DEBUG, handlers=[file_handler, stream_handler]) +def convert_text(args: argparse.Namespace) -> None: + """Convert text input trace to Chakra execution trace.""" + converter = TextConverter(args.input, args.output, args.num_npus, args.num_passes) + converter.convert() + + +def convert_pytorch(args: argparse.Namespace) -> None: + """Convert PyTorch input trace to Chakra execution trace.""" + converter = PyTorchConverter() + converter.convert(args.input, args.output, args.simulate) + + def main() -> None: - parser = argparse.ArgumentParser(description="Execution Trace Converter") - parser.add_argument("--input_type", type=str, default=None, required=True, help="Input execution trace type") - parser.add_argument( - "--input_filename", type=str, default=None, required=True, help="Input execution trace filename" + """Convert to Chakra execution trace in the protobuf format.""" + parser = argparse.ArgumentParser( + description=( + "Chakra execution trace converter for simulators. This converter is designed for any downstream " + "simulators that take Chakra execution traces in the protobuf format. 
This converter takes an input file " + "in another format and generates a Chakra execution trace output in the protobuf format." + ) + ) + + parser.add_argument("--log-filename", type=str, default="debug.log", help="Log filename") + + subparsers = parser.add_subparsers(title="subcommands", description="Valid subcommands", help="Input type") + + pytorch_parser = subparsers.add_parser( + "PyTorch", + help="Convert Chakra host + device execution trace in JSON to Chakra host + device execution trace in the " + "Chakra schema with protobuf format", ) - parser.add_argument( - "--output_filename", type=str, default=None, required=True, help="Output Chakra execution trace filename" + pytorch_parser.add_argument( + "--input", type=str, required=True, help="Input Chakra host + device traces in the JSON format" ) - parser.add_argument( - "--num_npus", type=int, default=None, required="Text" in sys.argv, help="Number of NPUs in a system" + pytorch_parser.add_argument( + "--output", type=str, required=True, help="Output Chakra host + device traces in the protobuf format" ) - parser.add_argument( - "--num_passes", type=int, default=None, required="Text" in sys.argv, help="Number of training passes" + pytorch_parser.add_argument( + "--simulate", + action="store_true", + help=( + "Enable simulation of operators after the conversion for validation and debugging purposes. This option " + "allows simulation of traces without running them through a simulator. Users can validate the converter " + "or simulator against actual measured values using tools like chrome://tracing or https://perfetto.dev/. " + "Read the duration of the timeline and compare the total execution time against the final simulation time " + "of a trace. Disabled by default because it takes a long time." 
+ ), ) - parser.add_argument("--simulate", action="store_true", help="Run simulate_execution if set") - parser.add_argument("--log_filename", type=str, default="debug.log", help="Log filename") + pytorch_parser.set_defaults(func=convert_pytorch) + + text_parser = subparsers.add_parser( + "Text", help="Convert text-based model description to Chakra schema-based traces in the protobuf format" + ) + text_parser.add_argument( + "--input", + type=str, + required=True, + help=( + "Input file in the text format that describes a model. This follows the text format used in ASTRA-sim: " + "https://github.com/astra-sim/astra-sim" + ), + ) + text_parser.add_argument( + "--output", type=str, required=True, help="Output Chakra execution trace filename in the protobuf format" + ) + text_parser.add_argument( + "--num-npus", + type=int, + required=True, + help="Number of NPUs in a system. Determines the number of traces the converter generates", + ) + text_parser.add_argument( + "--num-passes", + type=int, + required=True, + help=( + "Number of loops when generating traces based on the text input file. Increasing the number of passes " + "increases the number of training iterations for a given text input." + ), + ) + text_parser.set_defaults(func=convert_text) + args = parser.parse_args() - setup_logging(args.log_filename) - logging.debug(" ".join(sys.argv)) - - try: - if args.input_type == "Text": - converter = TextConverter(args.input_filename, args.output_filename, args.num_npus, args.num_passes) - converter.convert() - elif args.input_type == "PyTorch": - converter = PyTorchConverter(args.input_filename, args.output_filename, simulate=args.simulate) - converter.convert() - else: - supported_types = ["Text", "PyTorch"] - logging.error( - f"The input type '{args.input_type}' is not supported. " - f"Supported types are: {', '.join(supported_types)}." 
- ) - sys.exit(1) - except Exception: - traceback.print_exc() - logging.debug(traceback.format_exc()) - sys.exit(1) + if "func" in args: + setup_logging(args.log_filename) + args.func(args) + else: + parser.print_help() if __name__ == "__main__": diff --git a/src/converter/pytorch_converter.py b/src/converter/pytorch_converter.py index c0e7995c..a1ab7bd2 100644 --- a/src/converter/pytorch_converter.py +++ b/src/converter/pytorch_converter.py @@ -22,151 +22,110 @@ class PyTorchConverter: """ - Converter class for transforming PyTorch execution traces into Chakra format. + Converter for transforming Chakra host + device execution traces in JSON format into the Chakra protobuf format. - This class is responsible for converting the execution traces collected from PyTorch into a format that is - compatible with Chakra, a performance analysis tool. It handles the intricate mappings and transformations required - to accurately represent the execution in a different format. - - Attributes - input_filename (str): Input file name containing PyTorch execution trace. - output_filename (str): Output file name for the converted Chakra trace. + This class is responsible for converting the execution traces collected from Chakra host + device in JSON format + into the Chakra protobuf format. The input JSON traces are generated by trace_link and lack the proper dependencies + for simulation. This converter handles the conversion of JSON nodes to protobuf nodes, identification and encoding + of dependencies, removal of dangling nodes, and writing the final protobuf trace to the output file. """ - def __init__(self, input_filename: str, output_filename: str, simulate: bool = False) -> None: + def convert(self, input_filename: str, output_filename: str, simulate: bool) -> None: """ - Initialize the PyTorch to Chakra converter. It sets up necessary attributes and prepares the environment. + Convert Chakra host + device execution traces in JSON format into the Chakra protobuf format. 
Args: - input_filename (str): Name of the input file containing PyTorch execution trace. - output_filename (str): Name of the output file for the converted Chakra trace. - simulate (bool): Whether to run simulate_execution after conversion. - """ - self.input_filename = input_filename - self.output_filename = output_filename - self.simulate = simulate - - def convert(self) -> None: - """Convert PyTorch execution traces into the Chakra format.""" - pytorch_et_data = self.load_pytorch_execution_traces() - ( - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - pytorch_nodes, - ) = self._parse_and_instantiate_nodes(pytorch_et_data) - chakra_et = self.open_chakra_execution_trace(self.output_filename) - chakra_nodes = {} - self.convert_nodes(pytorch_nodes, chakra_nodes) - root_nodes = [node for node in chakra_nodes.values() if self.is_root_node(node)] - for root_node in root_nodes: - self.convert_ctrl_dep_to_data_dep(pytorch_nodes, chakra_nodes, root_node) - chakra_nodes = self.remove_dangling_nodes(chakra_nodes) - parent_to_children_map = self.update_parent_to_children_map(chakra_nodes) - self.identify_cyclic_dependencies(chakra_nodes) - self.write_chakra_et( - chakra_et, - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - chakra_nodes, - ) - self.close_chakra_execution_trace(chakra_et) - if self.simulate: - self.simulate_execution(chakra_nodes, pytorch_nodes, parent_to_children_map) - - def load_pytorch_execution_traces(self) -> Dict: + input_filename (str): Input Chakra host + device execution trace in the JSON format. + output_filename (str): Output Chakra host + device execution trace in the protobuf format. + simulate (bool): Flag to indicate whether to simulate the execution of the converted trace. If True, + the method will simulate the execution after writing the protobuf trace to the output file. """ - Load PyTorch execution traces from a file. 
+ json_trace = self.load_json_execution_traces(input_filename) + json_metadata, json_node_map = self.parse_json_trace(json_trace) - Read and parse the PyTorch execution trace data from a file, creating PyTorchNode objects and establishing - node relationships. + protobuf_node_map = {} + self.convert_json_to_protobuf_nodes(json_node_map, protobuf_node_map) + root_node_list = [node for node in protobuf_node_map.values() if self.is_root_node(node.name)] + for root_node in root_node_list: + self.convert_ctrl_dep_to_data_dep(json_node_map, protobuf_node_map, root_node) - Raises - Exception: If there is an IOError in opening the file. + protobuf_node_map = self.remove_dangling_nodes(protobuf_node_map) - Returns - Dict: The loaded PyTorch execution trace data. - """ - logging.info("Loading PyTorch execution traces from file.") - try: - with open(self.input_filename, "r") as pytorch_et: - return json.load(pytorch_et) - except IOError as e: - logging.error(f"Error opening file {self.input_filename}: {e}") - raise Exception(f"Could not open file {self.input_filename}") from e + parent_to_children_map = self.update_parent_to_children_map(protobuf_node_map) + + self.identify_cyclic_dependencies(protobuf_node_map) + + self.write_protobuf_execution_trace(output_filename, json_metadata, protobuf_node_map) - def _parse_and_instantiate_nodes( - self, pytorch_et_data: Dict - ) -> Tuple[str, int, str, int, int, Dict[int, PyTorchNode]]: + if simulate: + self.simulate_execution(json_node_map, protobuf_node_map, parent_to_children_map) + + def load_json_execution_traces(self, input_filename: str) -> Dict: """ - Parse and instantiate PyTorch nodes from execution trace data. + Load Chakra host + device execution traces in JSON format from a file. Args: - pytorch_et_data (Dict): The execution trace data. - - Extract node information, sort nodes by timestamp, and establish parent-child relationships among them. 
+ input_filename (str): Input Chakra host + device execution trace in the JSON format. Returns: - Tuple: A tuple containing PyTorch schema, PID, time, start timestamp, finish timestamp, and dictionary of - PyTorch node objects. - """ - logging.info("Extracting and processing node data from execution trace.") - pytorch_schema = pytorch_et_data["schema"] - pytorch_pid = pytorch_et_data["pid"] - pytorch_time = pytorch_et_data["time"] - pytorch_start_ts = pytorch_et_data["start_ts"] - pytorch_finish_ts = pytorch_et_data["finish_ts"] - - pytorch_nodes = pytorch_et_data["nodes"] - pytorch_node_objects = {node_data["id"]: PyTorchNode(pytorch_schema, node_data) for node_data in pytorch_nodes} - pytorch_root_nids = [] - pytorch_node_objects = self._establish_parent_child_relationships(pytorch_node_objects, pytorch_root_nids) - return pytorch_schema, pytorch_pid, pytorch_time, pytorch_start_ts, pytorch_finish_ts, pytorch_node_objects - - def _establish_parent_child_relationships( - self, pytorch_node_objects: Dict[int, PyTorchNode], pytorch_root_nids: List[int] - ) -> Dict[int, PyTorchNode]: + Dict: The loaded Chakra host + device execution trace data. """ - Establish parent-child relationships among PyTorch nodes and count the node types. + logging.debug(f"Loading Chakra host + device execution traces in JSON format from file: {input_filename}") + with open(input_filename, "r") as json_file: + return json.load(json_file) + + def parse_json_trace(self, json_trace: Dict) -> Tuple[Dict, Dict[int, PyTorchNode]]: + """ + Parse and instantiate PyTorch nodes from execution trace data. Args: - pytorch_node_objects (Dict[int, PyTorchNode]): Dictionary of PyTorch node objects. - pytorch_root_nids (List[int]): List to store root node IDs. + json_trace (Dict): The execution trace data. + + Extract node information, sort nodes by timestamp, and establish parent-child relationships among them. Returns: - Dict[int, PyTorchNode]: Dictionary of PyTorch nodes with established relationships. 
+ Tuple: A tuple containing JSON metadata and dictionary of PyTorch node objects. """ - node_type_counts = self._initialize_node_type_counts() + logging.debug("Extracting and processing node data from execution trace.") - for pytorch_node in pytorch_node_objects.values(): - parent_id = pytorch_node.parent - if parent_id in pytorch_node_objects: - self._process_parent_child_relationships(pytorch_node_objects, pytorch_node, parent_id) + json_metadata = { + "schema": json_trace["schema"], + "pid": json_trace["pid"], + "time": json_trace["time"], + "start_ts": json_trace["start_ts"], + "finish_ts": json_trace["finish_ts"], + } - if self._is_root_node(pytorch_node): - pytorch_root_nids.append(pytorch_node.id) - node_type_counts["root_op"] += 1 + logging.debug(f"Parsed JSON metadata: {json_metadata}") - self._update_node_type_counts(node_type_counts, pytorch_node) + json_nodes = json_trace["nodes"] + node_count = len(json_nodes) + logging.debug(f"Number of nodes in JSON trace: {node_count}") - for node_type, count in node_type_counts.items(): - logging.info(f"{node_type}: {count}") + json_node_map = {node_data["id"]: PyTorchNode(json_trace["schema"], node_data) for node_data in json_nodes} + json_node_root_nids = [] + json_node_map = self.establish_parent_child_relationships(json_node_map, json_node_root_nids) + return json_metadata, json_node_map - return pytorch_node_objects - - def _initialize_node_type_counts(self) -> Dict[str, int]: + def establish_parent_child_relationships( + self, json_node_map: Dict[int, PyTorchNode], json_node_root_nids: List[int] + ) -> Dict[int, PyTorchNode]: """ - Initialize counters for different types of nodes. + Establish parent-child relationships among JSON nodes and count the node types. - Returns - Dict[str, int]: A dictionary with node type counters initialized to zero. + In Chakra host execution traces, the parent-child relationship is represented in the ctrl dep or parent field. 
+ The name of the field is determined by the schema version of the Chakra host execution traces. When a function + calls multiple functions, the callee functions appear as children nodes in the control dependency. This method + is responsible for reading such dependencies and updating the field accordingly. + + Args: + json_node_map (Dict[int, PyTorchNode]): Dictionary of JSON node objects. + json_node_root_nids (List[int]): List to store root node IDs. + + Returns: + Dict[int, PyTorchNode]: Dictionary of JSON nodes with established relationships. """ - return { + node_type_counts = { "total_op": 0, "cpu_op": 0, "gpu_op": 0, @@ -175,100 +134,107 @@ def _initialize_node_type_counts(self) -> Dict[str, int]: "root_op": 0, } - def _is_root_node(self, pytorch_node: PyTorchNode) -> bool: + for json_node in json_node_map.values(): + parent_id = json_node.parent + if parent_id in json_node_map: + self.process_parent_child_relationships(json_node_map, json_node, parent_id) + + if self.is_root_node(json_node.name): + json_node_root_nids.append(json_node.id) + node_type_counts["root_op"] += 1 + + node_type_counts["total_op"] += 1 + if json_node.is_cpu_op(): + node_type_counts["cpu_op"] += 1 + if json_node.is_gpu_op(): + node_type_counts["gpu_op"] += 1 + if json_node.is_record_param_comms_op(): + node_type_counts["record_param_comms_op"] += 1 + if json_node.is_nccl_op(): + node_type_counts["nccl_op"] += 1 + + for node_type, count in node_type_counts.items(): + logging.debug(f"{node_type}: {count}") + + return json_node_map + + def is_root_node(self, node_name: str) -> bool: """ - Check if a given PyTorch node is a root node. + Check if a given node name corresponds to a root node in the Chakra host execution trace. + + In the context of Chakra host execution traces, root nodes are the starting points of execution traces. + These nodes typically do not have parent nodes and act as the original sources of execution flow. 
+ The execution trace has a call-stack-like structure in the ctrl-dep field (or parent field), and root + nodes should be identified during the process of conversion. + + Chakra host execution traces may have multiple root nodes. These root nodes can be identified with specific + keywords as shown in this method. Identifying root nodes is essential for correctly converting and representing + the execution trace in the Chakra protobuf format. Args: - pytorch_node (PyTorchNode): The PyTorch node to check. + node_name (str): The name of the node to check. Returns: - bool: True if the node is a root node, False otherwise. + bool: True if the node name corresponds to a root node, False otherwise. """ - return pytorch_node.name in [ + return node_name in [ "[pytorch|profiler|execution_graph|thread]", "[pytorch|profiler|execution_trace|thread]", ] - def _process_parent_child_relationships( - self, pytorch_node_objects: Dict[int, PyTorchNode], pytorch_node: PyTorchNode, parent_id: int + def process_parent_child_relationships( + self, json_node_map: Dict[int, PyTorchNode], json_node: PyTorchNode, parent_id: int ) -> None: """ - Process the parent-child relationships for PyTorch nodes. + Process the parent-child relationships for Chakra JSON nodes. Args: - pytorch_node_objects (Dict[int, PyTorchNode]): Dictionary of PyTorch node objects. - pytorch_node (PyTorchNode): The current PyTorch node being processed. + json_node_map (Dict[int, PyTorchNode]): Dictionary of JSON node objects. + json_node (PyTorchNode): The current JSON node being processed. parent_id (int): The ID of the parent node. 
""" - parent_node = pytorch_node_objects[parent_id] - parent_node.add_child(pytorch_node) + parent_node = json_node_map[parent_id] + parent_node.add_child(json_node) - if pytorch_node.is_gpu_op(): - parent_node.add_gpu_child(pytorch_node) + if json_node.is_gpu_op(): + parent_node.add_gpu_child(json_node) - if pytorch_node.is_record_param_comms_op(): - parent_node.record_param_comms_node = pytorch_node + if json_node.is_record_param_comms_op(): + # Add the record_param_comms node to the parent. + # These operators act as metadata operators between the launcher and the actual communication operator. + # This registration allows the converter to easily identify the communication operator to use. + parent_node.record_param_comms_node = json_node - if pytorch_node.is_nccl_op(): - parent_node.nccl_node = pytorch_node + if json_node.is_nccl_op(): + # Add the NCCL node to the parent. + # NCCL operators are actual communication operators. + # This registration allows the converter to easily identify the communication operator to use. + parent_node.nccl_node = json_node - def _update_node_type_counts(self, node_type_counts: Dict[str, int], pytorch_node: PyTorchNode) -> None: - """ - Update the node type counts based on the current PyTorch node. - - Args: - node_type_counts (Dict[str, int]): Dictionary of node type counts. - pytorch_node (PyTorchNode): The current PyTorch node being processed. + def convert_json_to_protobuf_nodes( + self, json_node_map: Dict[int, PyTorchNode], protobuf_node_map: Dict[int, ChakraNode] + ) -> None: """ - node_type_counts["total_op"] += 1 - if pytorch_node.is_cpu_op(): - node_type_counts["cpu_op"] += 1 - if pytorch_node.is_gpu_op(): - node_type_counts["gpu_op"] += 1 - if pytorch_node.is_record_param_comms_op(): - node_type_counts["record_param_comms_op"] += 1 - if pytorch_node.is_nccl_op(): - node_type_counts["nccl_op"] += 1 + Convert JSON nodes to Protobuf nodes. 
- def open_chakra_execution_trace(self, output_filename: str) -> IO[bytes]: - """ - Open the Chakra execution trace file for writing. + This method directly converts JSON nodes to Protobuf nodes without considering any dependencies. Dependencies + will be handled by the convert_ctrl_dep_to_data_dep method. Args: - output_filename (str): Name of the output file for the converted Chakra trace. - - Raises: - Exception: If there is an IOError in opening the file. - - Returns: - IO[bytes]: File handle for the Chakra execution trace output file. - """ - logging.info(f"Opening Chakra execution trace file: {output_filename}") - try: - chakra_et = open(output_filename, "wb") # noqa: SIM115 - return chakra_et - except IOError as e: - err_msg = f"Error opening file {output_filename}: {e}" - logging.error(err_msg) - raise Exception(err_msg) from e - - def convert_nodes(self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dict[int, ChakraNode]) -> None: + json_node_map (Dict[int, PyTorchNode]): Dictionary of JSON nodes to be converted. + protobuf_node_map (Dict[int, ChakraNode]): Dictionary where the converted Protobuf nodes will be stored. """ - Convert PyTorch nodes to Chakra nodes. - - This method traverses through the PyTorch nodes and converts them to Chakra nodes. It also handles special - cases for GPU nodes and collective communication types. 
- """ - for _, pytorch_node in pytorch_nodes.items(): - if (pytorch_node.get_op_type() == PyTorchNodeType.CPU_OP) or ( - pytorch_node.get_op_type() == PyTorchNodeType.LABEL + for _, json_node in json_node_map.items(): + if (json_node.get_op_type() == PyTorchNodeType.CPU_OP) or ( + json_node.get_op_type() == PyTorchNodeType.LABEL ): - chakra_node = self.convert_to_chakra_node(pytorch_nodes, chakra_nodes, pytorch_node) - chakra_nodes[chakra_node.id] = chakra_node + chakra_node = self.convert_json_to_protobuf_node(json_node_map, protobuf_node_map, json_node) + protobuf_node_map[chakra_node.id] = chakra_node - for pytorch_gpu_node in pytorch_node.gpu_children: - chakra_gpu_node = self.convert_to_chakra_node(pytorch_nodes, chakra_nodes, pytorch_gpu_node) + for pytorch_gpu_node in json_node.gpu_children: + chakra_gpu_node = self.convert_json_to_protobuf_node( + json_node_map, protobuf_node_map, pytorch_gpu_node + ) if chakra_gpu_node.type == COMM_COLL_NODE: collective_comm_type = self.get_collective_comm_type(pytorch_gpu_node.name) @@ -286,82 +252,85 @@ def convert_nodes(self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dic ] ) - chakra_nodes[chakra_gpu_node.id] = chakra_gpu_node + protobuf_node_map[chakra_gpu_node.id] = chakra_gpu_node - def convert_to_chakra_node( - self, pytorch_nodes: Dict[int, PyTorchNode], chakra_nodes: Dict[int, ChakraNode], pytorch_node: PyTorchNode + def convert_json_to_protobuf_node( + self, + json_node_map: Dict[int, PyTorchNode], + protobuf_node_map: Dict[int, ChakraNode], + json_node: PyTorchNode, ) -> ChakraNode: """ - Convert a PyTorchNode to a ChakraNode. + Convert a JSON node (PyTorchNode) to a protobuf node (ChakraNode). + + This method takes a JSON node from the Chakra host + device execution trace and converts it to a protobuf node. + The conversion includes transferring attributes, types, and dependencies from the JSON node to the protobuf + node. 
Special handling is performed for nodes covering more than 90% of the runtime, such as Optimizer.step, + to filter them out. Args: - pytorch_nodes (Dict[int, PyTorchNode]): Dictionary of PyTorch nodes. - chakra_nodes (Dict[int, ChakraNode]): Dictionary of existing Chakra nodes. - pytorch_node (PyTorchNode): The PyTorch node to convert. + json_node_map (Dict[int, PyTorchNode]): Dictionary of JSON nodes. + protobuf_node_map (Dict[int, ChakraNode]): Dictionary of protobuf nodes. + json_node (PyTorchNode): The JSON node to convert. Returns: - ChakraNode: The converted Chakra node. - """ - logging.debug(f"Converting PyTorch node ID {pytorch_node.id} to Chakra node.") - chakra_node = ChakraNode() - chakra_node.id = pytorch_node.id - chakra_node.name = pytorch_node.name - chakra_node.type = self.get_chakra_node_type_from_pytorch_node(pytorch_nodes, pytorch_node) - if pytorch_node.parent in chakra_nodes: - chakra_node.ctrl_deps.append(pytorch_node.parent) - chakra_node.duration_micros = int(pytorch_node.exclusive_dur) - - """ - Quick and straightforward solution to identify an operator that covers more than 90% of the runtime. These are - usually user_annotation operators and should be ignored. One such case is Optimizer.step, which we filter out - in this code. Ideally, we should identify any user annotation nodes that cover more than 90% of the runtime and - then set their runtime to 0. - - Note: We will cover this with a more general solution. 
- """ - if "Optimizer.step" in pytorch_node.name: - chakra_node.duration_micros = 0 - - chakra_node.inputs.values = str(pytorch_node.inputs["values"]) - chakra_node.inputs.shapes = str(pytorch_node.inputs["shapes"]) - chakra_node.inputs.types = str(pytorch_node.inputs["types"]) - chakra_node.outputs.values = str(pytorch_node.outputs["values"]) - chakra_node.outputs.shapes = str(pytorch_node.outputs["shapes"]) - chakra_node.outputs.types = str(pytorch_node.outputs["types"]) - chakra_node.attr.extend( + ChakraNode: The converted protobuf node. + """ + logging.debug(f"Converting JSON node ID {json_node.id} to protobuf node.") + + protobuf_node = ChakraNode() + protobuf_node.id = json_node.id + protobuf_node.name = json_node.name + protobuf_node.type = self.get_protobuf_node_type_from_json_node(json_node_map, json_node) + if json_node.parent in protobuf_node_map: + protobuf_node.ctrl_deps.append(json_node.parent) + protobuf_node.duration_micros = int(json_node.exclusive_dur) + + # Handle nodes covering more than 90% of the runtime + if "Optimizer.step" in json_node.name: + protobuf_node.duration_micros = 0 + + protobuf_node.inputs.values = str(json_node.inputs["values"]) + protobuf_node.inputs.shapes = str(json_node.inputs["shapes"]) + protobuf_node.inputs.types = str(json_node.inputs["types"]) + protobuf_node.outputs.values = str(json_node.outputs["values"]) + protobuf_node.outputs.shapes = str(json_node.outputs["shapes"]) + protobuf_node.outputs.types = str(json_node.outputs["types"]) + protobuf_node.attr.extend( [ - ChakraAttr(name="rf_id", int64_val=pytorch_node.rf_id), - ChakraAttr(name="fw_parent", int64_val=pytorch_node.fw_parent), - ChakraAttr(name="seq_id", int64_val=pytorch_node.seq_id), - ChakraAttr(name="scope", int64_val=pytorch_node.scope), - ChakraAttr(name="tid", int64_val=pytorch_node.tid), - ChakraAttr(name="fw_tid", int64_val=pytorch_node.fw_tid), - ChakraAttr(name="op_schema", string_val=pytorch_node.op_schema), - ChakraAttr(name="is_cpu_op", 
bool_val=not pytorch_node.is_gpu_op()), + ChakraAttr(name="rf_id", int64_val=json_node.rf_id), + ChakraAttr(name="fw_parent", int64_val=json_node.fw_parent), + ChakraAttr(name="seq_id", int64_val=json_node.seq_id), + ChakraAttr(name="scope", int64_val=json_node.scope), + ChakraAttr(name="tid", int64_val=json_node.tid), + ChakraAttr(name="fw_tid", int64_val=json_node.fw_tid), + ChakraAttr(name="op_schema", string_val=json_node.op_schema), + ChakraAttr(name="is_cpu_op", bool_val=not json_node.is_gpu_op()), ] ) - if pytorch_node.stream is not None: - chakra_node.attr.append(ChakraAttr(name="stream", int64_val=pytorch_node.stream)) - return chakra_node + if json_node.stream is not None: + protobuf_node.attr.append(ChakraAttr(name="stream", int64_val=json_node.stream)) + + return protobuf_node - def get_chakra_node_type_from_pytorch_node( - self, pytorch_nodes: Dict[int, PyTorchNode], pytorch_node: PyTorchNode + def get_protobuf_node_type_from_json_node( + self, json_node_map: Dict[int, PyTorchNode], json_node: PyTorchNode ) -> int: """ - Determine the Chakra node type from a PyTorch node. + Determine the Protobuf node type from a Chakra node. Args: - pytorch_nodes (Dict[int, PyTorchNode]): Dictionary of PyTorch nodes. - pytorch_node (PyTorchNode): The PyTorch node to determine the type of. + json_node_map (Dict[int, PyTorchNode]): Dictionary of JSON nodes. + json_node (PyTorchNode): The JSON node to determine the type of. Returns: int: The corresponding Chakra node type. 
""" - if pytorch_node.is_gpu_op(): - if "ncclDevKernel_SendRecv" in pytorch_node.name: - parent_node = pytorch_nodes[pytorch_node.parent] + if json_node.is_gpu_op(): + if "ncclDevKernel_SendRecv" in json_node.name: + parent_node = json_node_map[json_node.parent] keyword = ( - pytorch_nodes[parent_node.parent].name + json_node_map[parent_node.parent].name if parent_node.name == "record_param_comms" else parent_node.name ) @@ -369,7 +338,7 @@ def get_chakra_node_type_from_pytorch_node( return COMM_SEND_NODE if "recv" in keyword: return COMM_RECV_NODE - if "ncclKernel" in pytorch_node.name or "ncclDevKernel" in pytorch_node.name: + if "ncclKernel" in json_node.name or "ncclDevKernel" in json_node.name: return COMM_COLL_NODE return COMP_NODE @@ -399,70 +368,39 @@ def get_collective_comm_type(self, name: str) -> int: if key in normalized_name: return comm_type_mapping[key] raise ValueError( - f"'{name}' not found in collective communication mapping. " - "Please add this collective communication name to the mapping." + f"The name '{name}' does not correspond to a recognized collective communication type. " + "The converter determines collective communication types based on the node name of a GPU operator. " + f"However, it failed to identify the type for '{name}'. " + "If this is a valid collective communication type, please update the converter code to include " + "the appropriate mapping in comm_type_mapping. " + "Investigate this issue or report it on GitHub for further assistance." ) - def is_root_node(self, node: ChakraNode) -> bool: - """ - Determine whether a given node is a root node in the execution trace. - - In the context of PyTorch execution traces, root nodes are the starting points of execution graphs or execution - traces. These nodes typically do not have parent nodes and act as the original sources of execution flow. This - method identifies such root nodes based on their names. 
Specifically, nodes with names indicating they are part - of the PyTorch execution graph or execution trace threads are considered root nodes. - - Args: - node (ChakraNode): The node to be evaluated. - - Returns: - bool: True if the node is a root node, False otherwise. - """ - return node.name in ["[pytorch|profiler|execution_graph|thread]", "[pytorch|profiler|execution_trace|thread]"] - - def convert_ctrl_dep_to_data_dep( # noqa: C901 + def convert_ctrl_dep_to_data_dep( self, - pytorch_nodes: Dict[int, PyTorchNode], - chakra_nodes: Dict[int, ChakraNode], + json_node_map: Dict[int, PyTorchNode], + protobuf_node_map: Dict[int, ChakraNode], chakra_node: ChakraNode, ) -> None: """ Convert control dependencies to data dependencies in Chakra nodes. - Traverse nodes based on control dependencies (parent nodes) and encode data dependencies appropriately. This - method is crucial for converting the dependency structure from PyTorch execution traces to Chakra execution - traces. In PyTorch traces, control dependencies are represented by a parent field in each node, denoting the - parent node ID. This structure indicates which functions (operators) are called by a particular operator. - - In contrast, Chakra execution traces, while retaining control dependencies for compatibility, primarily rely on - data dependencies to represent relationships between nodes. Data dependencies in Chakra are more broadly - defined compared to those in PyTorch, where they are implicitly encoded in tensor input-output relationships. - In Chakra, data dependencies are explicit and represent a general dependency between nodes. - - To convert PyTorch's control dependencies to Chakra's data dependencies, a Depth-First Search (DFS) is - performed. The DFS traversal starts from a given Chakra node, traversing through its children (based on control - dependencies). During traversal, data dependencies are encoded by linking nodes that have been visited in - sequence. 
These dependencies form a chain, mirroring the function call order from the PyTorch trace. - - Special attention is given to the types of nodes involved. CPU and label nodes (non-GPU) in PyTorch can only - depend on other CPU or label nodes. However, GPU nodes can depend on any type of node. Thus, while traversing, - if a GPU node is encountered, it can establish a data dependency with the last visited node of any type. For - CPU and label nodes, the dependency is only established with the last visited non-GPU node. This distinction - ensures that the converted dependencies accurately reflect the execution dynamics of the original PyTorch trace - within the Chakra framework. - - Additionally, this method enforces sequential dependencies between GPU operators within the same stream. It - ensures that the execution order of GPU operators is preserved in the Chakra trace, reflecting the sequential - execution within the same GPU stream in the original PyTorch trace. - - Furthermore, inter-thread dependencies are explicitly encoded in the Chakra execution traces. This feature - allows for the representation of dependencies across different CPU threads, which are observed in Kineto traces - via chrome://tracing. These dependencies are crucial for understanding the interaction between CPU threads and - ensuring accurate modeling and analysis of concurrent operations within the Chakra framework. + This method converts the control dependencies found in Chakra host traces collected from PyTorch + into data dependencies, which are required by most simulators. In Chakra host traces, control dependencies + represent the caller-callee relationship during execution. When an operator calls other operators, + the caller becomes the parent, and the called operators become children. The order of these function calls + is reflected in their node IDs, with lower IDs indicating earlier execution. 
+ + Simulators, however, expect dependencies to represent the actual execution order, which is encoded in the + data dependency field. This method performs this conversion by traversing the control dependencies and + encoding them as data dependencies. + + Additionally, this method handles inter-thread dependencies. These dependencies are captured and encoded to + ensure that the execution flow across multiple threads is correctly represented. Args: - pytorch_nodes (Dict[int, PyTorchNode]): Dictionary of PyTorch nodes. - chakra_nodes (Dict[int, ChakraNode]): Dictionary of Chakra nodes. + json_node_map (Dict[int, PyTorchNode]): Dictionary of PyTorch nodes. + protobuf_node_map (Dict[int, ChakraNode]): Dictionary of Chakra nodes. chakra_node (ChakraNode): The starting node for the traversal and dependency processing. """ visited: Set[int] = set() @@ -476,11 +414,11 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 continue visited.add(current_node.id) - pytorch_node = pytorch_nodes.get(current_node.id) - if not pytorch_node: + json_node = json_node_map.get(current_node.id) + if not json_node: continue - node_op_type = pytorch_node.get_op_type() + node_op_type = json_node.get_op_type() if node_op_type == PyTorchNodeType.GPU_OP: if last_visited_any and last_visited_any.id not in current_node.data_deps: @@ -490,12 +428,13 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 ) last_visited_any = last_visited_non_gpu else: - if pytorch_node.inter_thread_dep: - dep_id = pytorch_node.inter_thread_dep + if json_node.inter_thread_dep: + dep_id = json_node.inter_thread_dep if dep_id not in current_node.data_deps: current_node.data_deps.append(dep_id) logging.debug( - f"CPU Node ID {current_node.id} now has an inter-thread data dependency on Node ID {dep_id}" + f"CPU Node ID {current_node.id} now has an inter-thread data dependency on Node ID " + f"{dep_id}" ) if last_visited_non_gpu and last_visited_non_gpu.id not in current_node.data_deps: 
current_node.data_deps.append(last_visited_non_gpu.id) @@ -506,65 +445,70 @@ def convert_ctrl_dep_to_data_dep( # noqa: C901 last_visited_non_gpu = current_node last_visited_any = current_node - children_chakra_ids = [child.id for child in pytorch_node.children] + children_chakra_ids = [child.id for child in json_node.children] for child_chakra_id in sorted(children_chakra_ids, reverse=True): - child_chakra_node = chakra_nodes.get(child_chakra_id) + child_chakra_node = protobuf_node_map.get(child_chakra_id) if child_chakra_node and child_chakra_node.id not in visited: stack.append(child_chakra_node) - def remove_dangling_nodes(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int, ChakraNode]: + def remove_dangling_nodes(self, protobuf_node_map: Dict[int, ChakraNode]) -> Dict[int, ChakraNode]: """ - Remove any dangling nodes from the chakra_nodes dictionary. + Remove any dangling nodes from the protobuf_node_map dictionary. - A node is considered dangling if it has no parents and no children. + Dangling nodes are nodes that have neither children nor parents. These nodes are identified after the + conversion and are typically unnecessary. Removing these nodes simplifies simulation and avoids potential + complications. Args: - chakra_nodes (Dict[int, ChakraNode]): Dictionary of Chakra nodes. + protobuf_node_map (Dict[int, ChakraNode]): Dictionary of protobuf nodes. Returns: - Dict[int, ChakraNode]: Updated dictionary of Chakra nodes with dangling nodes removed. + Dict[int, ChakraNode]: Updated dictionary of protobuf nodes with dangling nodes removed. 
""" parent_ids = set() - for node in chakra_nodes.values(): + for node in protobuf_node_map.values(): parent_ids.update(node.data_deps) dangling_nodes = [ - node_id for node_id, node in chakra_nodes.items() if node_id not in parent_ids and not node.data_deps + node_id for node_id, node in protobuf_node_map.items() if node_id not in parent_ids and not node.data_deps ] for node_id in dangling_nodes: - del chakra_nodes[node_id] + del protobuf_node_map[node_id] if dangling_nodes: - logging.info(f"Identified and removed {len(dangling_nodes)} dangling nodes:") + logging.debug(f"Identified and removed {len(dangling_nodes)} dangling nodes:") for node_id in dangling_nodes: - logging.info(f" - Node ID {node_id}") + logging.debug(f" - Node ID {node_id}") - return chakra_nodes + return protobuf_node_map - def update_parent_to_children_map(self, chakra_nodes: Dict[int, ChakraNode]) -> Dict[int, List[int]]: + def update_parent_to_children_map(self, protobuf_node_map: Dict[int, ChakraNode]) -> Dict[int, List[int]]: """ Update the parent_to_children_map based on the data dependencies of each node. This map is used to efficiently simulate node execution based on data dependencies. """ parent_to_children_map = {} - for node_id, node in chakra_nodes.items(): + for node_id, node in protobuf_node_map.items(): for dep_id in node.data_deps: if dep_id not in parent_to_children_map: parent_to_children_map[dep_id] = [] parent_to_children_map[dep_id].append(node_id) return parent_to_children_map - def identify_cyclic_dependencies(self, chakra_nodes: Dict[int, ChakraNode]) -> None: + def identify_cyclic_dependencies(self, protobuf_node_map: Dict[int, ChakraNode]) -> None: """ - Identify if there are any cyclic dependencies among Chakra nodes. + Identify if there are any cyclic dependencies among protobuf nodes. - This method checks for cycles in the graph of Chakra nodes using a depth-first search (DFS) algorithm. 
It logs - an error message and raises an exception if a cycle is detected, ensuring the graph is a Directed Acyclic Graph - (DAG). + This method checks for cycles in the graph of protobuf nodes using a depth-first search (DFS) algorithm. It + logs an error message and raises an exception if a cycle is detected, ensuring the graph is a Directed Acyclic + Graph (DAG). + + Args: + protobuf_node_map (Dict[int, ChakraNode]): Dictionary of protobuf nodes to check for cyclic dependencies. - Raises - Exception: If a cyclic dependency is detected among the Chakra nodes. + Raises: + Exception: If a cyclic dependency is detected among the protobuf nodes. """ visited = set() stack = set() @@ -581,8 +525,15 @@ def dfs(node_id: int, path: List[int]) -> bool: bool: True if a cycle is detected, False otherwise. """ if node_id in stack: - cycle_nodes = " -> ".join([chakra_nodes[n].name for n in path + [node_id]]) - logging.error(f"Cyclic dependency detected: {cycle_nodes}") + cycle_nodes = " -> ".join([protobuf_node_map[n].name for n in path + [node_id]]) + err_msg = ( + f"Cyclic dependency detected: {cycle_nodes}. The conversion failed because a cyclic dependency " + f"was detected. Cyclic dependencies should not exist. The input and output traces must form a " + f"Directed Acyclic Graph (DAG). This is essential for simulation; otherwise, simulators cannot " + f"resolve the next dependency-free node and will hang. This indicates a bug in the conversion " + f"process. Please investigate or report this issue on GitHub." 
+ ) + logging.error(err_msg) return True if node_id in visited: return False @@ -590,103 +541,104 @@ def dfs(node_id: int, path: List[int]) -> bool: visited.add(node_id) stack.add(node_id) path.append(node_id) - for child_id in chakra_nodes[node_id].data_deps: + for child_id in protobuf_node_map[node_id].data_deps: if dfs(child_id, path.copy()): return True stack.remove(node_id) path.pop() return False - for node_id in chakra_nodes: + for node_id in protobuf_node_map: if dfs(node_id, []): - raise Exception(f"Cyclic dependency detected starting from node {chakra_nodes[node_id].name}") + err_msg = ( + "Cyclic dependency detected. The conversion failed because a cyclic dependency " + "was detected. Cyclic dependencies should not exist. The input and output traces must form a " + "Directed Acyclic Graph (DAG). This is essential for simulation; otherwise, simulators cannot " + "resolve the next dependency-free node and will hang. This indicates a bug in the conversion " + "process. Please investigate or report this issue on GitHub." + ) + logging.error(err_msg) + raise Exception(err_msg) - def write_chakra_et( + def write_protobuf_execution_trace( self, - chakra_et: IO[bytes], - pytorch_schema: str, - pytorch_pid: int, - pytorch_time: str, - pytorch_start_ts: int, - pytorch_finish_ts: int, - chakra_nodes: Dict[int, ChakraNode], + output_filename: str, + json_metadata: Dict, + protobuf_node_map: Dict[int, ChakraNode], ) -> None: """ Write the Chakra execution trace by encoding global metadata and nodes. - Encode and write both the metadata and individual nodes to create a - complete execution trace. 
- """ - logging.info("Writing Chakra execution trace.") - self._write_global_metadata( - chakra_et, pytorch_schema, pytorch_pid, pytorch_time, pytorch_start_ts, pytorch_finish_ts - ) - self._encode_and_write_nodes(chakra_et, chakra_nodes) - logging.info("Chakra execution trace writing completed.") + Encode and write both the metadata and individual nodes to create a complete execution trace. - def _write_global_metadata( + Args: + output_filename (str): The name of the output file for the protobuf execution trace. + json_metadata (Dict): The metadata from the JSON trace. + protobuf_node_map (Dict[int, ChakraNode]): The converted Chakra nodes. + """ + logging.debug(f"Opening Chakra execution trace file: {output_filename}") + with open(output_filename, "wb") as protobuf_et: + logging.debug("Writing Chakra execution trace.") + self.write_global_metadata(protobuf_et, json_metadata) + self.encode_and_write_nodes(protobuf_et, protobuf_node_map) + logging.debug("Chakra execution trace writing completed.") + + def write_global_metadata( self, - chakra_et: IO[bytes], - pytorch_schema: str, - pytorch_pid: int, - pytorch_time: str, - pytorch_start_ts: int, - pytorch_finish_ts: int, + protobuf_et: IO[bytes], + metadata: Dict, ) -> None: """ Encode and write global metadata for the Chakra execution trace. - This process includes encoding metadata like schema, process ID, timestamps, - and other relevant information for the Chakra execution trace. + Args: + protobuf_et (IO[bytes]): The output file handle for the protobuf execution trace. + metadata (Dict): The metadata dictionary containing schema, pid, time, start_ts, and finish_ts. 
""" - logging.info("Encoding global metadata for Chakra execution trace.") + logging.debug("Encoding global metadata for Chakra execution trace.") global_metadata = GlobalMetadata( attr=[ - ChakraAttr(name="schema", string_val=pytorch_schema), - ChakraAttr(name="pid", uint64_val=pytorch_pid), - ChakraAttr(name="time", string_val=pytorch_time), - ChakraAttr(name="start_ts", uint64_val=pytorch_start_ts), - ChakraAttr(name="finish_ts", uint64_val=pytorch_finish_ts), + ChakraAttr(name="schema", string_val=metadata["schema"]), + ChakraAttr(name="pid", uint64_val=metadata["pid"]), + ChakraAttr(name="time", string_val=metadata["time"]), + ChakraAttr(name="start_ts", uint64_val=metadata["start_ts"]), + ChakraAttr(name="finish_ts", uint64_val=metadata["finish_ts"]), ] ) - encode_message(chakra_et, global_metadata) + encode_message(protobuf_et, global_metadata) - def _encode_and_write_nodes(self, chakra_et: IO[bytes], chakra_nodes: Dict[int, ChakraNode]) -> None: + def encode_and_write_nodes(self, protobuf_et: IO[bytes], protobuf_node_map: Dict[int, ChakraNode]) -> None: """ - Encode and write nodes for the Chakra execution trace. + Encode and write nodes for the Chakra host + device execution trace in the protobuf format. - Each node from the PyTorch execution trace is encoded and written into the Chakra format. This includes node + Each node from the JSON execution trace is encoded and written into the protobuf format. This includes node IDs, names, types, dependencies, and other attributes. + + Args: + protobuf_et (IO[bytes]): The output file handle for the protobuf execution trace. + protobuf_node_map (Dict[int, ChakraNode]): Dictionary of protobuf nodes to be encoded and written. 
""" - logging.info("Encoding and writing nodes for Chakra execution trace.") + logging.debug("Encoding and writing nodes for Chakra execution trace.") seen_nids = set() - for nid in sorted(chakra_nodes.keys()): + for nid in sorted(protobuf_node_map.keys()): if nid in seen_nids: - err_msg = f"Duplicate NID {nid} detected in Chakra nodes." + err_msg = ( + f"Duplicate NID {nid} detected in Chakra nodes while writing nodes to the file. " + f"Node IDs are expected to be unique, and encountering a duplicate indicates an issue in the " + f"conversion process. Please check if the same node was registered twice. If not, report this " + f"issue for further investigation." + ) logging.error(err_msg) raise ValueError(err_msg) seen_nids.add(nid) - chakra_node = chakra_nodes[nid] - encode_message(chakra_et, chakra_node) - - def close_chakra_execution_trace(self, chakra_et: IO[bytes]) -> None: - """ - Close the Chakra execution trace file if it is open. - - Ensure proper closure of the trace file to preserve data integrity. - - Args: - chakra_et (IO[bytes]): File handle for the Chakra execution trace output file. - """ - logging.info("Closing Chakra execution trace file.") - if chakra_et and not chakra_et.closed: - chakra_et.close() + chakra_node = protobuf_node_map[nid] + encode_message(protobuf_et, chakra_node) # ruff: noqa: C901 def simulate_execution( self, - chakra_nodes: Dict[int, ChakraNode], - pytorch_nodes: Dict[int, PyTorchNode], + json_node_map: Dict[int, PyTorchNode], + protobuf_node_map: Dict[int, ChakraNode], parent_to_children_map: Dict[int, List[int]], ) -> None: """ @@ -696,21 +648,21 @@ def simulate_execution( by dependency resolution. A simplistic global clock is used to model the execution time. Args: - chakra_nodes (Dict[int, ChakraNode]): The Chakra nodes to be simulated. - pytorch_nodes (Dict[int, PyTorchNode]): The PyTorch nodes to reference for additional information. 
+ json_node_map (Dict[int, PyTorchNode]): The PyTorch nodes to reference for additional information. + protobuf_node_map (Dict[int, ChakraNode]): The Chakra nodes to be simulated. parent_to_children_map (Dict[int, List[int]]): Mapping from parent node IDs to their child node IDs. """ - logging.info("Simulating execution of Chakra nodes based on data dependencies.") + logging.debug("Simulating execution of Chakra nodes based on data dependencies.") ready_cpu_nodes = [ - (node_id, chakra_nodes[node_id]) - for node_id in chakra_nodes - if not chakra_nodes[node_id].data_deps and not pytorch_nodes[node_id].is_gpu_op() + (node_id, protobuf_node_map[node_id]) + for node_id in protobuf_node_map + if not protobuf_node_map[node_id].data_deps and not json_node_map[node_id].is_gpu_op() ] ready_gpu_nodes = [ - (node_id, chakra_nodes[node_id]) - for node_id in chakra_nodes - if not chakra_nodes[node_id].data_deps and pytorch_nodes[node_id].is_gpu_op() + (node_id, protobuf_node_map[node_id]) + for node_id in protobuf_node_map + if not protobuf_node_map[node_id].data_deps and json_node_map[node_id].is_gpu_op() ] ready_cpu_nodes.sort(key=lambda x: x[1].id) ready_gpu_nodes.sort(key=lambda x: x[1].id) @@ -726,22 +678,22 @@ def simulate_execution( cpu_node_id, cpu_node = ready_cpu_nodes.pop(0) current_cpu_node = (cpu_node_id, current_time) issued_nodes.add(cpu_node_id) - tid = pytorch_nodes[cpu_node_id].tid - logging.info( + tid = json_node_map[cpu_node_id].tid + logging.debug( f"Issuing CPU Node ID {cpu_node_id} ({cpu_node.name}) at {current_time}us with duration " f"{cpu_node.duration_micros}us, tid: {tid}" ) if ready_gpu_nodes: for gpu_node_id, gpu_node in ready_gpu_nodes[:]: - pytorch_node = pytorch_nodes[gpu_node_id] - stream_id = pytorch_node.stream + json_node = json_node_map[gpu_node_id] + stream_id = json_node.stream if stream_id not in current_gpu_nodes: ready_gpu_nodes.remove((gpu_node_id, gpu_node)) current_gpu_nodes[stream_id] = (gpu_node_id, current_time) 
issued_nodes.add(gpu_node_id) tid = f"stream {stream_id}" - logging.info( + logging.debug( f"Issuing GPU Node ID {gpu_node_id} ({gpu_node.name}) at {current_time}us on stream " f"{stream_id} with duration {gpu_node.duration_micros}us, tid: {tid}" ) @@ -750,17 +702,17 @@ def simulate_execution( if ( current_cpu_node - and current_time - current_cpu_node[1] >= chakra_nodes[current_cpu_node[0]].duration_micros + and current_time - current_cpu_node[1] >= protobuf_node_map[current_cpu_node[0]].duration_micros ): cpu_node_id, _ = current_cpu_node - tid = pytorch_nodes[cpu_node_id].tid - logging.info(f"CPU Node ID {cpu_node_id} completed at {current_time}us, tid: {tid}") + tid = json_node_map[cpu_node_id].tid + logging.debug(f"CPU Node ID {cpu_node_id} completed at {current_time}us, tid: {tid}") current_cpu_node = None completed_streams = [] for stream_id, (gpu_node_id, start_time) in current_gpu_nodes.items(): - if current_time - start_time >= chakra_nodes[gpu_node_id].duration_micros: - logging.info( + if current_time - start_time >= protobuf_node_map[gpu_node_id].duration_micros: + logging.debug( f"GPU Node ID {gpu_node_id} on stream {stream_id} completed at {current_time}us, " f"tid: stream {stream_id}" ) @@ -772,14 +724,14 @@ def simulate_execution( for node_id in list(issued_nodes): children_ids = parent_to_children_map.get(node_id, []) for child_id in children_ids: - child_node = chakra_nodes[child_id] + child_node = protobuf_node_map[child_id] child_node.data_deps.remove(node_id) if not child_node.data_deps: - if not pytorch_nodes[child_id].is_gpu_op(): + if not json_node_map[child_id].is_gpu_op(): ready_cpu_nodes.append((child_id, child_node)) else: ready_gpu_nodes.append((child_id, child_node)) issued_nodes.clear() - logging.info("Simulation of Chakra node execution completed.") + logging.debug("Simulation of Chakra node execution completed.") diff --git a/src/converter/pytorch_node.py b/src/converter/pytorch_node.py index 878ef435..89b29dcb 100644 --- 
a/src/converter/pytorch_node.py +++ b/src/converter/pytorch_node.py @@ -88,7 +88,13 @@ def parse_data(self, node_data: Dict[str, Any]) -> None: else: raise ValueError( f"Unsupported schema version '{self.schema}'. Please check if the schema version is in the list of " - f"supported versions: {self.SUPPORTED_VERSIONS}" + f"supported versions: {self.SUPPORTED_VERSIONS}. The schema version of the trace is not supported by " + f"the converter. The schema version is determined by the PyTorch version used to collect Chakra host " + f"execution traces. Please consider changing the PyTorch version you are using. For more details, you " + f"can follow the git history of the relevant file: " + f"https://github.com/pytorch/pytorch/blob/7cd48df2dae7e2194438b162968c47d1f05bf20e/torch/csrc/" + f"profiler/standalone/execution_trace_observer.cpp#L308. Check which PyTorch versions generate Chakra " + f"host traces that are supported by the converter." ) def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None: diff --git a/src/converter/pytorch_tensor.py b/src/converter/pytorch_tensor.py index e120b175..e706cc89 100644 --- a/src/converter/pytorch_tensor.py +++ b/src/converter/pytorch_tensor.py @@ -5,9 +5,14 @@ class PyTorchTensor: """ Represents a tensor with its associated properties. - Attributes + Attributes: tensor_data (List[int]): Data of the tensor including tensor_id, storage_id, offset, number of elements, and size of each element in bytes. 
+ + Note: + For more details on the tensor data structure, refer to: + https://github.com/pytorch/pytorch/blob/7cd48df2dae7e2194438b162968c47d1f05bf20e/torch/csrc/profiler/ + standalone/execution_trace_observer.cpp#L400 """ def __init__(self, tensor_data: List[int]) -> None: diff --git a/tests/converter/test_pytorch_converter.py b/tests/converter/test_pytorch_converter.py index d2dd1ff7..88f2abf9 100644 --- a/tests/converter/test_pytorch_converter.py +++ b/tests/converter/test_pytorch_converter.py @@ -73,39 +73,27 @@ def mock_chakra_node() -> ChakraNode: return node -def test_initialization() -> None: - converter = PyTorchConverter("input.json", "output.json") - assert converter.input_filename == "input.json" - assert converter.output_filename == "output.json" - - @patch("builtins.open", new_callable=mock_open) -def test_load_pytorch_execution_traces(mock_file: MagicMock, sample_pytorch_data: Dict) -> None: +def test_load_json_execution_traces(mock_file: MagicMock, sample_pytorch_data: Dict) -> None: mock_file.return_value.read.return_value = json.dumps(sample_pytorch_data) - converter = PyTorchConverter("input.json", "output.json") - data = converter.load_pytorch_execution_traces() + converter = PyTorchConverter() + data = converter.load_json_execution_traces("input.json") assert data == sample_pytorch_data mock_file.assert_called_once_with("input.json", "r") -def test_parse_and_instantiate_nodes(sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json") - ( - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - pytorch_nodes, - ) = converter._parse_and_instantiate_nodes(sample_pytorch_data) - assert pytorch_schema == "1.0.2-chakra.0.0.4" - assert pytorch_pid == 1234 - assert pytorch_time == "2023-01-01 12:00:00" - assert pytorch_start_ts == 1000 - assert pytorch_finish_ts == 2000 - assert len(pytorch_nodes) == 2 - assert pytorch_nodes[1].id == 1 - assert pytorch_nodes[2].id == 2 +def 
test_parse_json_trace(sample_pytorch_data: Dict) -> None: + converter = PyTorchConverter() + json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data) + + assert json_metadata["schema"] == "1.0.2-chakra.0.0.4" + assert json_metadata["pid"] == 1234 + assert json_metadata["time"] == "2023-01-01 12:00:00" + assert json_metadata["start_ts"] == 1000 + assert json_metadata["finish_ts"] == 2000 + assert len(json_node_map) == 2 + assert json_node_map[1].id == 1 + assert json_node_map[2].id == 2 def create_sample_graph(parent_id: int = 0, expected_child_id: int = 0) -> Dict[int, PyTorchNode]: @@ -132,90 +120,50 @@ def create_sample_graph(parent_id: int = 0, expected_child_id: int = 0) -> Dict[ @pytest.mark.parametrize("parent_id, expected_child_id", [(1, 2), (None, None)]) def test_establish_parent_child_relationships(parent_id: int, expected_child_id: int) -> None: - converter = PyTorchConverter("input.json", "output.json") - pytorch_nodes = create_sample_graph(parent_id, expected_child_id) + converter = PyTorchConverter() + json_node_map = create_sample_graph(parent_id, expected_child_id) - pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, []) + json_node_map = converter.establish_parent_child_relationships(json_node_map, []) if expected_child_id: - assert pytorch_nodes[parent_id].children[0].id == expected_child_id + assert json_node_map[parent_id].children[0].id == expected_child_id else: - assert len(pytorch_nodes[1].children) == 0 + assert len(json_node_map[1].children) == 0 -def test_convert_nodes(sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json") - ( - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - pytorch_nodes, - ) = converter._parse_and_instantiate_nodes(sample_pytorch_data) - pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, []) +def test_convert_json_to_protobuf_nodes(sample_pytorch_data: Dict) -> 
None: + converter = PyTorchConverter() + json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data) + json_node_map = converter.establish_parent_child_relationships(json_node_map, []) chakra_nodes = {} - converter.convert_nodes(pytorch_nodes, chakra_nodes) + converter.convert_json_to_protobuf_nodes(json_node_map, chakra_nodes) assert len(chakra_nodes) == 2 assert chakra_nodes[1].id == 1 assert chakra_nodes[2].id == 2 def test_convert_ctrl_dep_to_data_dep(sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json") - ( - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - pytorch_nodes, - ) = converter._parse_and_instantiate_nodes(sample_pytorch_data) - pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, []) + converter = PyTorchConverter() + json_metadata, json_node_map = converter.parse_json_trace(sample_pytorch_data) + json_node_map = converter.establish_parent_child_relationships(json_node_map, []) chakra_nodes = {} - converter.convert_nodes(pytorch_nodes, chakra_nodes) + converter.convert_json_to_protobuf_nodes(json_node_map, chakra_nodes) root_node = chakra_nodes[1] - converter.convert_ctrl_dep_to_data_dep(pytorch_nodes, chakra_nodes, root_node) + converter.convert_ctrl_dep_to_data_dep(json_node_map, chakra_nodes, root_node) assert root_node.data_deps == [] @patch("builtins.open", new_callable=mock_open) def test_write_chakra_et(mock_file: MagicMock, sample_pytorch_data: Dict) -> None: - converter = PyTorchConverter("input.json", "output.json") - converter.chakra_et = mock_file() - ( - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - pytorch_nodes, - ) = converter._parse_and_instantiate_nodes(sample_pytorch_data) - pytorch_nodes = converter._establish_parent_child_relationships(pytorch_nodes, []) + converter = PyTorchConverter() + json_metadata, json_node_map = 
converter.parse_json_trace(sample_pytorch_data) + json_node_map = converter.establish_parent_child_relationships(json_node_map, []) chakra_nodes = {} - converter.convert_nodes(pytorch_nodes, chakra_nodes) - converter.write_chakra_et( - converter.chakra_et, - pytorch_schema, - pytorch_pid, - pytorch_time, - pytorch_start_ts, - pytorch_finish_ts, - chakra_nodes, - ) + converter.convert_json_to_protobuf_nodes(json_node_map, chakra_nodes) + converter.write_protobuf_execution_trace("output.et", json_metadata, chakra_nodes) assert mock_file().write.called -@patch("builtins.open", new_callable=mock_open) -def test_close_chakra_execution_trace(mock_file: MagicMock) -> None: - converter = PyTorchConverter("input.json", "output.json") - file_handle = mock_file() - file_handle.closed = False # Simulate an open file - converter.chakra_et = file_handle - converter.close_chakra_execution_trace(converter.chakra_et) - file_handle.close.assert_called_once() - - @pytest.mark.parametrize( "pytorch_node_data, expected_type", [ @@ -225,13 +173,13 @@ def test_close_chakra_execution_trace(mock_file: MagicMock) -> None: ({"name": "other_op", "is_gpu_op": False}, COMP_NODE), ], ) -def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expected_type: int) -> None: +def test_get_protobuf_node_type_from_json_node(pytorch_node_data: Dict, expected_type: int) -> None: # Create a mock PyTorchNode with the required attributes pytorch_node = MagicMock(spec=PyTorchNode) pytorch_node.name = pytorch_node_data["name"] pytorch_node.is_gpu_op = MagicMock(return_value=pytorch_node_data["is_gpu_op"]) - # Create a mock pytorch_nodes dictionary with actual PyTorchNode instances + # Create a mock json_node_map dictionary with actual PyTorchNode instances mock_pytorch_node_data = { "id": 0, "name": "mock_node", @@ -242,10 +190,10 @@ def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expecte "attrs": [], } mock_pytorch_node = PyTorchNode("1.0.2-chakra.0.0.4", 
mock_pytorch_node_data) - pytorch_nodes = {0: mock_pytorch_node, 1: pytorch_node} + json_node_map = {0: mock_pytorch_node, 1: pytorch_node} - converter = PyTorchConverter("input.json", "output.json") - node_type = converter.get_chakra_node_type_from_pytorch_node(pytorch_nodes, pytorch_node) + converter = PyTorchConverter() + node_type = converter.get_protobuf_node_type_from_json_node(json_node_map, pytorch_node) assert node_type == expected_type @@ -260,6 +208,6 @@ def test_get_chakra_node_type_from_pytorch_node(pytorch_node_data: Dict, expecte ], ) def test_get_collective_comm_type(name: str, expected_comm_type: int) -> None: - converter = PyTorchConverter("input.json", "output.json") + converter = PyTorchConverter() comm_type = converter.get_collective_comm_type(name) assert comm_type == expected_comm_type