diff --git a/dali/python/nvidia/dali/external_source.py b/dali/python/nvidia/dali/external_source.py index 8f27fa196f3..4563784e2ca 100644 --- a/dali/python/nvidia/dali/external_source.py +++ b/dali/python/nvidia/dali/external_source.py @@ -27,18 +27,27 @@ ) +def _get_shape(data): + if hasattr(data, "shape"): + return data.shape() if callable(data.shape) else data.shape + elif hasattr(data, "__array_interface__"): + return data.__array_interface__["shape"] + elif hasattr(data, "__cuda_array_interface__"): + return data.__cuda_array_interface__["shape"] + elif hasattr(data, "__array__"): + return data.__array__().shape + else: + raise RuntimeError(f"Don't know how to extract the shape out of {type(data)}") + + def _get_batch_shape(data): if isinstance(data, (list, tuple, _tensors.TensorListCPU, _tensors.TensorListGPU)): if len(data) == 0: return [], True - if callable(data[0].shape): - return [x.shape() for x in data], False else: - return [x.shape for x in data], False + return [_get_shape(x) for x in data], False else: - shape = data.shape - if callable(shape): - shape = data.shape() + shape = _get_shape(data) return [shape[1:]] * shape[0], True @@ -68,6 +77,8 @@ def to_numpy(x): return x.asnumpy() elif _types._is_torch_tensor(x): return x.numpy() + elif hasattr(x, "__array__"): + return x.__array__() else: return x @@ -79,7 +90,7 @@ def to_numpy(x): if layout is not None: _check_data_batch(data, batch_size, layout) data = type(data)(data, layout) - elif isinstance(data, list): + elif isinstance(data, (list, tuple)): inputs = [] checked = False for datum in data: diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index 0b909fc4fc9..c07a59e908d 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -478,6 +478,15 @@ def is_restored_from_checkpoint(self): """If True, this pipeline was restored from checkpoint.""" return self._is_restored_from_checkpoint + @property + def num_outputs(self) -> int: + """ + Number of pipeline outputs. + """ + self.build() + # output_dtype is a list with the dtype for each output, so we can simply take the length + return len(self._pipe.output_dtype()) + def output_dtype(self) -> list: """Data types expected at the outputs.""" self.build() @@ -854,6 +863,7 @@ def contains_nested_datanode(nested): self._require_no_foreign_ops("The pipeline does not support checkpointing") self._graph_outputs = outputs + self._num_outputs = len(self._graph_outputs) self._setup_input_callbacks() self._disable_pruned_external_source_instances() self._py_graph_built = True diff --git a/dali/python/nvidia/dali/plugin/pytorch/__init__.py b/dali/python/nvidia/dali/plugin/pytorch/__init__.py index 50d5827b484..786035cafac 100644 --- a/dali/python/nvidia/dali/plugin/pytorch/__init__.py +++ b/dali/python/nvidia/dali/plugin/pytorch/__init__.py @@ -15,12 +15,11 @@ import sys from typing import Union, Optional -from typing import Any, Dict, List +from typing import Dict, List from nvidia.dali import internal as _internal from nvidia.dali import ops -from nvidia.dali import types -from nvidia.dali.backend import TensorCPU, TensorGPU, TensorListCPU, TensorListGPU +from nvidia.dali.backend import TensorCPU, TensorGPU, TensorListGPU from nvidia.dali.pipeline import Pipeline from nvidia.dali.plugin.base_iterator import _DaliBaseIterator @@ -28,11 +27,12 @@ import torch import torch.utils.dlpack as torch_dlpack # noqa: F401 -import ctypes import numpy as np from . import fn # noqa: F401 +from . import experimental # noqa: F401 +from nvidia.dali.plugin.pytorch.torch_utils import to_torch_type, feed_ndarray from nvidia.dali.plugin.pytorch._torch_function import TorchPythonFunction as TorchPythonFunction _internal._adjust_operator_module(TorchPythonFunction, sys.modules[__name__], []) @@ -40,64 +40,6 @@ ops._wrap_op(TorchPythonFunction, "fn", __name__) -to_torch_type = { - types.DALIDataType.FLOAT: torch.float32, - types.DALIDataType.FLOAT64: torch.float64, - types.DALIDataType.FLOAT16: torch.float16, - types.DALIDataType.UINT8: torch.uint8, - types.DALIDataType.INT8: torch.int8, - types.DALIDataType.BOOL: torch.bool, - types.DALIDataType.INT16: torch.int16, - types.DALIDataType.INT32: torch.int32, - types.DALIDataType.INT64: torch.int64, -} - - -def feed_ndarray( - dali_tensor: Union[TensorCPU, TensorGPU, TensorListCPU, TensorListGPU], - arr: torch.Tensor, - cuda_stream: Union[torch.cuda.Stream, Any, None] = None, -) -> torch.Tensor: - """ - Copy contents of DALI tensor to PyTorch's Tensor. - - Parameters - ---------- - dali_tensor : nvidia.dali.backend.TensorCPU or nvidia.dali.backend.TensorGPU - Tensor from which to copy - arr : torch.Tensor - Destination of the copy - cuda_stream : torch.cuda.Stream, cudaStream_t or any value that can be cast to cudaStream_t. - CUDA stream to be used for the copy - (if not provided, an internal user stream will be selected) - In most cases, using pytorch's current stream is expected (for example, - if we are copying to a tensor allocated with torch.zeros(...)) - """ - dali_type = to_torch_type[dali_tensor.dtype] - - assert dali_type == arr.dtype, ( - "The element type of DALI Tensor/TensorList" - " doesn't match the element type of the target PyTorch Tensor: " - "{} vs {}".format(dali_type, arr.dtype) - ) - assert dali_tensor.shape() == list( - arr.size() - ), "Shapes do not match: DALI tensor has size {0}, but PyTorch Tensor has size {1}".format( - dali_tensor.shape(), list(arr.size()) - ) - - non_blocking = cuda_stream is not None - cuda_stream = types._raw_cuda_stream_ptr(cuda_stream) - - # turn raw int to a c void pointer - c_type_pointer = ctypes.c_void_p(arr.data_ptr()) - if isinstance(dali_tensor, (TensorGPU, TensorListGPU)): - dali_tensor.copy_to_external(c_type_pointer, cuda_stream, non_blocking=non_blocking) - else: - dali_tensor.copy_to_external(c_type_pointer) - return arr - - class DALIGenericIterator(_DaliBaseIterator): """ General DALI iterator for PyTorch. It can return any number of diff --git a/dali/python/nvidia/dali/plugin/pytorch/experimental/__init__.py b/dali/python/nvidia/dali/plugin/pytorch/experimental/__init__.py new file mode 100644 index 00000000000..645f7e9f1c2 --- /dev/null +++ b/dali/python/nvidia/dali/plugin/pytorch/experimental/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import proxy # noqa: F401 diff --git a/dali/python/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py b/dali/python/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py new file mode 100644 index 00000000000..63e1f488886 --- /dev/null +++ b/dali/python/nvidia/dali/plugin/pytorch/experimental/proxy/__init__.py @@ -0,0 +1,821 @@ +# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = ["DALIServer", "DataLoader"] + +from torch.cuda import nvtx as _nvtx +import torch.multiprocessing as _mp +from torch.utils import data as _torchdata +from torch.utils.data._utils.collate import default_collate_fn_map as _default_collate_fn_map +from nvidia.dali import Pipeline as _Pipeline +from nvidia.dali.external_source import ExternalSource as _ExternalSource +import threading +import queue +from queue import Empty +from nvidia.dali.plugin.pytorch.torch_utils import to_torch_tensor +from inspect import Parameter, Signature + +# DALI proxy is a PyTorch specific layer that connects a multi-process PyTorch DataLoader with +# a single DALI pipeline. This allows to run CUDA processing in a single process, avoiding the +# problem of having multiple CUDA contexts which hurts the performance. +# +# The diagram below shows how the different processes and thread interact with each other +# via shared queues. req_n_k represents the k-th processing request from data worker n, +# consisting of a batch identifier (n, k) and a set of inputs. data_n_k represents the +# outputs of a DALI pipeline corresponding to the same batch identifier, consisting of the +# batch identifier and a set of outputs. +# +# +-------+ +---------------+ +-------------+ +---------------+ +-----------+ +-----------+ +# | main | | dali_output_q | | data_thread | | dali_input_q | | worker_0 | | worker_1 | +# +-------+ +---------------+ +-------------+ +---------------+ +-----------+ +-----------+ +# |~~~get()~~~~~~>| | | | | +# | | |~~~get()~~~~~~~>| | | +# | | | | | | +# | | | | ------------- | +# | | | | | collate | | +# | | | | ------------- | +# | | | | | | +# | | | |<~~put(req_0_0)~~| | +# | | | | | | +# | | | |---------------->| | +# | | | | | | +# | | |<--req_0_0------| | | +# | | | | | | +# | | --------------- | | | +# | | | run | | | | +# | | --------------- | | | +# | | | | | ------------- +# | | | | | | collate | +# | | | | | ------------- +# | | | | | | +# | | | |<~~put(req_1_0)~~~~~~~~~~~~~~~~| +# | | | | | | +# | | | |------------------------------>| +# | | | | | | +# | |<~~put(data_0_0)~~| | | | +# | | | | | | +# | |----------------->| | | | +# | | | | | | +# | | | | | ------------- +# | | | | | | collate | +# | | | | | ------------- +# | | | | | | +# | | | |<~~put(req_1_1)~~~~~~~~~~~~~~~~| +# | | | | | | +# | | | |------------------------------>| +# | | | | | | +# |<--data_0_0----| | | | | +# | | | | | | +# |~~~get()~~~~~~>| | | | | +# | | |~~~get()~~~~~~~>| | | +# | | | | | | +# | | |<--req_1_0------| | | +# | | | | | | +# | | --------------- | ------------- | +# | | | run | | | collate | | +# | | --------------- | ------------- | +# | | | | | | +# | | | |<~~put(req_0_1)~~| | +# | | | | | | +# | | | |-----------------| | +# | | | | | | +# | |<~~put(data_1_0)~~| | | | +# | | | | | | +# | |----------------->| | | | +# | | | | | | +# |<--data_1_0----| | | | | +# | | | | | | +# +-------+ +---------------+ +-------------+ +---------------+ +-----------+ +-----------+ +# | main | | dali_output_q | | data_thread | | dali_input_q | | worker_0 | | worker_1 | +# +-------+ +---------------+ +-------------+ +---------------+ +-----------+ +-----------+ + + +def _modify_signature(new_sig): + from functools import wraps + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + bound_args = new_sig.bind(*args, **kwargs) + bound_args.apply_defaults() + return func(*bound_args.args, **bound_args.kwargs) + except Exception as err: + args_str = ", ".join([f"{type(arg)}" for arg in args]) + kwargs_str = ", ".join([f"{key}={type(arg)}" for key, arg in kwargs.items()]) + raise ValueError( + f"Expected signature is: {new_sig}. " + f"Got: args=({args_str}), kwargs={kwargs_str}, error: {err}" + ) + + wrapper.__signature__ = new_sig + return wrapper + + return decorator + + +def _external_source_node_names(pipeline): + """ + extract the names of all the ExternalSource nodes in the pipeline + """ + # TODO(janton): Add a native function to query those names, so that we can do it + # also on deserialized pipelines + if pipeline._deserialized: + raise RuntimeError( + "Not able to find the external source " + "operator names, since the pipeline was deserialized" + ) + if not pipeline._py_graph_built: + pipeline._build_graph() + input_node_names = [] + for op in pipeline._ops: + if isinstance(op._op, _ExternalSource): + input_node_names.append(op.name) + return input_node_names + + +class DALIOutputSampleRef: + """ + Reference for a single sample output bound to a pipeline run. + """ + + def __init__(self, proxy, pipe_run_ref, output_idx, sample_idx): + """ + Args: + proxy (_DALIProxy): The proxy object used for communication or data handling. + pipe_run_ref (DALIPipelineRunRef): A reference to the pipeline run. + output_idx (int): The index of the output in the pipeline. + sample_idx (int): The index of the sample within the batch. + """ + self.proxy = proxy + self.pipe_run_ref = pipe_run_ref + self.output_idx = output_idx + self.sample_idx = sample_idx + + def __repr__(self): + return ( + f"DALIOutputSampleRef({self.pipe_run_ref}, " + + f"output_idx={self.output_idx}, sample_idx={self.sample_idx})" + ) + + +class DALIPipelineRunRef: + """ + Reference for a DALI pipeline run iteration. + """ + + def __init__(self, batch_id): + """ + Args: + batch_id (tuple(int, int)): A tuple that uniquely identifies the batch. The first + element represent the worker, and the second the batch index for that worker + """ + self.batch_id = batch_id + self.call_args = {} + self.is_scheduled = False + self.is_complete = False + + def __repr__(self): + return ( + f"DALIPipelineRunRef(batch_id={self.batch_id}, call_args={self.call_args} " + f"is_scheduled={self.is_scheduled}, is_complete={self.is_complete})" + ) + + +class DALIOutputBatchRef: + """ + Reference for a batched output bound to a pipeline run. + """ + + def __init__(self, pipe_run_ref, output_idx): + """ + Args: + pipe_run_ref (DALIPipelineRunRef): A reference to the pipeline run. + output_idx (int): The index of the output in the pipeline. + """ + self.pipe_run_ref = pipe_run_ref + self.output_idx = output_idx + + def __repr__(self): + return f"DALIOutputBatchRef(pipe_run_ref={self.pipe_run_ref}, output_idx={self.output_idx})" + + +def _collate_dali_output_sample_ref_fn(samples, *, collate_fn_map=None): + """ + Special collate function that schedules a DALI iteration for execution + """ + assert len(samples) > 0 + pipe_run_ref = samples[0].pipe_run_ref + output_idx = samples[0].output_idx + proxy = samples[0].proxy + for i, sample in enumerate(samples): + if ( + sample.proxy != proxy + or sample.pipe_run_ref != pipe_run_ref + or sample.output_idx != output_idx + ): + raise RuntimeError("All samples should belong to the same batch") + + if sample.sample_idx != i: + raise RuntimeError("Unexpected sample order") + + pipe_run_ref.is_complete = True + if not proxy._deterministic and not pipe_run_ref.is_scheduled: + pipe_run_ref = proxy._schedule_batch(pipe_run_ref) + return DALIOutputBatchRef(pipe_run_ref, output_idx) + + +# In-place modify `default_collate_fn_map` to handle DALIOutputSampleRef +_default_collate_fn_map.update({DALIOutputSampleRef: _collate_dali_output_sample_ref_fn}) + + +class _DALIProxy: + class _WorkerData: + def __init__(self, worker_id): + self.worker_id = worker_id + self.pipe_run_ref = None + self.next_worker_batch_idx = 0 + self.batch_sample_idx = 0 + + def __init__(self, signature, dali_input_q, deterministic): + # If True, the request is not sent to DALI upon creation, so that it can be scheduled + # always in the same order by the main process. This comes at a cost of performance + self._deterministic = deterministic + # Shared queue with the server + self._dali_input_q = dali_input_q + # Per worker + self._worker_data = None + # Override callable signature + self._signature = signature + # Current batch + self._curr_batch_params = {} + # get num outputs + self._num_outputs = self._get_num_outputs() + + def _get_num_outputs(self): + return_annotation = self._signature.return_annotation + if isinstance(return_annotation, tuple): + return len(return_annotation) + elif return_annotation is Signature.empty: + return 0 + else: + return 1 + + def _get_worker_id(self): + """ + Getter for 'worker_id'. In case of torch data worker it is the worker info, and + in case of a thread the thread identifier + """ + worker_info = _torchdata.get_worker_info() + return worker_info.id if worker_info else threading.get_ident() + + def _get_worker_data(self): + if self._worker_data is None: + self._worker_data = _DALIProxy._WorkerData(self._get_worker_id()) + return self._worker_data + + def _add_sample(self, inputs): + """ + Adds a sample to the current batch. In the collate function, we mark the batch as + complete. When a completed batch is encountered, a new batch should be started. + """ + worker_data = self._get_worker_data() + if worker_data.pipe_run_ref is None or worker_data.pipe_run_ref.is_complete: + worker_data.pipe_run_ref = DALIPipelineRunRef( + batch_id=(worker_data.worker_id, worker_data.next_worker_batch_idx) + ) + worker_data.next_worker_batch_idx += 1 + worker_data.batch_sample_idx = 0 + + for name, value in inputs.items(): + # we want to transfer only the arguments to the caller side, not the the self reference + if name == "self": + continue + if name not in worker_data.pipe_run_ref.call_args: + worker_data.pipe_run_ref.call_args[name] = [] + worker_data.pipe_run_ref.call_args[name].append(value) + + ret = tuple( + DALIOutputSampleRef( + self, + pipe_run_ref=worker_data.pipe_run_ref, + output_idx=i, + sample_idx=worker_data.batch_sample_idx, + ) + for i in range(self._num_outputs) + ) + # unpack single element tuple + if len(ret) == 1: + ret = ret[0] + worker_data.batch_sample_idx += 1 + return ret + + def _schedule_batch(self, pipe_run_ref): + """ + Schedules a batch for execution by appending it to the DALI input queue. + """ + if not pipe_run_ref.call_args: + raise RuntimeError("No inputs for the pipeline to run (was it already scheduled?)") + if not pipe_run_ref.is_complete: + raise RuntimeError("Batch is not marked as complete") + _nvtx.range_push(f"dali_input_q put {pipe_run_ref.batch_id}") + + dali_input_q_item = DALIPipelineRunRef(pipe_run_ref.batch_id) + dali_input_q_item.call_args = pipe_run_ref.call_args + dali_input_q_item.is_complete = True + dali_input_q_item.is_scheduled = True + self._dali_input_q.put(dali_input_q_item) + + pipe_run_ref.call_args = {} + pipe_run_ref.is_scheduled = True + _nvtx.range_pop() + return pipe_run_ref + + +class DALIServer: + def __init__(self, pipeline, deterministic=False): + """ + Initializes a new DALI server instance. + + Args: + pipeline (Pipeline): DALI pipeline to run. + deterministic (bool): If True, it ensures that the order of execution is always + the same, which is important when the pipeline has a state + and we are interested in obtaining reproducible results. + Also, if enabled, the execution will be less performant, as + the DALI processing can be scheduled only after the data + loader has returned the batch information, and not as soon + as data worker collates the batch. + + Example 1 - Full integration with PyTorch via DALI proxy DataLoader: + + .. code-block:: python + + @pipeline_def + def rn50_train_pipe(): + rng = fn.random.coin_flip(probability=0.5) + filepaths = fn.external_source(name="images", no_copy=True) + jpegs = fn.io.file.read(filepaths) + images = fn.decoders.image_random_crop( + jpegs, + device="mixed", + output_type=types.RGB, + random_aspect_ratio=[0.75, 4.0 / 3.0], + random_area=[0.08, 1.0], + ) + images = fn.resize( + images, + size=[224, 224], + interp_type=types.INTERP_LINEAR, + antialias=False, + ) + output = fn.crop_mirror_normalize( + images, + dtype=types.FLOAT, + output_layout="CHW", + crop=(224, 224), + mean=[0.485 * 255, 0.456 * 255, 0.406 * 255], + std=[0.229 * 255, 0.224 * 255, 0.225 * 255], + mirror=rng, + ) + return output + + def read_filepath(path): + return np.frombuffer(path.encode(), dtype=np.int8) + + nworkers = 8 + pipe = rn50_train_pipe( + batch_size=16, num_threads=3, device_id=0, + prefetch_queue_depth=2*nworkers) + + # The scope makes sure the server starts and stops at enter/exit + with dali_proxy.DALIServer(pipe) as dali_server: + # DALI proxy instance can be used as a transform callable + dataset = torchvision.datasets.ImageFolder( + jpeg, transform=dali_server.proxy, loader=read_filepath) + + # Same interface as torch DataLoader, but takes a dali_server as first argument + loader = nvidia.dali.plugin.pytorch.experimental.proxy.DataLoader( + dali_server, + dataset, + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + ) + + for data, target in loader: + # consume it + + Example 2 - Manual execution using DALI proxy / DALI server and PyTorch's default_collate: + + .. code-block:: python + + @pipeline_def + def my_pipe(): + a = fn.external_source(name="a", no_copy=True) + b = fn.external_source(name="b", no_copy=True) + return a + b, a - b + + with dali_proxy.DALIServer( + my_pipe(device='cpu', batch_size=batch_size, + num_threads=3, device_id=None)) as dali_server: + + outs = [] + for _ in range(batch_size): + a = np.array(np.random.rand(3, 3), dtype=np.float32) + b = np.array(np.random.rand(3, 3), dtype=np.float32) + out0, out1 = dali_server.proxy(a=a, b=b) + outs.append((a, b, out0, out1)) + + outs = torch.utils.data.dataloader.default_collate(outs) + + a, b, a_plus_b, a_minus_b = dali_server.produce_data(outs) + + Example 3 - Full integration with PyTorch but using the original PyTorch DataLoader + + .. code-block:: python + + pipe = rn50_train_pipe(...) + with dali_proxy.DALIServer(pipe) as dali_server: + dataset = torchvision.datasets.ImageFolder( + jpeg, transform=dali_server.proxy, loader=read_filepath) + + # Using PyTorch DataLoader directly + loader = torch.utils.data.DataLoader( + dataset, + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + ) + + for data, target in loader: + # replaces the output reference with actual data + data = dali_server.produce_data(data) + ... + """ + if not isinstance(pipeline, _Pipeline): + raise TypeError(f"Expected an NVIDIA DALI pipeline, got: {pipeline}") + else: + self._pipe = pipeline + + # get the dali pipeline input names + self._dali_input_names = _external_source_node_names(self._pipe) + num_inputs = len(self._dali_input_names) + if num_inputs == 0: + raise RuntimeError("The provided pipeline doesn't have any inputs") + + parameters = [Parameter("self", Parameter.POSITIONAL_OR_KEYWORD)] + parameter_kind = ( + Parameter.POSITIONAL_OR_KEYWORD if num_inputs == 1 else Parameter.KEYWORD_ONLY + ) + for input_name in self._dali_input_names: + parameters.append(Parameter(input_name, parameter_kind)) + return_annotation = tuple(DALIOutputSampleRef for _ in range(self._pipe.num_outputs)) + self._signature = Signature(parameters, return_annotation=return_annotation) + + # Multi-process queue used to transfer data from the pytorch workers to the main process + self._dali_input_q = _mp.Queue() + # Multi-process queue used by the main process to consume outputs from the DALI pipeline + self._dali_output_q = queue.Queue() + # Thread + self._thread = None + self._thread_stop_event = None + # Cache + self._cache_outputs = dict() + # Whether we want the order of DALI execution to be reproducible + self._deterministic = deterministic + # Proxy + self._proxy = None + + def __del__(self): + self.stop_thread() + + @property + def proxy(self): + if not self._proxy: + + class _DALIProxyCallable(_DALIProxy): + def __init__(self, signature, dali_input_q, deterministic): + super().__init__(signature, dali_input_q, deterministic) + + @_modify_signature(self._signature) + def __call__(self, *args, **kwargs): + bound_args = self._signature.bind(self, *args, **kwargs) + return self._add_sample(bound_args.arguments) + + self._proxy = _DALIProxyCallable( + self._signature, self._dali_input_q, self._deterministic + ) + return self._proxy + + def _get_outputs(self, pipe_run_ref: DALIPipelineRunRef): + """ + Gets the pipeline outputs for a specific batch id. It will keep reading data until the + right batch is found, caching the results that were not consumed until a future call + """ + req_batch_id = pipe_run_ref.batch_id + + # In case we haven't scheduled the iteration yet (i.e. deterministic config), do it now + if not pipe_run_ref.is_scheduled: + _nvtx.range_push(f"dali_input_q put {pipe_run_ref.batch_id}") + self._dali_input_q.put(pipe_run_ref) + pipe_run_ref.is_scheduled = True + _nvtx.range_pop() + + # Wait for the requested output to be ready + req_outputs = None + # If the data was already read, just return it (and clear the cache entry) + if req_batch_id in self._cache_outputs: + req_outputs = self._cache_outputs[req_batch_id] + del self._cache_outputs[req_batch_id] + + else: + curr_batch_id = None + # If not the data we are looking for, store it and keep processing until we find it + while req_batch_id != curr_batch_id: + _nvtx.range_push("dali_output_q.get") + curr_batch_id, curr_processed_outputs, err = self._dali_output_q.get() + _nvtx.range_pop() + + if err is not None: + raise err + + if curr_batch_id == req_batch_id: + req_outputs = curr_processed_outputs + else: + self._cache_outputs[curr_batch_id] = curr_processed_outputs + return req_outputs + + @staticmethod + def _need_conversion(obj, need_conversion_cache): + """Return True if the object or any of its members need conversion.""" + obj_id = id(obj) + if obj_id in need_conversion_cache: + return need_conversion_cache[obj_id] + + if isinstance(obj, DALIOutputBatchRef): + need_conversion_cache[obj_id] = True + return True + + need_conversion_cache[obj_id] = False # Prevent infinite recursion + for item in ( + obj + if isinstance(obj, (list, tuple)) + else obj.values() if isinstance(obj, dict) else getattr(obj, "__dict__", {}).values() + ): + if DALIServer._need_conversion(item, need_conversion_cache): + need_conversion_cache[obj_id] = True + return True + + return False + + def _produce_data_impl(self, obj, cache, need_conversion_cache): + """ + Recursive single-pass implementation of produce_data with in-place modifications. + """ + + obj_id = id(obj) + if obj_id in cache: # Return cached result to prevent infinite recursion + return cache[obj_id] + + # If it doesn't need conversion, return immediately + if not DALIServer._need_conversion(obj, need_conversion_cache): + cache[obj_id] = obj + return obj + + # Handle DALIOutputBatchRef + if isinstance(obj, DALIOutputBatchRef): + pipe_run_ref_id = id(obj.pipe_run_ref) + if pipe_run_ref_id not in cache: + cache[pipe_run_ref_id] = self._get_outputs(obj.pipe_run_ref) + outputs = cache[pipe_run_ref_id] + result = outputs[obj.output_idx] + cache[obj_id] = result + return result + + # Handle lists (modify in place) + if isinstance(obj, list): + cache[obj_id] = obj # Cache before recursion to prevent infinite loops + for i in range(len(obj)): + obj[i] = self._produce_data_impl(obj[i], cache, need_conversion_cache) + return obj + + # Handle tuples (regular, named, or custom) + if isinstance(obj, tuple): + # Named tuple: Reconstruct using `_replace` + if hasattr(obj, "_replace") and hasattr(obj, "_fields"): + result = obj._replace( + **{ + field: self._produce_data_impl( + getattr(obj, field), cache, need_conversion_cache + ) + for field in obj._fields + } + ) + # Regular or custom tuple: Reconstruct using type(obj) + else: + result = type(obj)( + self._produce_data_impl(item, cache, need_conversion_cache) for item in obj + ) + cache[obj_id] = result + return result + + # Handle dictionaries (modify in place) + if isinstance(obj, dict): + cache[obj_id] = obj # Cache before recursion to prevent infinite loops + for key in list(obj.keys()): # Ensure we handle dynamic changes in the dictionary + obj[key] = self._produce_data_impl(obj[key], cache, need_conversion_cache) + return obj + + # Handle custom objects with attributes (modify in place) + if hasattr(obj, "__dict__"): + cache[obj_id] = obj # Cache before recursion to prevent infinite loops + for attr_name, attr_value in obj.__dict__.items(): + setattr( + obj, + attr_name, + self._produce_data_impl(attr_value, cache, need_conversion_cache), + ) + return obj + + # Default case (shouldn't be reached since we exited early in case of no-op) + cache[obj_id] = obj + return obj + + def produce_data(self, obj): + """ + A generic function to recursively visits all elements in a nested structure and replace + instances of DALIOutputBatchRef with the actual data provided by the DALI server + See :class:`nvidia.dali.plugin.pytorch.experimental.proxy.DALIServer` for a full example. + + Args: + obj: The object to map (can be an instance of any class). + + Returns: + A new object where any instance of DALIOutputBatchRef has been replaced with actual + data. + + """ + cache = {} + need_conversion_cache = {} + ret = self._produce_data_impl(obj, cache, need_conversion_cache) + return ret + + def _get_input_batches(self, max_num_batches, timeout=None): + _nvtx.range_push("dali_input_q.get") + count = 0 + batches = [] + if timeout is not None: + try: + batches.append(self._dali_input_q.get(timeout=timeout)) + count = count + 1 + except Empty: + return None + except _mp.TimeoutError: + return None + + while count < max_num_batches: + try: + batches.append(self._dali_input_q.get_nowait()) + count = count + 1 + except Empty: + break + _nvtx.range_pop() + return batches + + def _thread_fn(self): + """ + Asynchronous DALI thread that gets iteration data from the queue and schedules it + for execution + """ + fed_batches = [] + while not self._thread_stop_event.is_set(): + _nvtx.range_push("get_input_batches") + timeout = 5 if len(fed_batches) == 0 else None + # We try to feed as many batches as the prefetch queue (if available) + batches = self._get_input_batches( + self._pipe.prefetch_queue_depth - len(fed_batches), timeout=timeout + ) + _nvtx.range_pop() + if batches is not None and len(batches) > 0: + _nvtx.range_push("feed_pipeline") + for pipe_run_ref in batches: + for name, data in pipe_run_ref.call_args.items(): + self._pipe.feed_input(name, data) + self._pipe._run_once() + fed_batches.append(pipe_run_ref.batch_id) + _nvtx.range_pop() + + # If no batches to consume, continue + if len(fed_batches) == 0: + continue + + _nvtx.range_push("outputs") + batch_id = fed_batches.pop(0) # we are sure there's at least one + err = None + torch_outputs = None + try: + pipe_outputs = self._pipe.outputs() + torch_outputs = tuple( + [ + to_torch_tensor(out.as_tensor(), not self._pipe.exec_dynamic) + for out in pipe_outputs + ] + ) + except Exception as exception: + err = exception + + self._dali_output_q.put((batch_id, torch_outputs, err)) + _nvtx.range_pop() + + def start_thread(self): + """ + Starts the DALI pipeline thread. Note: Using scope's __enter__/__exit__ is preferred + """ + if self._thread is not None: + return + self._thread = threading.Thread(target=DALIServer._thread_fn, args=(self,)) + self._thread_stop_event = threading.Event() + self._thread.start() + + def stop_thread(self): + """ + Stops the DALI pipeline thread. Note: Using scope's __enter__/__exit__ is preferred + """ + if self._thread_stop_event is None: + return + self._thread_stop_event.set() + self._thread.join() + self._thread = None + self._thread_stop_event = None + + def _is_thread_running(self): + return self._thread is not None + + def __enter__(self): + """ + Starts the DALI pipeline thread + """ + self.start_thread() + return self + + def __exit__(self, exc_type, exc_value, tb): + """ + Stops the DALI pipeline thread + """ + self.stop_thread() + return False # Return False to propagate exceptions + + +class DataLoader(_torchdata.dataloader.DataLoader): + """ + DALI data loader to be used in the main loop, which replaces the pipeline run references + with actual data produced by the DALI server. + See :class:`nvidia.dali.plugin.pytorch.experimental.proxy.DALIServer` for a full example. + """ + + class _Iter(_torchdata.dataloader._MultiProcessingDataLoaderIter): + """ + Data loader iterator used by the DALI proxy data loader + """ + + def __init__(self, loader): + super().__init__(loader) + self.loader = loader + + def _next_data(self): + self.loader.ensure_server_listening() + data = super()._next_data() + return self.loader.dali_server.produce_data(data) + + def __init__(self, dali_server, *args, **kwargs): + """ + Same interface as PyTorch's DataLoader except for the extra DALIServer argument + """ + super().__init__(*args, **kwargs) + self.dali_server = dali_server + self.server_started_by_loader = False + + def __del__(self): + if self.server_started_by_loader and self.dali_server._is_thread_running(): + print("Stop") + self.dali_server.stop_thread() + + def ensure_server_listening(self): + if not self.dali_server._is_thread_running(): + self.dali_server.start_thread() + self.server_started_by_loader = True + + def _get_iterator(self): + return DataLoader._Iter(self) diff --git a/dali/python/nvidia/dali/plugin/pytorch/torch_utils.py b/dali/python/nvidia/dali/plugin/pytorch/torch_utils.py new file mode 100644 index 00000000000..5a5c79f09f4 --- /dev/null +++ b/dali/python/nvidia/dali/plugin/pytorch/torch_utils.py @@ -0,0 +1,102 @@ +# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import ctypes +from nvidia.dali import types +from nvidia.dali.tensors import TensorListGPU, TensorListCPU, TensorGPU, TensorCPU +from typing import Union, Any + +to_torch_type = { + types.DALIDataType.FLOAT: torch.float32, + types.DALIDataType.FLOAT64: torch.float64, + types.DALIDataType.FLOAT16: torch.float16, + types.DALIDataType.UINT8: torch.uint8, + types.DALIDataType.INT8: torch.int8, + types.DALIDataType.BOOL: torch.bool, + types.DALIDataType.INT16: torch.int16, + types.DALIDataType.INT32: torch.int32, + types.DALIDataType.INT64: torch.int64, +} + + +def feed_ndarray( + dali_tensor: Union[TensorCPU, TensorGPU, TensorListCPU, TensorListGPU], + arr: torch.Tensor, + cuda_stream: Union[torch.cuda.Stream, Any, None] = None, +) -> torch.Tensor: + """ + Copy contents of DALI tensor to PyTorch's Tensor. + + Parameters + ---------- + dali_tensor : nvidia.dali.backend.TensorCPU or nvidia.dali.backend.TensorGPU + Tensor from which to copy + arr : torch.Tensor + Destination of the copy + cuda_stream : torch.cuda.Stream, cudaStream_t or any value that can be cast to cudaStream_t. + CUDA stream to be used for the copy + (if not provided, an internal user stream will be selected) + In most cases, using pytorch's current stream is expected (for example, + if we are copying to a tensor allocated with torch.zeros(...)) + """ + dali_type = to_torch_type[dali_tensor.dtype] + + assert dali_type == arr.dtype, ( + "The element type of DALI Tensor/TensorList" + " doesn't match the element type of the target PyTorch Tensor: " + "{} vs {}".format(dali_type, arr.dtype) + ) + assert dali_tensor.shape() == list( + arr.size() + ), "Shapes do not match: DALI tensor has size {0}, but PyTorch Tensor has size {1}".format( + dali_tensor.shape(), list(arr.size()) + ) + + non_blocking = cuda_stream is not None + cuda_stream = types._raw_cuda_stream_ptr(cuda_stream) + + # turn raw int to a c void pointer + c_type_pointer = ctypes.c_void_p(arr.data_ptr()) + if isinstance(dali_tensor, (TensorGPU, TensorListGPU)): + dali_tensor.copy_to_external(c_type_pointer, cuda_stream, non_blocking=non_blocking) + else: + dali_tensor.copy_to_external(c_type_pointer) + return arr + + +def to_torch_tensor(dali_tensor, copy): + """ + Converts to torch.Tensor, either copying the data or using dlpack + """ + if copy: + torch_dtype = to_torch_type[dali_tensor.dtype] + if isinstance(dali_tensor, TensorGPU): + torch_device = torch.device("cuda", dali_tensor.device_id()) + else: + torch_device = torch.device("cpu") + torch_output = torch.empty( + dali_tensor.shape(), + dtype=torch_dtype, + device=torch_device, + ) + cuda_stream = ( + torch.cuda.current_stream(device=torch_device) + if isinstance(dali_tensor, TensorGPU) + else None + ) + feed_ndarray(dali_tensor, torch_output, cuda_stream=cuda_stream) + return torch_output + else: + return torch.from_dlpack(dali_tensor) diff --git a/dali/test/python/test_dali_proxy.py b/dali/test/python/test_dali_proxy.py new file mode 100644 index 00000000000..3c02d157443 --- /dev/null +++ b/dali/test/python/test_dali_proxy.py @@ -0,0 +1,625 @@ +# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nvidia.dali import pipeline_def, fn, types +import numpy as np +import os +from nose2.tools import params +from nose_utils import attr, assert_raises + + +def read_file(path): + return np.fromfile(path, dtype=np.uint8) + + +def read_filepath(path): + return np.frombuffer(path.encode(), dtype=np.int8) + + +dali_extra = os.environ["DALI_EXTRA_PATH"] +jpeg = os.path.join(dali_extra, "db", "single", "jpeg") +jpeg_113 = os.path.join(jpeg, "113") +test_files = [ + os.path.join(jpeg_113, f) + for f in ["snail-4291306_1280.jpg", "snail-4345504_1280.jpg", "snail-4368154_1280.jpg"] +] +test_input_filenames = [read_filepath(fname) for fname in test_files] + + +@pipeline_def +def image_pipe(dali_device="gpu", include_decoder=True, random_pipe=True): + if include_decoder: + filepaths = fn.external_source(name="images", no_copy=True) + jpegs = fn.io.file.read(filepaths) + decoder_device = "mixed" if dali_device == "gpu" else "cpu" + + if random_pipe: + images = fn.decoders.image_random_crop( + jpegs, + device=decoder_device, + output_type=types.RGB, + random_aspect_ratio=[0.75, 4.0 / 3.0], + random_area=[0.08, 1.0], + ) + else: + images = fn.decoders.image( + jpegs, + device=decoder_device, + output_type=types.RGB, + ) + else: + images = fn.external_source(name="images", no_copy=True) + if random_pipe: + shapes = images.shape() + crop_anchor, crop_shape = fn.random_crop_generator( + shapes, random_aspect_ratio=[0.75, 4.0 / 3.0], random_area=[0.08, 1.0] + ) + images = fn.slice(images, start=crop_anchor, shape=crop_shape, axes=[0, 1]) + + images = fn.resize( + images, + size=[224, 224], + interp_type=types.INTERP_LINEAR, + antialias=False, + ) + mirror = fn.random.coin_flip(probability=0.5) if random_pipe else False + output = fn.crop_mirror_normalize( + images, + dtype=types.FLOAT, + output_layout="CHW", + crop=(224, 224), + mean=[0.485 * 255, 0.456 * 255, 0.406 * 255], + std=[0.229 * 255, 0.224 * 255, 0.225 * 255], + mirror=mirror, + ) + return output + + +@attr("pytorch") +@params(("cpu", False), ("cpu", True), ("gpu", False), ("gpu", True)) +def test_dali_proxy_torch_data_loader(device, include_decoder, debug=False): + # Shows how DALI proxy is used in practice with a PyTorch data loader + + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + import torchvision.datasets as datasets + from torch.utils import data as torchdata + + batch_size = 4 + num_threads = 3 + device_id = 0 + nworkers = 4 + pipe = image_pipe( + dali_device=device, + include_decoder=include_decoder, + random_pipe=False, + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=2 + nworkers, + ) + + pipe_ref = image_pipe( + dali_device=device, + include_decoder=include_decoder, + random_pipe=False, + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=1, + ) + + dali_server = dali_proxy.DALIServer(pipe) + if include_decoder: + dataset = datasets.ImageFolder(jpeg, transform=dali_server.proxy, loader=read_filepath) + dataset_ref = datasets.ImageFolder(jpeg, transform=lambda x: x.copy(), loader=read_filepath) + else: + dataset = datasets.ImageFolder(jpeg, transform=dali_server.proxy) + dataset_ref = datasets.ImageFolder(jpeg, transform=lambda x: x.copy()) + + loader = dali_proxy.DataLoader( + dali_server, + dataset, + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + ) + + def ref_collate_fn(batch): + filepaths, labels = zip(*batch) # Separate the inputs and labels + # Just return the batch as they are, a list of individual tensors + return filepaths, labels + + loader_ref = torchdata.dataloader.DataLoader( + dataset_ref, + batch_size=batch_size, + num_workers=1, + collate_fn=ref_collate_fn, + shuffle=False, + ) + + for _, ((data, target), (ref_data, ref_target)) in enumerate(zip(loader, loader_ref)): + np.testing.assert_array_equal(target, ref_target) + pipe_ref.feed_input("images", ref_data) + (ref_data,) = pipe_ref.run() + for sample_idx in range(batch_size): + np.testing.assert_array_equal(ref_data[sample_idx].as_cpu(), data[sample_idx].cpu()) + + dali_server.stop_thread() # make sure we stop the thread before leaving the test + + +@attr("pytorch") +@params(("gpu",)) +def test_dali_proxy_manual_integration(device, debug=False): + # Shows how to integrate with DALI proxy manually with an existing data loader + + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + import torch + from torch.utils import data as torchdata + from PIL import Image + + batch_size = 4 + num_threads = 3 + device_id = 0 + nworkers = 4 + + class CustomDatasetOnlyDecoding(torchdata.Dataset): + def __init__(self, folder_path): + self.folder_path = folder_path + self.image_files = self._find_images_in_folder(folder_path) + + def _find_images_in_folder(self, folder_path): + """ + Recursively find all image files in the folder and its subdirectories. + """ + image_files = [] + + # Walk through all directories and subdirectories + for root, _, files in os.walk(folder_path): + for file in files: + if file.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".gif")): + image_files.append(os.path.join(root, file)) + + return image_files + + def __len__(self): + """Returns the number of images in the folder.""" + return len(self.image_files) + + def __getitem__(self, idx): + img_name = self.image_files[idx] + img_path = os.path.join(self.folder_path, img_name) + img = Image.open(img_path).convert("RGB") # Convert image to RGB (3 channels) + other = 1 + return np.array(img), other + + @pipeline_def + def processing_pipe(dali_device="gpu"): + images = fn.external_source(name="images", no_copy=True) + rng = fn.random.coin_flip(probability=0.5) + if dali_device == "gpu": + images = images.gpu() + images = fn.resize( + images, + device=dali_device, + size=[224, 224], + interp_type=types.INTERP_LINEAR, + antialias=False, + ) + images = fn.flip(images, horizontal=rng) + output = fn.crop_mirror_normalize( + images, + dtype=types.FLOAT, + output_layout="CHW", + crop=(224, 224), + mean=[0.485 * 255, 0.456 * 255, 0.406 * 255], + std=[0.229 * 255, 0.224 * 255, 0.225 * 255], + ) + return output + + plain_dataset = CustomDatasetOnlyDecoding(jpeg) + pipe = processing_pipe( + device, + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=2 + nworkers, + ) + + class CustomDatasetDALI(torchdata.Dataset): + def __init__(self, orig_dataset, dali_proxy): + self.dataset = orig_dataset + self.dali_proxy = dali_proxy + + def __len__(self): + return self.dataset.__len__() + + def __getitem__(self, idx): + img, other = self.dataset.__getitem__(idx) + img2 = self.dali_proxy(img) + return img2, other + + # This is just for educational purposes. It is recommended to rely + # default_collate_fn_map, which is updated to handle DALIOuputSampleRef + def custom_collate_fn(batch): + images, labels = zip(*batch) + return dali_proxy._collate_dali_output_sample_ref_fn(images), torch.tensor( + labels, dtype=torch.long + ) + + # Run the server (it also cleans up on scope exit) + with dali_proxy.DALIServer(pipe) as dali_server: + dataset = CustomDatasetDALI(plain_dataset, dali_server.proxy) + loader = torchdata.dataloader.DataLoader( + dataset, + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + collate_fn=custom_collate_fn, + ) + + assert len(loader) > 0 + for next_input, next_target in loader: + assert isinstance(next_input, dali_proxy.DALIOutputBatchRef) + next_input = dali_server.produce_data(next_input) + assert isinstance(next_input, torch.Tensor) + np.testing.assert_equal([batch_size, 3, 224, 224], next_input.shape) + np.testing.assert_equal( + [ + batch_size, + ], + next_target.shape, + ) + + +@attr("pytorch") +@params((False,), (True,)) +def test_dali_proxy_deterministic(deterministic, debug=False): + # Shows how DALI proxy can be configured for deterministic results + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + import torchvision.datasets as datasets + import torch + + # Use a high number of iterations for non-deterministic tests, even though + # we stop the test once we get different results (usually in the first iteration). + # For deterministic tests, we check that all runs produce the same results. + niterations = 3 if deterministic else 10 + num_workers = 4 + seed0 = 123456 + seed1 = 5555464 + seed2 = 775653 + + outputs = [] + for i in range(niterations): + pipe = image_pipe( + random_pipe=True, + dali_device="gpu", + batch_size=1, + num_threads=1, + device_id=0, + seed=seed0, + prefetch_queue_depth=1, + ) + outputs_i = [] + torch.manual_seed(seed2) + with dali_proxy.DALIServer(pipe, deterministic=deterministic) as dali_server: + dataset = datasets.ImageFolder(jpeg, transform=dali_server.proxy, loader=read_filepath) + # many workers so that we introduce a lot of variability in the order of arrival + loader = dali_proxy.DataLoader( + dali_server, + dataset, + batch_size=1, + num_workers=num_workers, + shuffle=True, + worker_init_fn=lambda worker_id: np.random.seed(seed1 + worker_id), + ) + outputs_i = [] + for _ in range(num_workers): + for data, _ in loader: + outputs_i.append(data.cpu()) + break + outputs.append(outputs_i) + + if i > 0: + if deterministic: + for k in range(num_workers): + assert np.array_equal(outputs[i][k], outputs[0][k]) + else: + for k in range(num_workers): + if not np.array_equal(outputs[i][k], outputs[0][k]): + return # OK + + pipe._shutdown() + del pipe + + if not deterministic: + assert False, "we got exactly the same results in all runs" + + +@attr("pytorch") +def test_dali_proxy_error_propagation(): + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + import torchvision.datasets as datasets + + batch_size = 4 + num_threads = 3 + device_id = 0 + nworkers = 2 + + @pipeline_def + def pipe_with_error(): + images = fn.external_source(name="images", no_copy=True) + error_anchor = types.Constant(np.array([-10], dtype=np.float32)) + return fn.crop( + images, crop=(224, 224), crop_pos_x=error_anchor, out_of_bounds_policy="error" + ) + + pipe = pipe_with_error( + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=3, + ) + with dali_proxy.DALIServer(pipe) as dali_server: + + dataset = datasets.ImageFolder(jpeg, transform=dali_server.proxy) + loader = dali_proxy.DataLoader( + dali_server, + dataset, + batch_size=batch_size, + num_workers=nworkers, + ) + + err_msg = "Critical error in pipeline:*Anchor for dimension 1*is out of range*" + with assert_raises(RuntimeError, glob=err_msg): + next(iter(loader)) + + # For some reason if we don't do this in this test, we see some ignored exception + # messages in the next test + pipe._shutdown() + del pipe + + +@attr("pytorch") +@params(("cpu",), ("gpu",)) +def test_dali_proxy_duplicated_outputs(device, debug=False): + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + from torch.utils import data as torchdata + from PIL import Image + + batch_size = 4 + num_threads = 3 + device_id = 0 + nworkers = 4 + pipe = image_pipe( + dali_device=device, + include_decoder=False, + random_pipe=False, + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=2 + nworkers, + ) + + class MyDataset(torchdata.Dataset): + def __init__(self, folder_path, transform): + self.folder_path = folder_path + self.image_files = self._find_images_in_folder(folder_path) + self.transform = transform + + def _find_images_in_folder(self, folder_path): + """ + Recursively find all image files in the folder and its subdirectories. + """ + image_files = [] + + # Walk through all directories and subdirectories + for root, _, files in os.walk(folder_path): + for file in files: + if file.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".gif")): + image_files.append(os.path.join(root, file)) + + return image_files + + def __len__(self): + """Returns the number of images in the folder.""" + return len(self.image_files) + + def __getitem__(self, idx): + img_name = self.image_files[idx] + img_path = os.path.join(self.folder_path, img_name) + img = Image.open(img_path).convert("RGB") # Convert image to RGB (3 channels) + img = self.transform(img) + return img, 1, img + + with dali_proxy.DALIServer(pipe) as dali_server: + dataset = MyDataset(jpeg, transform=dali_server.proxy) + loader = dali_proxy.DataLoader( + dali_server, + dataset, + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + ) + + for data1, _, data2 in loader: + np.testing.assert_array_equal(data1, data2) + + +@pipeline_def +def pipe_2_outputs(device): + a = fn.external_source(name="a", no_copy=True) + b = fn.external_source(name="b", no_copy=True) + if device == "gpu": + a = a.gpu() + b = b.gpu() + return a + b, b - a + + +@attr("pytorch") +@params(("cpu",), ("gpu",)) +def test_dali_proxy_rearrange_output_order_and_positional_args(device, debug=False): + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + from torch.utils import data as torchdata + + batch_size = 4 + num_threads = 3 + device_id = 0 + nworkers = 4 + arrs = np.random.rand(20, 3) + + pipe1 = pipe_2_outputs( + device=device, + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=2 + nworkers, + ) + pipe2 = pipe_2_outputs( + device=device, + batch_size=batch_size, + num_threads=num_threads, + device_id=device_id, + prefetch_queue_depth=2 + nworkers, + ) + + class MyDataset(torchdata.Dataset): + def __init__(self, arrs, transform, reverse_order): + self.arrs = arrs + self.n = len(arrs) + self.transform = transform + self.reverse_order = reverse_order + + def __len__(self): + """Returns the number of images in the folder.""" + return self.n + + def __getitem__(self, idx): + a = self.arrs[idx] + b = self.arrs[idx + 1 if idx < self.n - 1 else 0] + a_plus_b, b_minus_a = self.transform(b=b, a=a) # reverse order in purpose + return (b_minus_a, 1, a_plus_b) if self.reverse_order else (a_plus_b, 1, b_minus_a) + + with dali_proxy.DALIServer(pipe1) as dali_server1, dali_proxy.DALIServer(pipe2) as dali_server2: + loader1 = dali_proxy.DataLoader( + dali_server1, + MyDataset(arrs, dali_server1.proxy, reverse_order=False), + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + ) + loader2 = dali_proxy.DataLoader( + dali_server2, + MyDataset(arrs, dali_server2.proxy, reverse_order=True), + batch_size=batch_size, + num_workers=nworkers, + drop_last=True, + ) + + for data1, data2 in zip(loader1, loader2): + np.testing.assert_array_equal(data1[0].cpu(), data2[2].cpu()) + np.testing.assert_array_equal(data1[1].cpu(), data2[1].cpu()) + np.testing.assert_array_equal(data1[2].cpu(), data2[0].cpu()) + + +@attr("pytorch") +@params((4,)) +def test_dali_proxy_proxy_callable_2_args(batch_size, debug=False): + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + from torch.utils.data.dataloader import default_collate as default_collate + + with dali_proxy.DALIServer( + pipe_2_outputs(device="cpu", batch_size=batch_size, num_threads=3, device_id=None) + ) as dali_server: + + # Positional inputs are not supported when more than one input + a = np.array(np.random.rand(3, 3), dtype=np.float32) + b = np.array(np.random.rand(3, 3), dtype=np.float32) + with assert_raises(ValueError, glob="*too many positional arguments*"): + _, _ = dali_server.proxy(a, b) + + outs = [] + for _ in range(batch_size): + a = np.array(np.random.rand(3, 3), dtype=np.float32) + b = np.array(np.random.rand(3, 3), dtype=np.float32) + out0, out1 = dali_server.proxy(a=a, b=b) + outs.append((a, b, out0, out1)) + + outs = default_collate(outs) + + a, b, a_plus_b, a_minus_b = dali_server.produce_data(outs) + + np.testing.assert_array_almost_equal(a_plus_b, a + b) + np.testing.assert_array_almost_equal(a_minus_b, b - a) + + +@pipeline_def +def square(device): + a = fn.external_source(name="a", no_copy=True) + if device == "gpu": + a = a.gpu() + return a**2 + + +@attr("pytorch") +@params(("cpu",), ("gpu",)) +def test_dali_proxy_restart_server(device, debug=False): + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + from torch.utils import data as torchdata + + class MyDataset(torchdata.Dataset): + def __init__(self, transform_fn): + self.transform_fn = transform_fn + + def __len__(self): + return 10 + + def __getitem__(self, idx): + return np.array(idx), self.transform_fn(np.array(idx)) + + batch_size = 4 + dali_server = dali_proxy.DALIServer( + square(device="cpu", batch_size=batch_size, num_threads=3, device_id=None) + ) + + dataset = MyDataset(dali_server.proxy) + loader = dali_proxy.DataLoader( + dali_server, dataset, batch_size=batch_size, num_workers=2, drop_last=True + ) + for _ in range(3): # 3 epochs + assert dali_server._thread is None + for data0, data1 in iter(loader): + np.testing.assert_array_almost_equal(data0**2, data1.cpu()) + assert dali_server._thread is not None + dali_server.stop_thread() + + +@attr("pytorch") +@params((1,)) +def test_dali_proxy_produce_data_circular_dependencies(batch_size, debug=False): + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + + with dali_proxy.DALIServer( + square(device="cpu", batch_size=batch_size, num_threads=3, device_id=None) + ) as dali_server: + + class B: + def __init__(self, parent): + self.parent = parent + + class A: + def __init__(self): + self.child = B(self) + + objs = [A() for _ in range(3)] + dali_server.produce_data(objs) diff --git a/docs/plugins/pytorch_dali_proxy.rst b/docs/plugins/pytorch_dali_proxy.rst new file mode 100644 index 00000000000..bbe9a6c1692 --- /dev/null +++ b/docs/plugins/pytorch_dali_proxy.rst @@ -0,0 +1,240 @@ +PyTorch DALI Proxy +================== + +Overview +-------- + +**DALI Proxy** is a tool designed to integrate NVIDIA DALI pipelines with PyTorch data workers while maintaining the simplicity of PyTorch's dataset logic. The key features of DALI Proxy include: + +- **Efficient GPU Utilization**: DALI Proxy ensures GPU data processing occurs in the process running the main loop. This avoids performance degradation caused by multiple CUDA contexts for the same GPU. +- **Selective Offloading**: Users can offload parts of the data processing pipeline to DALI while retaining PyTorch Dataset logic, making it ideal for multi-modal applications. + +This tutorial will explain the key components, workflow, and usage of DALI Proxy in PyTorch. + +.. note:: + + **Disclaimer**: At present, data produced by the DALI proxy cannot be further processed within the Dataset. It must be passed as-is to the main loop. + If post-processing outside of DALI is needed, it should occur only after the data has been generated by the iterator. + +DALI Proxy Workflow +------------------- + +**Key Components** + +1. **DALI Pipeline** + A user-defined DALI pipeline processes input data. + +2. **DALI Server** + The server runs a background thread to execute the DALI pipeline asynchronously. + +3. **DALI Proxy** + A callable interface between PyTorch data workers and the DALI Server. + +4. **PyTorch Dataset and DataLoader** + The Dataset remains agnostic of DALI internals and uses the Proxy for preprocessing. + +**Workflow Summary** + +- A DALI pipeline is defined and connected to a **DALI Server**, which executes the pipeline in a background thread. +- The **DALI Proxy** provides an interface for PyTorch data workers to request DALI processing asynchronously. +- Each data worker invokes the proxy, which returns a **reference to a future processed sample**. +- During batch collation, the proxy groups data into a batch and sends it to the server for execution. +- The server processes the batch asynchronously and outputs the actual data to an output queue. +- The PyTorch DataLoader retrieves either the processed data or references to pending pipeline runs. The pending pipeline run references are then replaced with actual data, waiting for the data if necessary. + +API +--- + +.. autoclass:: nvidia.dali.plugin.pytorch.experimental.proxy.DALIServer + :members: + :special-members: __init__, __enter__, __exit__ + + +.. autoclass:: nvidia.dali.plugin.pytorch.experimental.proxy.DataLoader + :members: + :special-members: __getitem__, __init__ + +Example Usage +------------- + +DALI Proxy in a Nutshell +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from torchvision import datasets, transforms + from nvidia.dali import pipeline_def, fn, types + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + + # Step 1: Define a DALI pipeline + @pipeline_def + def my_dali_pipeline(): + images = fn.external_source(name="images", no_copy=True) + images = fn.resize(images, size=[224, 224]) + return fn.crop_mirror_normalize( + images, dtype=types.FLOAT, output_layout="CHW", + mean=[0.485 * 255, 0.456 * 255, 0.406 * 255], + std=[0.229 * 255, 0.224 * 255, 0.225 * 255], + ) + + # Step 2: Initialize DALI server. The scope makes sure to start and stop the background thread + with dali_proxy.DALIServer(my_dali_pipeline(batch_size=64, num_threads=3, device_id=0)) as dali_server: + # Step 3: Define a PyTorch Dataset using the DALI proxy + dataset = datasets.ImageFolder("/path/to/images", transform=dali_server.proxy) + + # Step 4: Use DALI proxy DataLoader + loader = dali_proxy.DataLoader(dali_server, dataset, batch_size=64, num_workers=8, drop_last=True) + + # Step 5: Consume data + for data, target in loader: + print(data.shape) # Processed data ready + +How It Works +------------ + +**1. DALI Pipeline** + +The DALI pipeline defines the data processing steps. Input data is fed using :meth:`~nvidia.dali.fn.external_source`. + +.. code-block:: python + + from nvidia.dali import pipeline_def, fn, types + + @pipeline_def + def example_pipeline(): + images = fn.external_source(name="images", no_copy=True) + images = fn.io.file.read(images) + images = fn.decoders.image(images, device="mixed", output_type=types.RGB) + return fn.resize(images, size=[224, 224]) + + pipeline = example_pipeline(batch_size=32, num_threads=2, device_id=0) + +**2. DALI Server and Proxy** + +The :class:`nvidia.dali.plugin.pytorch.experimental.proxy.DALIServer` manages the execution of the pipeline. The Proxy acts +as an interface for PyTorch data workers. +Note that the DALI pipeline should contain at least one input (an :meth:`~nvidia.dali.fn.external_source` instance), and +that the names of those nodes then become the inputs to the DALI proxy callable. + +.. code-block:: python + + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + with dali_proxy.DALIServer(pipeline) as dali_server: + future_samples = [dali_server.proxy(image) for image in images] + +With more than one input, we can choose to use positional arguments, keyword arguments: + +.. code-block:: python + + import numpy as np + from nvidia.dali import pipeline_def, fn, types + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + + @pipeline_def + def example_pipeline2(device): + a = fn.external_source(name="a", no_copy=True) + b = fn.external_source(name="b", no_copy=True) + return a + b, b - a + + with dali_proxy.DALIServer(example_pipeline2(...)) as dali_server: + a = np.array(...) + b = np.array(...) + + # Option 1: positional arguments + a_plus_b, b_minus_a = dali_server.proxy(a, b) + + # Option 2: named arguments + a_plus_b, b_minus_a = dali_server.proxy(b=b, a=a) + +It is also possible to start and stop the server explicitly: + +.. code-block:: python + + dali_server = dali_proxy.DALIServer(example_pipeline2(...)) + dataset = datasets.ImageFolder("/path/to/images", transform=dali_server.proxy) + loader = dali_proxy.DataLoader(dali_server, dataset, batch_size=64, num_workers=8, drop_last=True) + + # Optional, it will be started on first attempt to get data from the loader anyway + dali_server.start_thread() + + for data in loader: + ... + + # This is needed to make sure we have stopped the thread + dali_server.stop_thread() + +When possible, use the ``with`` scope. + +**3. Integration with PyTorch DataLoader** + +The :class:`nvidia.dali.plugin.pytorch.experimental.proxy.DataLoader` wrapper provided by DALI Proxy simplifies the integration process. + +.. code-block:: python + + from nvidia.dali.plugin.pytorch.experimental import proxy as dali_proxy + + with dali_proxy.DALIServer(pipeline) as dali_server: + dataset = CustomDataset(dali_server.proxy, data=images) + loader = dali_proxy.DataLoader(dali_server, dataset, batch_size=32, num_workers=4) + for data, _ in loader: + print(data.shape) # Ready-to-use processed batch + +If using a custom :class:`nvidia.dali.plugin.pytorch.experimental.proxy.DataLoader`, call the DALI server explicitly: + +.. code-block:: python + + with dali_proxy.DALIServer(pipeline) as dali_server: + dataset = CustomDataset(dali_server.proxy, data=images) + loader = MyCustomDataloader(...) + for data, _ in loader: + # Replaces instances of ``DALIOutputBatchRef`` with actual data + processed_data = dali_server.produce_data(data) + print(processed_data.shape) # data is now ready + +**4. Integration with PyTorch Dataset** + +The PyTorch Dataset can directly use the proxy as a transform function. Note that we can choose to offload only part of the +processing to DALI, while keeping some of the original data intact. + +.. code-block:: python + + class CustomDataset(torch.utils.data.Dataset): + def __init__(self, transform_fn, data): + self.data = data + self.transform_fn = transform_fn + + def __len__(self): + return len(self.data) + + def __getitem__(self, idx): + filename, label = self.data[idx] + return self.transform_fn(filename), label # Returns processed sample and the original label + +**5. Data Collation and Execution** + +This step is usually abstracted away inside the PyTorch DataLoader and the user doesn't need to take care of it explicitly. +The ``default_collate`` function combines processed samples into a batch. DALI executes the pipeline asynchronously when a batch is collated. + +.. code-block:: python + + from torch.utils.data.dataloader import default_collate as default_collate + + with dali_proxy.DALIServer(example_pipeline2(...)) as dali_server: + outs = [] + for _ in range(10): + a = np.array(np.random.rand(3, 3), dtype=np.float32) + b = np.array(np.random.rand(3, 3), dtype=np.float32) + a_plus_b, b_minus_a = dali_server.proxy(a, b) + outs.append((a_plus_b, b_minus_a)) + + # Collate into a single batch run reference + outs = default_collate(outs) + + # And we can now replace the run reference with actual data + outs = dali_server.produce_data(outs) + +Summary +------- + +DALI Proxy provides a clean and efficient way to integrate NVIDIA DALI with PyTorch. By offloading computationally intensive tasks to DALI while keeping PyTorch's Dataset and DataLoader interface intact, it ensures flexibility and maximum performance. +This approach is particularly powerful in large-scale data pipelines and multi-modal workflows. \ No newline at end of file diff --git a/docs/plugins/pytorch_tutorials.rst b/docs/plugins/pytorch_tutorials.rst index e4c3b83bc1e..498e351033e 100644 --- a/docs/plugins/pytorch_tutorials.rst +++ b/docs/plugins/pytorch_tutorials.rst @@ -6,5 +6,6 @@ PyTorch :glob: pytorch_plugin_api + pytorch_dali_proxy ../examples/frameworks/pytorch/index diff --git a/qa/TL0_python_self_test_frameworks/test_pytorch.sh b/qa/TL0_python_self_test_frameworks/test_pytorch.sh index bca497df458..048d81fbd40 100755 --- a/qa/TL0_python_self_test_frameworks/test_pytorch.sh +++ b/qa/TL0_python_self_test_frameworks/test_pytorch.sh @@ -14,6 +14,7 @@ test_body() { ${python_invoke_test} test_backend_impl_torch_dlpack.py ${python_invoke_test} test_dali_fork_torch.py ${python_invoke_test} test_copy_to_external_torch.py + ${python_invoke_test} test_dali_proxy.py ${python_invoke_test} --attr 'pytorch' test_external_source_impl_utils.py ${python_invoke_test} --attr 'pytorch' test_pipeline_debug.py ${python_invoke_test} --attr 'pytorch' test_functional_api.py