diff --git a/coremltools/models/utils.py b/coremltools/models/utils.py
index 97408611e..7026c10bf 100644
--- a/coremltools/models/utils.py
+++ b/coremltools/models/utils.py
@@ -6,8 +6,9 @@
 """
 Utilities for the entire package.
 """
-
+from collections import OrderedDict as _OrderedDict
 import copy as _copy
+import gc as _gc
 import math as _math
 import os as _os
 import shutil as _shutil
@@ -19,6 +20,7 @@
 from functools import lru_cache as _lru_cache
 from typing import Callable as _Callable
 from typing import Dict as _Dict
+from typing import List as _List
 from typing import Optional as _Optional
 from typing import Tuple as _Tuple
 from typing import Union as _Union
@@ -26,18 +28,22 @@
 import numpy as _np
 
 import coremltools as _ct
+from coremltools import _logger
 from coremltools import _SPECIFICATION_VERSION_IOS_16, _SPECIFICATION_VERSION_IOS_18
 from coremltools import ComputeUnit as _ComputeUnit
 from coremltools import proto as _proto
 from coremltools.converters.mil import mil as _mil
 from coremltools.converters.mil.frontend.milproto import load as _milproto_to_pymil
+from coremltools.converters.mil.mil import Builder as _mb
 from coremltools.converters.mil.mil import Program as _Program
 from coremltools.converters.mil.mil.passes.defs.preprocess import NameSanitizer as _NameSanitizer
 from coremltools.converters.mil.mil.passes.defs.randomize import (
     WeightRandomizer as _WeightRandomizer,
 )
 from coremltools.converters.mil.mil.passes.graph_pass import AbstractGraphPass as _AbstractGraphPass
+from coremltools.converters.mil.mil.passes.helper import block_context_manager as _block_context_manager
 from coremltools.converters.mil.mil.passes.pass_registry import PASS_REGISTRY as _PASS_REGISTRY
+from coremltools.converters.mil.mil.program import Placeholder as _Placeholder
 
 from .._deps import _HAS_SCIPY
@@ -772,7 +778,7 @@ def evaluate_transformer(model, input_data, reference_output, verbose=False):
 
     Parameters
     ----------
-    spec: list of str or list of MLModel
+    model: list of str or list of MLModel
         File to load the Model from, or a loaded version
         of the MLModel.
 
@@ -1675,3 +1681,416 @@ def randomize_weights(mlmodel: "_ct.models.MLModel"):
     )
 
     return randomized_mlmodel
+
+
+def bisect_model(
+    model: _Union[str, "_ct.models.MLModel"],
+    output_dir: str,
+    merge_chunks_to_pipeline: _Optional[bool] = False,
+    check_output_correctness: _Optional[bool] = True,
+):
+    """
+    Utility function to split an mlpackage model into two mlpackages of approximately the same file size.
+
+    Parameters
+    ----------
+    model: str or MLModel
+        Path to the mlpackage file, or a Core ML model, to be split into two mlpackages of approximately the same file size.
+
+    output_dir: str
+        Path to the output directory where the two model chunks / the pipeline model will be saved.
+
+        If the `model` is `{path}/{model_name}.mlpackage`, the chunk models are saved as:
+        1. first chunk model: `{output_dir}/{model_name}_chunk1.mlpackage`
+        2. second chunk model: `{output_dir}/{model_name}_chunk2.mlpackage`
+        3. chunked pipeline model: `{output_dir}/{model_name}_chunked_pipeline.mlpackage`
+
+        If the `model` is of type `MLModel`, the chunk models are saved as:
+        1. first chunk model: `{output_dir}/chunk1.mlpackage`
+        2. second chunk model: `{output_dir}/chunk2.mlpackage`
+        3. chunked pipeline model: `{output_dir}/chunked_pipeline.mlpackage`
+
+    merge_chunks_to_pipeline: bool
+        If True, the model chunks are managed inside a single pipeline model for easier asset maintenance.
+
+    check_output_correctness: bool
+        - If True, compares the outputs of the original Core ML model with those of the pipelined Core ML model chunks, and reports the PSNR in dB.
+        - Enabling this feature uses more memory. Disable it if your machine runs out of memory.
+
+    Examples
+    --------
+    .. sourcecode:: python
+
+        import coremltools as ct
+
+        model_path = "my_model.mlpackage"
+        output_dir = "./output/"
+
+        # The following code will produce two smaller models:
+        # `./output/my_model_chunk1.mlpackage` and `./output/my_model_chunk2.mlpackage`
+        # It also numerically compares the outputs of the original Core ML model with those of the chunked models.
+        ct.models.utils.bisect_model(
+            model_path,
+            output_dir,
+        )
+
+        # The following code will produce a single pipeline model `./output/my_model_chunked_pipeline.mlpackage`
+        ct.models.utils.bisect_model(
+            model_path,
+            output_dir,
+            merge_chunks_to_pipeline=True,
+        )
+
+        # You can also pass the MLModel object directly
+        mlmodel = ct.models.MLModel(model_path)
+        ct.models.utils.bisect_model(
+            mlmodel,
+            output_dir,
+            merge_chunks_to_pipeline=True,
+        )
+    """
+    # We do the lazy imports to prevent a circular import
+    from . import MLModel
+    from coremltools.converters.mil.converter import mil_convert as _mil_convert
+
+    def get_pymil_prog_and_spec_from_model(model):
+
+        # get the model spec and weights directory
+        if isinstance(model, str):
+            spec = load_spec(model)
+            weights_dir = _try_get_weights_dir_path(model)
+        else:
+            spec = model._spec
+            weights_dir = model.weights_dir
+
+        # convert the model spec into a pymil program;
+        # we also convert the operations into a list
+        prog = _milproto_to_pymil.load(
+            spec,
+            spec.specificationVersion,
+            weights_dir,
+        )
+        if len(prog.functions) > 1 or "main" not in prog.functions:
+            raise ValueError("'bisect_model' only support model with a single 'main' function.")
+
+        func = prog.functions["main"]
+        func.operations = list(func.operations)
+
+        return prog, spec
+
+    # check the input type of model
+    if not isinstance(model, (str, MLModel)):
+        raise ValueError(f"'model' must be of type [str, MLModel]. Got {type(model)}.")
+
+    # The implementation below assumes that the model has a single function, named "main".
+    prog, spec = get_pymil_prog_and_spec_from_model(model)
+    spec_version = spec.specificationVersion
+
+    # Compute the incision point by bisecting the program based on weights size
+    op_idx, first_chunk_weights_size, total_weights_size = _get_op_idx_split_location(prog)
+    main_block = prog.functions["main"]
+    incision_op = main_block.operations[op_idx]
+    _logger.info(
+        f"The incision op: name={incision_op.name}, type={incision_op.op_type}, index={op_idx}/{len(main_block.operations)}"
+    )
+    _logger.info(f"First chunk size = {first_chunk_weights_size:.2f} MB")
+    _logger.info(f"Second chunk size = {total_weights_size - first_chunk_weights_size:.2f} MB")
+
+    # Build first chunk (in-place modifies prog by declaring early exits and removing unused subgraph)
+    prog_chunk1 = _make_first_chunk_prog(prog, op_idx)
+
+    # Build the second chunk
+    # When the first chunk is created, prog is modified in place, so we need to re-convert a new pymil
+    # program for the second chunk.
+    prog_chunk2 = _make_second_chunk_prog(
+        get_pymil_prog_and_spec_from_model(model)[0],
+        op_idx,
+    )
+
+    # Convert the MIL Program objects into MLModels
+    # We skip_model_load if check_output_correctness=False
+    _logger.info("Converting the two programs")
+    model_chunk1 = _mil_convert(
+        prog_chunk1,
+        convert_to="mlprogram",
+        convert_from="milinternal",
+        specification_version=spec_version,
+        compute_units=_ct.ComputeUnit.CPU_ONLY,
+        skip_model_load=(not check_output_correctness),
+    )
+    del prog_chunk1
+    _gc.collect()
+    _logger.info("Conversion of first chunk done.")
+
+    model_chunk2 = _mil_convert(
+        prog_chunk2,
+        convert_to="mlprogram",
+        convert_from="milinternal",
+        specification_version=spec_version,
+        compute_units=_ct.ComputeUnit.CPU_ONLY,
+        skip_model_load=(not check_output_correctness),
+    )
+    del prog_chunk2
+    _gc.collect()
+    _logger.info("Conversion of second chunk done.")
+
+    # Verify output correctness
+    if check_output_correctness:
+        _logger.info("Verifying output correctness of chunks")
+
+        if isinstance(model, str):
+            mlmodel = _ct.models.MLModel(model, compute_units=_ct.ComputeUnit.CPU_ONLY)
+        else:
+            mlmodel = model
+
+        _verify_output_correctness_of_chunks(
+            full_model=mlmodel,
+            first_chunk_model=model_chunk1,
+            second_chunk_model=model_chunk2,
+        )
+
+    # save model chunks
+    _os.makedirs(output_dir, exist_ok=True)
+
+    if isinstance(model, str):
+        mlpackage_name = _os.path.basename(model)
+        name, _ = _os.path.splitext(mlpackage_name)
+        name += "_"
+    else:
+        name = ""
+
+    if merge_chunks_to_pipeline:
+        # Make a single pipeline model to manage the model chunks
+        pipeline_model = make_pipeline(model_chunk1, model_chunk2)
+        out_path_pipeline = _os.path.join(output_dir, name + "chunked_pipeline.mlpackage")
+        pipeline_model.save(out_path_pipeline)
+
+        # reload to ensure CPU placement
+        if check_output_correctness:
+            _logger.info("Verifying output correctness of pipeline model")
+            pipeline_model = _ct.models.MLModel(
+                out_path_pipeline, compute_units=_ct.ComputeUnit.CPU_ONLY
+            )
+            _verify_output_correctness_of_chunks(
+                full_model=mlmodel,
+                pipeline_model=pipeline_model,
+            )
+    else:
+        # Save the chunked models to disk
+        out_path_chunk1 = _os.path.join(output_dir, name + "chunk1.mlpackage")
+        out_path_chunk2 = _os.path.join(output_dir, name + "chunk2.mlpackage")
+        model_chunk1.save(out_path_chunk1)
+        model_chunk2.save(out_path_chunk2)
+        _logger.info(
+            f"Saved chunks in {output_dir} with the suffix _chunk1.mlpackage and _chunk2.mlpackage"
+        )
+
+def _verify_output_correctness_of_chunks(
+    full_model: "_ct.models.MLModel",
+    first_chunk_model: _Optional["_ct.models.MLModel"] = None,
+    second_chunk_model: _Optional["_ct.models.MLModel"] = None,
+    pipeline_model: _Optional["_ct.models.MLModel"] = None,
+) -> None:
+    """Verifies the end-to-end output correctness of the full (original) model versus the chunked models"""
+    # lazy import avoids a circular import error
+    from coremltools.converters.mil.testing_utils import random_gen_input_feature_type as random_gen_input_feature_type
+    from coremltools.converters.mil.testing_utils import compute_snr_and_psnr
+
+    def report_correctness(original_outputs: _np.ndarray, final_outputs: _np.ndarray, log_prefix: str):
+        """Report PSNR values across two compatible tensors.
+        This util is from https://github.com/apple/ml-stable-diffusion/blob/main/python_coreml_stable_diffusion/torch2coreml.py#L80,
+        with a slight modification.
+        """
+        ABSOLUTE_MIN_PSNR = 35
+
+        _, original_psnr = compute_snr_and_psnr(original_outputs, original_outputs)
+        _, final_psnr = compute_snr_and_psnr(original_outputs, final_outputs)
+
+        dB_change = final_psnr - original_psnr
+        _logger.info(
+            f"{log_prefix}: PSNR changed by {dB_change:.1f} dB ({original_psnr:.1f} -> {final_psnr:.1f})"
+        )
+
+        if final_psnr < ABSOLUTE_MIN_PSNR:
+            _logger.warning(f"{final_psnr:.1f} dB is low!")
+        else:
+            _logger.info(
+                f"{final_psnr:.1f} dB > {ABSOLUTE_MIN_PSNR} dB (minimum allowed) parity check passed"
+            )
+        return final_psnr
+
+
+    # Generate inputs for first chunk and full model
+    input_dict = {}
+    for input_desc in full_model._spec.description.input:
+        input_dict[input_desc.name] = random_gen_input_feature_type(input_desc)
+
+    # Generate outputs for full model
+    outputs_from_full_model = full_model.predict(input_dict)
+
+    if pipeline_model is not None:
+        outputs_from_pipeline_model = pipeline_model.predict(input_dict)
+        final_outputs = outputs_from_pipeline_model
+
+    elif first_chunk_model is not None and second_chunk_model is not None:
+        # Generate outputs for first chunk
+        outputs_from_first_chunk_model = first_chunk_model.predict(input_dict)
+
+        # Prepare inputs for second chunk model from first chunk's outputs and regular inputs
+        second_chunk_input_dict = {}
+        for input_desc in second_chunk_model._spec.description.input:
+            if input_desc.name in outputs_from_first_chunk_model:
+                second_chunk_input_dict[input_desc.name] = outputs_from_first_chunk_model[
+                    input_desc.name
+                ]
+            else:
+                second_chunk_input_dict[input_desc.name] = input_dict[input_desc.name]
+
+        # Generate output for second chunk model
+        outputs_from_second_chunk_model = second_chunk_model.predict(second_chunk_input_dict)
+        final_outputs = outputs_from_second_chunk_model
+    else:
+        raise ValueError("Either a single Pipeline model or two model chunks should be provided.")
+
+    # Verify correctness across all outputs from second chunk and full model
+    for out_name in outputs_from_full_model.keys():
+        report_correctness(
+            original_outputs=outputs_from_full_model[out_name],
+            final_outputs=final_outputs[out_name],
+            log_prefix=f"{out_name}",
+        )
+
+
+def _get_op_idx_split_location(prog: _mil.Program) -> _Tuple[int, int, int]:
+    """Find the op that approximately bisects the graph, as measured by the weights size on each side"""
+    main_block = prog.functions["main"]
+    total_size_in_mb = 0
+
+    for op in main_block.operations:
+        if op.op_type == "const" and isinstance(op.val.val, _np.ndarray):
+            size_in_mb = op.val.val.size * op.val.val.itemsize / (1024 * 1024)
+            total_size_in_mb += size_in_mb
+    half_size = total_size_in_mb / 2
+
+    # Find the first non-const op (with a single child), where the cumulative size exceeds
+    # the half size for the first time
+    cumulative_size_in_mb = 0
+    for op in main_block.operations:
+        if op.op_type == "const" and isinstance(op.val.val, _np.ndarray):
+            size_in_mb = op.val.val.size * op.val.val.itemsize / (1024 * 1024)
+            cumulative_size_in_mb += size_in_mb
+
+        # Note: The condition "not op.op_type.startswith("const")" is to make sure that the
+        # incision op is neither of type "const" nor one of the "constexpr_*" ops that
+        # are used to store compressed weights
+        if (
+            cumulative_size_in_mb >= half_size
+            and not op.op_type.startswith("const")
+            and len(op.outputs) == 1
+            and len(op.outputs[0].child_ops) == 1
+        ):
+            op_idx = main_block.operations.index(op)
+            return op_idx, cumulative_size_in_mb, total_weights_size if False else total_size_in_mb
+
+    raise ValueError("Not able to find the bisect point in the model.")
+
+
+def _get_first_chunk_outputs(block: _mil.Block, op_idx: int) -> _List[_mil.Var]:
+    # Get the list of all vars that go across from the first program (all ops from 0 to op_idx (inclusive))
+    # to the second program (all ops from op_idx+1 till the end). These vars need to be made the outputs
+    # of the first program and the inputs of the second program
+    boundary_vars = set()
+    for i in range(op_idx + 1):
+        op = block.operations[i]
+        if not op.op_type.startswith("const"):
+            for var in op.outputs:
+                if var.val is None:  # only consider non const vars
+                    for child_op in var.child_ops:
+                        child_op_idx = block.operations.index(child_op)
+                        if child_op_idx > op_idx:
+                            boundary_vars.add(var)
+    return list(boundary_vars)
+
+
+@_block_context_manager
+def _add_fp32_casts(block: _mil.Block, boundary_vars: _List[_mil.Var]) -> _List[_mil.Var]:
+    new_boundary_vars = []
+    for var in boundary_vars:
+        if var.dtype != _mil.types.fp16:
+            new_boundary_vars.append(var)
+        else:
+            fp32_var = _mb.cast(x=var, dtype="fp32", name=var.name)
+            new_boundary_vars.append(fp32_var)
+    return new_boundary_vars
+
+
+def _make_first_chunk_prog(
+    prog: _mil.Program,
+    op_idx: int,
+) -> _mil.Program:
+    """Build the first chunk by declaring early outputs and removing the unused subgraph"""
+    block = prog.functions["main"]
+    boundary_vars = _get_first_chunk_outputs(block, op_idx)
+
+    # Due to possible numerical issues, cast any fp16 var to fp32
+    new_boundary_vars = _add_fp32_casts(block, boundary_vars)
+
+    block.outputs.clear()
+    block.set_outputs(new_boundary_vars)
+    _PASS_REGISTRY["common::dead_code_elimination"](prog)
+    return prog
+
+
+def _make_second_chunk_prog(prog: _mil.Program, op_idx: int) -> _mil.Program:
+    """Build the second chunk by rebuilding a pristine MIL Program from the MLModel"""
+    block = prog.functions["main"]
+    block.opset_version = _ct.target.iOS16
+
+    # First chunk outputs are second chunk inputs (e.g. skip connections)
+    boundary_vars = _get_first_chunk_outputs(block, op_idx)
+
+    # This op will not be included in this program. Its output var will be made into an input
+    boundary_op = block.operations[op_idx]
+
+    # Add all boundary ops as inputs
+    with block:
+        for var in boundary_vars:
+            new_placeholder = _Placeholder(
+                sym_shape=var.shape,
+                dtype=var.dtype if var.dtype != _mil.types.fp16 else _mil.types.fp32,
+                name=var.name,
+            )
+
+            block._input_dict[new_placeholder.outputs[0].name] = new_placeholder.outputs[0]
+
+            block.function_inputs = tuple(block._input_dict.values())
+            new_var = None
+            if var.dtype == _mil.types.fp16:
+                new_var = _mb.cast(x=new_placeholder.outputs[0], dtype="fp16", before_op=var.op)
+            else:
+                new_var = new_placeholder.outputs[0]
+
+            block.replace_uses_of_var_after_op(
+                anchor_op=boundary_op,
+                old_var=var,
+                new_var=new_var,
+                # This is needed if the program contains "constexpr_*" ops. In normal cases, there are stricter
+                # rules for removing them, and their presence may prevent replacing this var.
+                # However in this case, since we want to remove all the ops in chunk 1, we can safely
+                # set this to True.
+                force_replace=True,
+            )
+
+    _PASS_REGISTRY["common::dead_code_elimination"](prog)
+
+    # Remove any unused inputs
+    new_input_dict = _OrderedDict()
+    for k, v in block._input_dict.items():
+        if len(v.child_ops) > 0:
+            new_input_dict[k] = v
+    block._input_dict = new_input_dict
+    block.function_inputs = tuple(block._input_dict.values())
+
+    return prog
+
diff --git a/coremltools/test/api/test_api_visibilities.py b/coremltools/test/api/test_api_visibilities.py
index 49fc0e401..3d22f979e 100644
--- a/coremltools/test/api/test_api_visibilities.py
+++ b/coremltools/test/api/test_api_visibilities.py
@@ -71,6 +71,7 @@ def test_utils(self):
             "save_multifunction",
             "MultiFunctionDescriptor",
             "randomize_weights",
+            "bisect_model",
         ]
         _check_visible_modules(_get_visible_items(ct.utils), expected)
diff --git a/coremltools/test/ml_program/test_utils.py b/coremltools/test/ml_program/test_utils.py
index b982e70a1..77ad29cc5 100644
--- a/coremltools/test/ml_program/test_utils.py
+++ b/coremltools/test/ml_program/test_utils.py
@@ -17,8 +17,9 @@
 from coremltools.converters.mil import mil
 from coremltools.converters.mil.converter import mil_convert as _mil_convert
 from coremltools.converters.mil.mil.builder import Builder as mb
-from coremltools.models.utils import MultiFunctionDescriptor, load_spec, save_multifunction
-
+from coremltools.converters.mil.testing_utils import assert_spec_input_type, assert_spec_output_type, DTYPE_TO_FEATURE_TYPE_MAP
+from coremltools.models.utils import bisect_model, MultiFunctionDescriptor, load_spec, save_multifunction
+import coremltools.optimize as cto
 
 @pytest.mark.skipif(ct.utils._macos_version() < (15, 0), reason="Multi-function only supported on macOS 15+")
@@ -892,3 +893,299 @@ def test_10_duplicated_model(self):
             == 10
         )
         shutil.rmtree(saved_package_path)
+
+
+class TestBisectModel:
+
+    @staticmethod
+    def check_spec_op_type(model_path, expected_ops):
+        spec = load_spec(model_path)
+        mil = spec.mlProgram
+        for function in mil.functions.values():
+            for block in function.block_specializations.values():
+                ops = list(block.operations)
+                for i, op_type in enumerate(expected_ops):
+                    assert ops[i].type == op_type
+
+    @staticmethod
+    def get_test_model_path(minimum_deployment_target=ct.target.iOS16, return_as_mlmodel=False):
+        # pytorch model and tracing
+        class Model(torch.nn.Module):
+            def __init__(self):
+                super().__init__()
+                self.linear1 = torch.nn.Linear(6000, 6000)
+                self.relu = torch.nn.ReLU()
+                self.linear2 = torch.nn.Linear(6000, 6000)
+
+            def forward(self, x):
+                x = self.linear1(x)
+                x = self.relu(x)
+                x = self.linear2(x)
+                x = torch.sin(x)
+                return x
+
+        example_input = torch.rand(1, 6000)
+        model = Model().eval()
+        traced_model = torch.jit.trace(model, example_input)
+
+        # convert to mlpackage
+        mlmodel = ct.convert(
+            traced_model,
+            inputs=[ct.TensorType(shape=(1, 6000), name="input")],
+            minimum_deployment_target=minimum_deployment_target,
+        )
+
+        # return as mlmodel
+        if return_as_mlmodel:
+            return mlmodel
+
+        # save on disk and return the model path
+        package_path = tempfile.mkdtemp(suffix=".mlpackage")
+        mlmodel.save(package_path)
+
+        return package_path
+
+    def test_invalid_mlpackage(self):
+        traced_model = TestMultiFunctionModelEnd2End._get_test_model()
+        input = np.random.rand(1, 1, 28, 28)
+        mlmodel = ct.convert(
+            traced_model,
+            inputs=[ct.TensorType(name="x", shape=(1, 1, 28, 28))],
+            outputs=[ct.TensorType(name="out")],
+            convert_to="mlprogram",
+            minimum_deployment_target=ct.target.iOS16,
+        )
+        package_path = tempfile.mkdtemp(suffix=".mlpackage")
+        mlmodel.save(package_path)
+
+        # a function name other than "main" will error out
+        desc = MultiFunctionDescriptor()
+        desc.add_function(package_path, "main", "main_1")
+        desc.default_function_name = "main_1"
+        saved_package_path = tempfile.mkdtemp(suffix=".mlpackage")
+        save_multifunction(desc, saved_package_path)
+
+        with tempfile.TemporaryDirectory() as output_dir:
+            with pytest.raises(ValueError, match="only support model with a single"):
+                bisect_model(
+                    saved_package_path,
+                    output_dir=output_dir,
+                )
+            shutil.rmtree(saved_package_path)
+
+            # multi-function model is not supported
+            desc = MultiFunctionDescriptor()
+            desc.add_function(package_path, "main", "main")
+            desc.add_function(package_path, "main", "main_1")
+            desc.default_function_name = "main"
+            saved_package_path = tempfile.mkdtemp(suffix=".mlpackage")
+            save_multifunction(desc, saved_package_path)
+            with pytest.raises(ValueError, match="only support model with a single"):
+                bisect_model(
+                    saved_package_path,
+                    output_dir=output_dir,
+                )
+        shutil.rmtree(saved_package_path)
+        shutil.rmtree(package_path)
+
+    @pytest.mark.parametrize(
+        "mlmodel_as_input",
+        [True, False],
+    )
+    def test_pipeline(self, mlmodel_as_input):
+        model = self.get_test_model_path(return_as_mlmodel=mlmodel_as_input)
+        output_dir = str(tempfile.TemporaryDirectory())
+
+        # The API will bisect the model into two chunks, and produce a pipeline model
+        bisect_model(
+            model,
+            output_dir,
+            merge_chunks_to_pipeline=True,
+        )
+
+        # check the file name is correct
+        if mlmodel_as_input:
+            name = ""
+        else:
+            mlpackage_name = os.path.basename(model)
+            name, _ = os.path.splitext(mlpackage_name)
+            name += "_"
+
+        pipeline_path = os.path.join(output_dir, f"{name}chunked_pipeline.mlpackage")
+        assert os.path.isdir(pipeline_path)
+
+        # check the Core ML model is a pipeline model
+        spec = load_spec(pipeline_path)
+        assert spec.WhichOneof("Type") == "pipeline"
+
+        # cleanup
+        if not mlmodel_as_input:
+            shutil.rmtree(model)
+        shutil.rmtree(output_dir)
+
+    def test_compressed_model(self):
+        # use coremltools.optimize to palettize a Core ML model
+        model = self.get_test_model_path(return_as_mlmodel=True)
+        op_config = cto.coreml.OpPalettizerConfig(mode="kmeans", nbits=8)
+        config = cto.coreml.OptimizationConfig(global_config=op_config)
+        model = cto.coreml.palettize_weights(model, config)
+
+        # test that the bisect API works
+        output_dir = str(tempfile.TemporaryDirectory())
+        bisect_model(
+            model,
+            output_dir,
+        )
+
+        # test the models contain correct ops
+        name = ""
+        chunk1_path = os.path.join(output_dir, f"{name}chunk1.mlpackage")
+        chunk2_path = os.path.join(output_dir, f"{name}chunk2.mlpackage")
+        assert os.path.isdir(chunk1_path)
+        assert os.path.isdir(chunk2_path)
+
+        self.check_spec_op_type(
+            chunk1_path,
+            [
+                "constexpr_lut_to_dense",
+                "const",
+                "linear",
+                "const",
+                "cast",
+            ]
+        )
+        self.check_spec_op_type(
+            chunk2_path,
+            [
+                "const",
+                "cast",
+                "relu",
+                "constexpr_lut_to_dense",
+                "const",
+                "linear",
+                "sin",
+            ]
+        )
+
+        # cleanup
+        shutil.rmtree(output_dir)
+
+
+    @pytest.mark.parametrize(
+        "mlmodel_as_input",
+        [True, False],
+    )
+    def test_basic(self, mlmodel_as_input):
+        def check_spec_version(model_path, expected_spec_version):
+            spec = load_spec(model_path)
+            assert spec.specificationVersion == expected_spec_version
+
+        def check_output_dtype(model_path, expected_output_dtype):
+            spec = load_spec(model_path)
+            assert_spec_output_type(spec, DTYPE_TO_FEATURE_TYPE_MAP[expected_output_dtype])
+
+        def check_input_dtype(model_path, expected_input_dtype):
+            spec = load_spec(model_path)
+            assert_spec_input_type(spec, DTYPE_TO_FEATURE_TYPE_MAP[expected_input_dtype])
+
+
+        model = self.get_test_model_path(ct.target.iOS17, return_as_mlmodel=mlmodel_as_input)
+        output_dir = str(tempfile.TemporaryDirectory())
+
+        # By bisecting the model in half, two new mlpackages are produced, with the suffixes `_chunk1.mlpackage` and `_chunk2.mlpackage`,
+        # in the target `output_dir`.
+        bisect_model(
+            model,
+            output_dir,
+        )
+
+        # check the API doesn't delete the original mlpackage
+        if not mlmodel_as_input:
+            assert os.path.isdir(model)
+
+        # check the file names are correct
+        if mlmodel_as_input:
+            name = ""
+        else:
+            mlpackage_name = os.path.basename(model)
+            name, _ = os.path.splitext(mlpackage_name)
+            name += "_"
+
+        chunk1_path = os.path.join(output_dir, f"{name}chunk1.mlpackage")
+        chunk2_path = os.path.join(output_dir, f"{name}chunk2.mlpackage")
+        assert os.path.isdir(chunk1_path)
+        assert os.path.isdir(chunk2_path)
+
+        # check the model op type
+        self.check_spec_op_type(
+            chunk1_path,
+            [
+                "const",
+                "const",
+                "linear",
+                "const",
+                "cast",
+            ]
+        )
+        self.check_spec_op_type(
+            chunk2_path,
+            [
+                "const",
+                "cast",
+                "relu",
+                "const",
+                "const",
+                "linear",
+                "sin",
+            ]
+        )
+
+        # check the spec has the correct version
+        check_spec_version(chunk1_path, ct.target.iOS17)
+        check_spec_version(chunk2_path, ct.target.iOS17)
+
+        # the i/o dtype of the two chunk models should be:
+        # 1. fp16 -> fp32
+        # 2. fp32 -> fp16
+        check_input_dtype(chunk1_path, "fp16")
+        check_output_dtype(chunk1_path, "fp32")
+
+        check_input_dtype(chunk2_path, "fp32")
+        check_output_dtype(chunk2_path, "fp16")
+
+        # cleanup
+        if not mlmodel_as_input:
+            shutil.rmtree(model)
+        shutil.rmtree(output_dir)
+
+    def test_api_example(self):
+        """
+        Test the API example in https://apple.github.io/coremltools/docs-guides/source/mlmodel-utilities.html
+        """
+        model_path = self.get_test_model_path()
+        output_dir = str(tempfile.TemporaryDirectory())
+
+        # The following code will produce two chunked models:
+        # `./output/my_model_chunk1.mlpackage` and `./output/my_model_chunk2.mlpackage`
+        ct.models.utils.bisect_model(
+            model_path,
+            output_dir,
+        )
+
+        # The following code will produce a single pipeline model `./output/my_model_chunked_pipeline.mlpackage`
+        ct.models.utils.bisect_model(
+            model_path,
+            output_dir,
+            merge_chunks_to_pipeline=True,
+        )
+
+        # You can also pass the MLModel object directly
+        mlmodel = ct.models.MLModel(model_path)
+        ct.models.utils.bisect_model(
+            mlmodel,
+            output_dir,
+        )
+
+        # clean up
+        shutil.rmtree(output_dir)
+        shutil.rmtree(model_path)
diff --git a/docs-guides/source/mlmodel-utilities.md b/docs-guides/source/mlmodel-utilities.md
index 0b0f54ad3..76651ba41 100644
--- a/docs-guides/source/mlmodel-utilities.md
+++ b/docs-guides/source/mlmodel-utilities.md
@@ -172,3 +172,40 @@ config = cto.coreml.OptimizationConfig(
 
 compressed_mlmodel = cto.coreml.palettize_weights(mlmodel, config)
 ```
+
+## Bisect Model
+
+In certain scenarios, you may want to break a large Core ML model into two smaller models. For instance, if you are deploying a model to run on the Neural Engine on an iPhone, it cannot be larger than 1 GB. If you are working with, say, the [Stable Diffusion](https://github.com/apple/ml-stable-diffusion) 1.5 model, which is 1.72 GB in Float16 precision, it needs to be broken up into two chunks, each less than 1 GB. The utility `ct.models.utils.bisect_model` will allow you to do exactly that.
+When using this API, you can also opt in to packaging the two model chunks into a pipeline model, so that it's still a single mlpackage file, with the two models arranged sequentially.
+
+The example below shows how to bisect a model, test the accuracy, and save the chunks to disk.
+
+```python
+
+import coremltools as ct
+
+model_path = "my_model.mlpackage"
+output_dir = "./output/"
+
+# The following code will produce two smaller models:
+# `./output/my_model_chunk1.mlpackage` and `./output/my_model_chunk2.mlpackage`
+# It also numerically compares the outputs of the original Core ML model with those of the chunked models.
+ct.models.utils.bisect_model(
+    model_path,
+    output_dir,
+)
+
+# The following code will produce a single pipeline model `./output/my_model_chunked_pipeline.mlpackage`
+ct.models.utils.bisect_model(
+    model_path,
+    output_dir,
+    merge_chunks_to_pipeline=True,
+)
+
+# You can also pass the MLModel object directly
+mlmodel = ct.models.MLModel(model_path)
+ct.models.utils.bisect_model(
+    mlmodel,
+    output_dir,
+    merge_chunks_to_pipeline=True,
+)
+```
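+
+Once the model has been split, the resulting chunks (or the chunked pipeline) load and predict like any other Core ML model. The sketch below is illustrative only: it assumes the pipeline was produced by the call above and that the original model takes a single input named `"input"` of shape `(1, 6000)`; substitute your model's actual input names and shapes.
+
+```python
+import numpy as np
+import coremltools as ct
+
+# Load the chunked pipeline produced by bisect_model(..., merge_chunks_to_pipeline=True).
+# Passing compute_units is optional; CPU_ONLY mirrors what bisect_model itself uses during
+# its numerical correctness check.
+pipeline_model = ct.models.MLModel(
+    "./output/my_model_chunked_pipeline.mlpackage",
+    compute_units=ct.ComputeUnit.CPU_ONLY,
+)
+
+# Run a prediction exactly as you would on the original, un-chunked model.
+# The input name and shape below are assumptions for illustration.
+x = np.random.rand(1, 6000).astype(np.float32)
+outputs = pipeline_model.predict({"input": x})
+print(list(outputs.keys()))
+```
+
+If you saved the two standalone chunks instead (without `merge_chunks_to_pipeline=True`), they can be chained manually in the same way: feed the first chunk's outputs, plus any original inputs the second chunk still expects, into the second chunk's `predict` call.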