diff --git a/coremltools/models/utils.py b/coremltools/models/utils.py
index 97408611e..34e7d8b23 100644
--- a/coremltools/models/utils.py
+++ b/coremltools/models/utils.py
@@ -6,8 +6,9 @@
 """
 Utilities for the entire package.
 """
-
+from collections import OrderedDict as _OrderedDict
 import copy as _copy
+import gc as _gc
 import math as _math
 import os as _os
 import shutil as _shutil
@@ -19,6 +20,7 @@ from functools import lru_cache as _lru_cache
 
 from typing import Callable as _Callable
 from typing import Dict as _Dict
+from typing import List as _List
 from typing import Optional as _Optional
 from typing import Tuple as _Tuple
 from typing import Union as _Union
@@ -26,18 +28,22 @@
 
 import numpy as _np
 
 import coremltools as _ct
+from coremltools import _logger
 from coremltools import _SPECIFICATION_VERSION_IOS_16, _SPECIFICATION_VERSION_IOS_18
 from coremltools import ComputeUnit as _ComputeUnit
 from coremltools import proto as _proto
 from coremltools.converters.mil import mil as _mil
 from coremltools.converters.mil.frontend.milproto import load as _milproto_to_pymil
+from coremltools.converters.mil.mil import Builder as _mb
 from coremltools.converters.mil.mil import Program as _Program
 from coremltools.converters.mil.mil.passes.defs.preprocess import NameSanitizer as _NameSanitizer
 from coremltools.converters.mil.mil.passes.defs.randomize import (
     WeightRandomizer as _WeightRandomizer,
 )
 from coremltools.converters.mil.mil.passes.graph_pass import AbstractGraphPass as _AbstractGraphPass
+from coremltools.converters.mil.mil.passes.helper import block_context_manager as _block_context_manager
 from coremltools.converters.mil.mil.passes.pass_registry import PASS_REGISTRY as _PASS_REGISTRY
+from coremltools.converters.mil.mil.program import Placeholder as _Placeholder
 
 from .._deps import _HAS_SCIPY
@@ -1675,3 +1681,373 @@ def randomize_weights(mlmodel: "_ct.models.MLModel"):
     )
 
     return randomized_mlmodel
+
+
+def bisect_model(
+    model_path: str,
+    output_dir: str,
+    remove_original: _Optional[bool] = False,
+    merge_chunks_to_pipeline: _Optional[bool] = False,
+    check_output_correctness: _Optional[bool] = False,
+):
+    """
+    Utility function to split an mlpackage model into two mlpackages of approximately the same file size.
+
+    The implementation is largely adapted from:
+    https://github.com/apple/ml-stable-diffusion/blob/main/python_coreml_stable_diffusion/chunk_mlprogram.py
+
+    Parameters
+    ----------
+    model_path: str
+        Path to the mlpackage file to be split into two mlpackages of approximately the same file size.
+
+    output_dir: str
+        Path to the output directory where the two model chunks / pipeline model should be saved.
+        If `model_path` is `{path}/{model_name}.mlpackage`, the models are saved as:
+
+        1. first chunk model: `{output_dir}/{model_name}_chunk1.mlpackage`
+        2. second chunk model: `{output_dir}/{model_name}_chunk2.mlpackage`
+        3. chunked pipeline model: `{output_dir}/{model_name}_chunked_pipeline.mlpackage`
+
+    remove_original: bool
+        If True, removes the original (non-chunked) model to avoid duplicating storage.
+
+    merge_chunks_to_pipeline: bool
+        If True, the model chunks are managed inside a single pipeline model for easier asset maintenance.
+
+    check_output_correctness: bool
+        - If True, compares the outputs of the original Core ML model with those of the chunked models and reports PSNR in dB.
+        - Enabling this feature uses more memory. Disable it if your machine runs out of memory.
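+
+    Examples
+    --------
+    A minimal usage sketch (the paths below are illustrative placeholders):
+
+    .. sourcecode:: python
+
+        import coremltools as ct
+
+        model_path = "my_model.mlpackage"
+        output_dir = "./output/"
+
+        # Saves `./output/my_model_chunk1.mlpackage` and
+        # `./output/my_model_chunk2.mlpackage`.
+        ct.models.utils.bisect_model(model_path, output_dir)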
+ """ + # We do the lazy import to prevent circular import + from coremltools.converters.mil.converter import mil_convert as _mil_convert + + def get_pymil_prog_and_spec_from_model_path(model_path: str) -> _mil.Program: + spec = load_spec(model_path) + + # convert the model spec into pymil program, + # we also convert operations into type of List + prog = _milproto_to_pymil.load( + spec, + spec.specificationVersion, + _try_get_weights_dir_path(model_path), + ) + if len(prog.functions) > 1 or "main" not in prog.functions: + raise ValueError("'bisect_model' only support model with a single 'main' function.") + + func = prog.functions["main"] + func.operations = list(func.operations) + + return prog, spec + + # The below implementation assumes that the model is single function, with a "main" function. + prog, spec = get_pymil_prog_and_spec_from_model_path(model_path) + spec_version = spec.specificationVersion + + # Compute the incision point by bisecting the program based on weights size + op_idx, first_chunk_weights_size, total_weights_size = _get_op_idx_split_location(prog) + main_block = prog.functions["main"] + incision_op = main_block.operations[op_idx] + _logger.info(f"{model_path} will chunked into two pieces.") + _logger.info( + f"The incision op: name={incision_op.name}, type={incision_op.op_type}, index={op_idx}/{len(main_block.operations)}" + ) + _logger.info(f"First chunk size = {first_chunk_weights_size:.2f} MB") + _logger.info(f"Second chunk size = {total_weights_size - first_chunk_weights_size:.2f} MB") + + # Build first chunk (in-place modifies prog by declaring early exits and removing unused subgraph) + prog_chunk1 = _make_first_chunk_prog(prog, op_idx) + + # Build the second chunk + # when the first chunk is created, the prog is modified in-place, so we need to re-convert a new pymil + # program for the second chunk. 
+    prog_chunk2 = _make_second_chunk_prog(
+        get_pymil_prog_and_spec_from_model_path(model_path)[0],
+        op_idx,
+    )
+
+    # Convert the MIL Program objects into MLModels.
+    # We skip the model load if check_output_correctness=False.
+    _logger.info("Converting the two programs")
+    model_chunk1 = _mil_convert(
+        prog_chunk1,
+        convert_to="mlprogram",
+        convert_from="milinternal",
+        specification_version=spec_version,
+        compute_units=_ct.ComputeUnit.CPU_ONLY,
+        skip_model_load=(not check_output_correctness),
+    )
+    del prog_chunk1
+    _gc.collect()
+    _logger.info("Conversion of first chunk done.")
+
+    model_chunk2 = _mil_convert(
+        prog_chunk2,
+        convert_to="mlprogram",
+        convert_from="milinternal",
+        specification_version=spec_version,
+        compute_units=_ct.ComputeUnit.CPU_ONLY,
+        skip_model_load=(not check_output_correctness),
+    )
+    del prog_chunk2
+    _gc.collect()
+    _logger.info("Conversion of second chunk done.")
+
+    # Verify output correctness
+    if check_output_correctness:
+        _logger.info("Verifying output correctness of chunks")
+        model = _ct.models.MLModel(model_path, compute_units=_ct.ComputeUnit.CPU_ONLY)
+        _verify_output_correctness_of_chunks(
+            full_model=model,
+            first_chunk_model=model_chunk1,
+            second_chunk_model=model_chunk2,
+        )
+
+    # Remove the original (non-chunked) model if requested
+    if remove_original:
+        _logger.info(f"Removing original (non-chunked) model at {model_path}")
+        _shutil.rmtree(model_path)
+        _logger.info("Done.")
+
+    # Save the model chunks
+    _os.makedirs(output_dir, exist_ok=True)
+    mlpackage_name = _os.path.basename(model_path)
+    name, _ = _os.path.splitext(mlpackage_name)
+
+    if merge_chunks_to_pipeline:
+        # Make a single pipeline model to manage the model chunks
+        pipeline_model = make_pipeline(model_chunk1, model_chunk2)
+        out_path_pipeline = _os.path.join(output_dir, name + "_chunked_pipeline.mlpackage")
+        pipeline_model.save(out_path_pipeline)
+
+        # Reload to ensure CPU placement
+        if check_output_correctness:
+            _logger.info("Verifying output correctness of pipeline model")
+            pipeline_model = _ct.models.MLModel(
+                out_path_pipeline, compute_units=_ct.ComputeUnit.CPU_ONLY
+            )
+            _verify_output_correctness_of_chunks(
+                full_model=model,
+                pipeline_model=pipeline_model,
+            )
+    else:
+        # Save the chunked models to disk
+        out_path_chunk1 = _os.path.join(output_dir, name + "_chunk1.mlpackage")
+        out_path_chunk2 = _os.path.join(output_dir, name + "_chunk2.mlpackage")
+        model_chunk1.save(out_path_chunk1)
+        model_chunk2.save(out_path_chunk2)
+        _logger.info(
+            f"Saved chunks in {output_dir} with the suffixes _chunk1.mlpackage and _chunk2.mlpackage"
+        )
+
+
+def _verify_output_correctness_of_chunks(
+    full_model: "_ct.models.MLModel",
+    first_chunk_model: _Optional["_ct.models.MLModel"] = None,
+    second_chunk_model: _Optional["_ct.models.MLModel"] = None,
+    pipeline_model: _Optional["_ct.models.MLModel"] = None,
+) -> None:
+    """Verifies the end-to-end output correctness of the full (original) model versus the chunked models."""
+    # Lazy import avoids a circular import error
+    from coremltools.converters.mil.testing_utils import (
+        compute_snr_and_psnr,
+        random_gen_input_feature_type,
+    )
+
+    def report_correctness(original_outputs: _np.ndarray, final_outputs: _np.ndarray, log_prefix: str):
+        """Report PSNR values across two compatible tensors.
+
+        This util is from https://github.com/apple/ml-stable-diffusion/blob/main/python_coreml_stable_diffusion/torch2coreml.py#L80,
+        with a slight modification.
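+
+        For reference, the PSNR reported here is roughly 10 * log10(max(signal^2) / MSE),
+        in dB, so higher means the chunked outputs are closer to the original ones.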
+ """ + ABSOLUTE_MIN_PSNR = 35 + + _, original_psnr = compute_snr_and_psnr(original_outputs, original_outputs) + _, final_psnr = compute_snr_and_psnr(original_outputs, final_outputs) + + dB_change = final_psnr - original_psnr + _logger.info( + f"{log_prefix}: PSNR changed by {dB_change:.1f} dB ({original_psnr:.1f} -> {final_psnr:.1f})" + ) + + if final_psnr < ABSOLUTE_MIN_PSNR: + raise ValueError(f"{final_psnr:.1f} dB is too low!") + else: + _logger.info( + f"{final_psnr:.1f} dB > {ABSOLUTE_MIN_PSNR} dB (minimum allowed) parity check passed" + ) + return final_psnr + + + # Generate inputs for first chunk and full model + input_dict = {} + for input_desc in full_model._spec.description.input: + input_dict[input_desc.name] = random_gen_input_feature_type(input_desc) + + # Generate outputs for full model + outputs_from_full_model = full_model.predict(input_dict) + + if pipeline_model is not None: + outputs_from_pipeline_model = pipeline_model.predict(input_dict) + final_outputs = outputs_from_pipeline_model + + elif first_chunk_model is not None and second_chunk_model is not None: + # Generate outputs for first chunk + outputs_from_first_chunk_model = first_chunk_model.predict(input_dict) + + # Prepare inputs for second chunk model from first chunk's outputs and regular inputs + second_chunk_input_dict = {} + for input_desc in second_chunk_model._spec.description.input: + if input_desc.name in outputs_from_first_chunk_model: + second_chunk_input_dict[input_desc.name] = outputs_from_first_chunk_model[ + input_desc.name + ] + else: + second_chunk_input_dict[input_desc.name] = input_dict[input_desc.name] + + # Generate output for second chunk model + outputs_from_second_chunk_model = second_chunk_model.predict(second_chunk_input_dict) + final_outputs = outputs_from_second_chunk_model + else: + raise ValueError("Either a single Pipeline model or two model chunkc should be provided.") + + # Verify correctness across all outputs from second chunk and full model + for out_name in outputs_from_full_model.keys(): + report_correctness( + original_outputs=outputs_from_full_model[out_name], + final_outputs=final_outputs[out_name], + log_prefix=f"{out_name}", + ) + + +def _get_op_idx_split_location(prog: _mil.Program) -> _Tuple[int, int, int]: + """Find the op that approximately bisects the graph as measure by weights size on each side""" + main_block = prog.functions["main"] + total_size_in_mb = 0 + + for op in main_block.operations: + if op.op_type == "const" and isinstance(op.val.val, _np.ndarray): + size_in_mb = op.val.val.size * op.val.val.itemsize / (1024 * 1024) + total_size_in_mb += size_in_mb + half_size = total_size_in_mb / 2 + + # Find the first non const op (single child), where the total cumulative size exceeds + # the half size for the first time + cumulative_size_in_mb = 0 + for op in main_block.operations: + if op.op_type == "const" and isinstance(op.val.val, _np.ndarray): + size_in_mb = op.val.val.size * op.val.val.itemsize / (1024 * 1024) + cumulative_size_in_mb += size_in_mb + + # Note: The condition "not op.op_type.startswith("const")" is to make sure that the + # incision op is neither of type "const" nor "constexpr_*" ops that + # are used to store compressed weights + if ( + cumulative_size_in_mb >= half_size + and not op.op_type.startswith("const") + and len(op.outputs) == 1 + and len(op.outputs[0].child_ops) == 1 + ): + op_idx = main_block.operations.index(op) + return op_idx, cumulative_size_in_mb, total_size_in_mb + + raise ValueError("Not able to find the bisect point in the 
model.") + + +def _get_first_chunk_outputs(block: _mil.Block, op_idx: int) -> _List[_mil.Var]: + # Get the list of all vars that go across from first program (all ops from 0 to op_idx (inclusive)) + # to the second program (all ops from op_idx+1 till the end). These all vars need to be made the output + # of the first program and the input of the second program + boundary_vars = set() + for i in range(op_idx + 1): + op = block.operations[i] + if not op.op_type.startswith("const"): + for var in op.outputs: + if var.val is None: # only consider non const vars + for child_op in var.child_ops: + child_op_idx = block.operations.index(child_op) + if child_op_idx > op_idx: + boundary_vars.add(var) + return list(boundary_vars) + + +@_block_context_manager +def _add_fp32_casts(block: _mil.Block, boundary_vars: _List[_mil.Var]) -> None: + new_boundary_vars = [] + for var in boundary_vars: + if var.dtype != _mil.types.fp16: + new_boundary_vars.append(var) + else: + fp32_var = _mb.cast(x=var, dtype="fp32", name=var.name) + new_boundary_vars.append(fp32_var) + return new_boundary_vars + + +def _make_first_chunk_prog( + prog: _mil.Program, + op_idx: int, +) -> _mil.Program: + """Build first chunk by declaring early outputs and removing unused subgraph""" + block = prog.functions["main"] + boundary_vars = _get_first_chunk_outputs(block, op_idx) + + # Due to possible numerical issues, cast any fp16 var to fp32 + new_boundary_vars = _add_fp32_casts(block, boundary_vars) + + block.outputs.clear() + block.set_outputs(new_boundary_vars) + _PASS_REGISTRY["common::dead_code_elimination"](prog) + return prog + + +def _make_second_chunk_prog(prog: _mil.Program, op_idx: int) -> _mil.Program: + """Build second chunk by rebuilding a pristine MIL Program from MLModel""" + block = prog.functions["main"] + block.opset_version = _ct.target.iOS16 + + # First chunk outputs are second chunk inputs (e.g. skip connections) + boundary_vars = _get_first_chunk_outputs(block, op_idx) + + # This op will not be included in this program. Its output var will be made into an input + boundary_op = block.operations[op_idx] + + # Add all boundary ops as inputs + with block: + for var in boundary_vars: + new_placeholder = _Placeholder( + sym_shape=var.shape, + dtype=var.dtype if var.dtype != _mil.types.fp16 else _mil.types.fp32, + name=var.name, + ) + + block._input_dict[new_placeholder.outputs[0].name] = new_placeholder.outputs[0] + + block.function_inputs = tuple(block._input_dict.values()) + new_var = None + if var.dtype == _mil.types.fp16: + new_var = _mb.cast(x=new_placeholder.outputs[0], dtype="fp16", before_op=var.op) + else: + new_var = new_placeholder.outputs[0] + + block.replace_uses_of_var_after_op( + anchor_op=boundary_op, + old_var=var, + new_var=new_var, + # This is needed if the program contains "constexpr_*" ops. In normal cases, there are stricter + # rules for removing them, and their presence may prevent replacing this var. + # However in this case, since we want to remove all the ops in chunk 1, we can safely + # set this to True. 
+                force_replace=True,
+            )
+
+    _PASS_REGISTRY["common::dead_code_elimination"](prog)
+
+    # Remove any unused inputs
+    new_input_dict = _OrderedDict()
+    for k, v in block._input_dict.items():
+        if len(v.child_ops) > 0:
+            new_input_dict[k] = v
+    block._input_dict = new_input_dict
+    block.function_inputs = tuple(block._input_dict.values())
+
+    return prog
diff --git a/coremltools/test/ml_program/test_utils.py b/coremltools/test/ml_program/test_utils.py
index b982e70a1..2b61dad1d 100644
--- a/coremltools/test/ml_program/test_utils.py
+++ b/coremltools/test/ml_program/test_utils.py
@@ -17,7 +17,7 @@ from coremltools.converters.mil import mil
 from coremltools.converters.mil.converter import mil_convert as _mil_convert
 from coremltools.converters.mil.mil.builder import Builder as mb
-from coremltools.models.utils import MultiFunctionDescriptor, load_spec, save_multifunction
+from coremltools.models.utils import MultiFunctionDescriptor, bisect_model, load_spec, save_multifunction
 
 
 @pytest.mark.skipif(ct.utils._macos_version() < (15, 0),
@@ -892,3 +892,146 @@ def test_10_duplicated_model(self):
         == 10
     )
     shutil.rmtree(saved_package_path)
+
+
+class TestBisectModel:
+
+    @staticmethod
+    def get_test_model_path(minimum_deployment_target=ct.target.iOS16):
+        # PyTorch model definition and tracing
+        class Model(torch.nn.Module):
+            def __init__(self):
+                super().__init__()
+                self.linear1 = torch.nn.Linear(6000, 6000)
+                self.relu = torch.nn.ReLU()
+                self.linear2 = torch.nn.Linear(6000, 6000)
+
+            def forward(self, x):
+                x = self.linear1(x)
+                x = self.relu(x)
+                x = self.linear2(x)
+                x = torch.sin(x)
+                return x
+
+        example_input = torch.rand(1, 6000)
+        model = Model().eval()
+        traced_model = torch.jit.trace(model, example_input)
+
+        # Convert to an mlpackage and save it to disk
+        mlmodel = ct.convert(
+            traced_model,
+            inputs=[ct.TensorType(shape=(1, 6000), name="input")],
+            minimum_deployment_target=minimum_deployment_target,
+        )
+        package_path = tempfile.mkdtemp(suffix=".mlpackage")
+        mlmodel.save(package_path)
+
+        return package_path
+
+    def test_remove_original_model(self):
+        model_path = self.get_test_model_path()
+        output_dir = tempfile.mkdtemp()
+
+        # By specifying remove_original=True, the API deletes the original model
+        bisect_model(
+            model_path,
+            output_dir,
+            remove_original=True,
+        )
+        assert not os.path.isdir(model_path)
+
+        # cleanup
+        shutil.rmtree(output_dir)
+
+    def test_pipeline(self):
+        model_path = self.get_test_model_path()
+        output_dir = tempfile.mkdtemp()
+
+        # The API bisects the model into two chunks and produces a pipeline model
+        bisect_model(
+            model_path,
+            output_dir,
+            merge_chunks_to_pipeline=True,
+        )
+
+        # check that the file name is correct
+        mlpackage_name = os.path.basename(model_path)
+        name, _ = os.path.splitext(mlpackage_name)
+        pipeline_path = os.path.join(output_dir, f"{name}_chunked_pipeline.mlpackage")
+        assert os.path.isdir(pipeline_path)
+
+        # check that the Core ML model is a pipeline model
+        spec = load_spec(pipeline_path)
+        assert spec.WhichOneof("Type") == "pipeline"
+
+        # cleanup
+        shutil.rmtree(model_path)
+        shutil.rmtree(output_dir)
+
+    def test_basic(self):
+        def check_spec_op_type(model_path, expected_ops):
+            spec = load_spec(model_path)
+            mil = spec.mlProgram
+            for function in mil.functions.values():
+                for block in function.block_specializations.values():
+                    ops = list(block.operations)
+                    for i, op_type in enumerate(expected_ops):
+                        assert ops[i].type == op_type
+
+        def check_spec_version(model_path, expected_spec_version):
+            spec = load_spec(model_path)
+            assert spec.specificationVersion == expected_spec_version
+
+        model_path = self.get_test_model_path(ct.target.iOS17)
+        output_dir = tempfile.mkdtemp()
+
+        # Bisecting the model in half produces two new mlpackages in the target
+        # `output_dir`, with the suffixes `_chunk1.mlpackage` and `_chunk2.mlpackage`.
+        bisect_model(
+            model_path,
+            output_dir,
+        )
+
+        # check that the API doesn't delete the original mlpackage
+        assert os.path.isdir(model_path)
+
+        # check that the file names are correct
+        mlpackage_name = os.path.basename(model_path)
+        name, _ = os.path.splitext(mlpackage_name)
+        chunk1_path = os.path.join(output_dir, f"{name}_chunk1.mlpackage")
+        chunk2_path = os.path.join(output_dir, f"{name}_chunk2.mlpackage")
+        assert os.path.isdir(chunk1_path)
+        assert os.path.isdir(chunk2_path)
+
+        # check the model op types
+        check_spec_op_type(
+            chunk1_path,
+            [
+                "const",
+                "const",
+                "linear",
+                "const",
+                "cast",
+            ],
+        )
+        check_spec_op_type(
+            chunk2_path,
+            [
+                "const",
+                "cast",
+                "relu",
+                "const",
+                "const",
+                "linear",
+                "sin",
+            ],
+        )
+
+        # check that the specs have the correct version
+        check_spec_version(chunk1_path, ct.target.iOS17)
+        check_spec_version(chunk2_path, ct.target.iOS17)
+
+        # cleanup
+        shutil.rmtree(model_path)
+        shutil.rmtree(output_dir)
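+
+    def test_check_output_correctness(self):
+        # A minimal sketch of exercising the `check_output_correctness=True` path,
+        # which predicts with both the original model and the chunks and compares
+        # their PSNR; it assumes the Core ML runtime is available on this machine.
+        model_path = self.get_test_model_path()
+        output_dir = tempfile.mkdtemp()
+
+        # Raises if the chunked outputs fall below the PSNR floor
+        bisect_model(
+            model_path,
+            output_dir,
+            check_output_correctness=True,
+        )
+
+        # cleanup
+        shutil.rmtree(model_path)
+        shutil.rmtree(output_dir)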