From 7935a9d7bf75b56ad9610b3e9432700a3db7a5a6 Mon Sep 17 00:00:00 2001 From: Karan Jariwala Date: Mon, 7 Aug 2023 08:54:27 -0700 Subject: [PATCH 1/5] Add iteration time test as part of regression testing (#358) * Add iteration time test as part of regression testing * Add logging and improved the existing test logic * Simplify code and fixed the unit test * Update logic for class dictionary * delete the directory, files, and sub-directories * Fixed test since directory clean up happens automatically in method local_remote_dir() --- regression/cloud_providers.yaml | 22 +- regression/iterate_data.py | 107 ++++----- regression/iterate_data.yaml | 54 +++-- regression/iteration_time.yaml | 22 ++ regression/synthetic_dataset.py | 379 ++++++++++++++++++++------------ regression/utils.py | 183 ++++++++++++++- streaming/base/stream.py | 5 +- tests/test_laziness.py | 2 - tests/test_stream.py | 6 +- tests/test_streaming.py | 2 +- tests/test_upload.py | 8 +- 11 files changed, 533 insertions(+), 257 deletions(-) create mode 100644 regression/iteration_time.yaml diff --git a/regression/cloud_providers.yaml b/regression/cloud_providers.yaml index 9357fd180..5306f0e8b 100644 --- a/regression/cloud_providers.yaml +++ b/regression/cloud_providers.yaml @@ -6,17 +6,19 @@ command: |- pip uninstall -y mosaicml-streaming cd streaming pip install -e '.[dev]' - composer -n 1 regression/synthetic_dataset.py --create --cloud_url $GS_CLOUD_URL --size_limit 4194304 # 4mb - composer -n 2 regression/iterate_data.py --cloud_url $GS_CLOUD_URL --check_download - composer -n 1 regression/synthetic_dataset.py --delete --cloud_url $GS_CLOUD_URL - composer -n 1 regression/synthetic_dataset.py --create --cloud_url $S3_URL --size_limit 134217728 # 128 mb - composer -n 4 regression/iterate_data.py --cloud_url $S3_URL --check_download --local - composer -n 1 regression/synthetic_dataset.py --delete --cloud_url $S3_URL - composer -n 1 regression/synthetic_dataset.py --create --cloud_url $OCI_URL --size_limit 268435456 # 256 mb - composer -n 8 regression/iterate_data.py --cloud_url $OCI_URL --check_download - composer -n 1 regression/synthetic_dataset.py --delete --cloud_url $OCI_URL + python regression/synthetic_dataset.py --create --name imagedataset --out $GCS_URL --size_limit 4194304 # 4mb + composer -n 2 regression/iterate_data.py --remote $GCS_URL --validate-files + python regression/synthetic_dataset.py --delete --out $GCS_URL -image: mosaicml/composer:0.15.0 + python regression/synthetic_dataset.py --create --name imagedataset --out $S3_URL --num_samples 500 --size_limit 134217728 # 128 mb + composer -n 4 regression/iterate_data.py --remote $S3_URL --local /tmp/local_dataset/ --validate-files + python regression/synthetic_dataset.py --delete --out $S3_URL + + python regression/synthetic_dataset.py --create --name imagedataset --out $OCI_URL --num_samples 500 --size_limit 268435456 # 256 mb + composer -n 8 regression/iterate_data.py --remote $OCI_URL --validate-files + python regression/synthetic_dataset.py --delete --out $OCI_URL + +image: mosaicml/composer:latest scheduling: resumable: true priority: medium diff --git a/regression/iterate_data.py b/regression/iterate_data.py index d6b533b83..1fccd0ea2 100644 --- a/regression/iterate_data.py +++ b/regression/iterate_data.py @@ -4,29 +4,16 @@ """Create a streaming dataset from toy data with various options for regression testing.""" import os -import tempfile +import time import urllib.parse from argparse import ArgumentParser, Namespace +from typing import Union -import 
utils from torch.utils.data import DataLoader +from utils import get_dataloader_params, get_kwargs, get_streaming_dataset_params from streaming import StreamingDataset -_TRAIN_EPOCHS = 2 - - -def get_kwargs(key: str) -> str: - """Parse key of named command-line arguments. - - Returns: - str: Key of named arguments. - """ - if key.startswith('--'): - key = key[2:] - key = key.replace('-', '_') - return key - def parse_args() -> tuple[Namespace, dict[str, str]]: """Parse command-line arguments. @@ -35,38 +22,33 @@ def parse_args() -> tuple[Namespace, dict[str, str]]: tuple(Namespace, dict[str, str]): Command-line arguments and named arguments. """ args = ArgumentParser() - args.add_argument('--cloud_url', type=str) - args.add_argument('--check_download', default=False, action='store_true') - args.add_argument('--local', default=False, action='store_true') - args.add_argument( - '--keep_zip', - default=False, - action='store_true', - help=('Whether to keep or delete the compressed form when decompressing' - ' downloaded shards for StreamingDataset.'), - ) - args.add_argument( - '--shuffle', - default=False, - action='store_true', - help=('Whether to iterate over the samples in randomized order for' - ' StreamingDataset.'), - ) + args.add_argument('--epochs', + type=int, + default=2, + help='Number of epochs to iterate over the data') + args.add_argument('--validate-files', default=False, action='store_true', help='Verify files') + args.add_argument('--validate-iter-time', + default=False, + action='store_true', + help='Test iter time') args, runtime_args = args.parse_known_args() kwargs = {get_kwargs(k): v for k, v in zip(runtime_args[::2], runtime_args[1::2])} return args, kwargs -def get_file_count(cloud_url: str) -> int: +def get_file_count(remote: str) -> Union[int, None]: """Get the number of files in a remote directory. Args: - cloud_url (str): Cloud provider url. + remote (str): Remote directory URL. """ - obj = urllib.parse.urlparse(cloud_url) + obj = urllib.parse.urlparse(remote) cloud = obj.scheme files = [] + if cloud == '': + return len( + [name for name in os.listdir(remote) if os.path.isfile(os.path.join(remote, name))]) if cloud == 'gs': from google.cloud.storage import Bucket, Client @@ -75,12 +57,14 @@ def get_file_count(cloud_url: str) -> int: bucket = Bucket(gcs_client, obj.netloc) files = bucket.list_blobs(prefix=obj.path.lstrip('/')) + return sum(1 for f in files if f.key[-1] != '/') elif cloud == 's3': import boto3 s3 = boto3.resource('s3') bucket = s3.Bucket(obj.netloc) files = bucket.objects.filter(Prefix=obj.path.lstrip('/')) + return sum(1 for f in files if f.key[-1] != '/') elif cloud == 'oci': import oci @@ -91,8 +75,9 @@ def get_file_count(cloud_url: str) -> int: objects = oci_client.list_objects(namespace, obj.netloc, prefix=obj.path.lstrip('/')) files = objects.data.objects - - return sum(1 for _ in files) + return sum(1 for _ in files) + else: + raise ValueError(f'Unsupported remote directory prefix {cloud} in {remote}') def main(args: Namespace, kwargs: dict[str, str]) -> None: @@ -102,41 +87,31 @@ def main(args: Namespace, kwargs: dict[str, str]) -> None: args (Namespace): Command-line arguments. kwargs (dict): Named arguments. 
""" - tmp_dir = tempfile.gettempdir() - tmp_download_dir = os.path.join(tmp_dir, 'test_iterate_data_download') - dataset = StreamingDataset( - remote=args.cloud_url if args.cloud_url is not None else utils.get_local_remote_dir(), - local=tmp_download_dir if args.local else None, - split=kwargs.get('split'), - download_retry=int(kwargs.get('download_retry', 2)), - download_timeout=float(kwargs.get('download_timeout', 60)), - validate_hash=kwargs.get('validate_hash'), - keep_zip=args.keep_zip, - epoch_size=int(kwargs['epoch_size']) if 'epoch_size' in kwargs else None, - predownload=int(kwargs['predownload']) if 'predownload' in kwargs else None, - cache_limit=kwargs.get('cache_limit'), - partition_algo=kwargs.get('partition_algo', 'orig'), - num_canonical_nodes=int(kwargs['num_canonical_nodes']) - if 'num_canonical_nodes' in kwargs else None, - batch_size=int(kwargs['batch_size']) if 'batch_size' in kwargs else None, - shuffle=args.shuffle, - shuffle_algo=kwargs.get('shuffle_algo', 'py1s'), - shuffle_seed=int(kwargs.get('shuffle_seed', 9176)), - shuffle_block_size=int(kwargs.get('shuffle_block_size', 1 << 18)), - ) - - dataloader = DataLoader(dataset) - for _ in range(_TRAIN_EPOCHS): + dataset_params = get_streaming_dataset_params(kwargs) + dataloader_params = get_dataloader_params(kwargs) + dataset = StreamingDataset(**dataset_params) + dataloader = DataLoader(dataset=dataset, **dataloader_params) + iter_time = [] + start_time = 0 + for epoch in range(args.epochs): + if epoch > 0: + start_time = time.time() for _ in dataloader: pass + if epoch > 0: + iter_time.append(time.time() - start_time) - if args.check_download and args.cloud_url is not None: - num_cloud_files = get_file_count(args.cloud_url) + if args.validate_files and dataset.streams[0].remote is not None: + num_remote_files = get_file_count(dataset.streams[0].remote) local_dir = dataset.streams[0].local num_local_files = len([ name for name in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, name)) ]) - assert num_cloud_files == num_local_files + assert num_remote_files == num_local_files, f'Expected {num_remote_files} files, got {num_local_files}' + + # TODO: Assert the iteration time is within a certain threshold + if args.validate_iter_time: + print(f'Average iter time: {sum(iter_time) / len(iter_time)} secs.') if __name__ == '__main__': diff --git a/regression/iterate_data.yaml b/regression/iterate_data.yaml index a5254dfdb..677dd74e6 100644 --- a/regression/iterate_data.yaml +++ b/regression/iterate_data.yaml @@ -6,25 +6,41 @@ command: |- pip uninstall -y mosaicml-streaming cd streaming pip install -e '.[dev]' - composer -n 1 regression/synthetic_dataset.py --create --compression gz - composer -n 2 regression/iterate_data.py - composer -n 1 regression/synthetic_dataset.py --delete - composer -n 1 regression/synthetic_dataset.py --create --hashes sha1 xxh128 - composer -n 2 regression/iterate_data.py - composer -n 4 regression/iterate_data.py --local - composer -n 8 regression/iterate_data.py --download_retry 10 - composer -n 8 regression/iterate_data.py --download_timeout 120 - composer -n 8 regression/iterate_data.py --validate_hash sha1 - composer -n 8 regression/iterate_data.py --validate_hash xxh128 - composer -n 8 regression/iterate_data.py --keep_zip - composer -n 8 regression/iterate_data.py --predownload 1000 - composer -n 8 regression/iterate_data.py --cache_limit 64mb - composer -n 8 regression/iterate_data.py --num_canonical_nodes 16 - composer -n 8 regression/iterate_data.py --batch_size 1000 - 
composer -n 8 regression/iterate_data.py --shuffle --shuffle_algo py1b --shuffle_seed 12 --shuffle_block_size 10000 - composer -n 1 regression/synthetic_dataset.py --delete - -image: mosaicml/composer:0.15.0 + python regression/synthetic_dataset.py --create --name numberandsaydataset --out /tmp/streaming_dataset/ --num_samples 10_000 --size_limit 10240 + composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset/ --batch_size 16 + composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset/ --local /tmp/local_dataset/ --batch_size 32 + composer -n 4 regression/iterate_data.py --local /tmp/streaming_dataset/ + python regression/synthetic_dataset.py --delete --out /tmp/streaming_dataset/ + rm -rf /tmp/local_dataset/ + + python regression/synthetic_dataset.py --create --name numberandsaydataset --out /tmp/streaming_dataset_gz/ --hashes sha1,xxh128 --compression gz --num_samples 10_000 --size_limit 10240 + composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --batch_size 4 + composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --batch_size 1 + composer -n 4 regression/iterate_data.py --local /tmp/streaming_dataset_gz/ + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --download_retry 4 + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --download_timeout 120 + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --validate_hash sha1 + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --keep_zip + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --predownload 1000 + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --num_canonical_nodes 16 + rm -rf /tmp/local_dataset/ + + composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --shuffle_algo py1b --shuffle_seed 12 --shuffle_block_size 10000 + rm -rf /tmp/local_dataset/ + +image: mosaicml/composer:latest scheduling: resumable: true priority: medium diff --git a/regression/iteration_time.yaml b/regression/iteration_time.yaml new file mode 100644 index 000000000..ef2938bd6 --- /dev/null +++ b/regression/iteration_time.yaml @@ -0,0 +1,22 @@ +name: streaming-regression-test-iteration-time +compute: + gpus: 8 # Number of GPUs to use + # cluster: TODO # Name of the cluster to use for this run +command: |- + pip uninstall -y mosaicml-streaming + cd streaming + pip install -e '.[dev]' + python regression/synthetic_dataset.py --create --name imagedataset --out /tmp/streaming_dataset/ --num_samples 300 + composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset/ --local /tmp/local_dataset/ --batch_size 32 + python regression/synthetic_dataset.py --delete --out /tmp/streaming_dataset/ + rm -rf /tmp/local_dataset/ + +image: mosaicml/composer:latest +scheduling: + resumable: true + priority: medium +integrations: + - integration_type: git_repo + git_repo: mosaicml/streaming + git_branch: main + ssh_clone: false diff --git a/regression/synthetic_dataset.py 
b/regression/synthetic_dataset.py index 93eb96018..9768deaaa 100644 --- a/regression/synthetic_dataset.py +++ b/regression/synthetic_dataset.py @@ -1,200 +1,295 @@ # Copyright 2023 MosaicML Streaming authors # SPDX-License-Identifier: Apache-2.0 -"""Create a toy dataset using MDSWriter for regression testing.""" +"""Create a dataset and save it to a directory.""" -import os -import random +import ast import shutil -import string +import sys import urllib.parse from argparse import ArgumentParser, Namespace -from typing import Union +from typing import Any, Dict, List, Sequence import numpy as np -import utils +import torch +from utils import delete_gcs, delete_oci, delete_s3, get_kwargs, get_writer_params -from streaming import MDSWriter +from streaming.base import MDSWriter -# Word representation of a number -_ONES = ('zero one two three four five six seven eight nine ten eleven twelve ' - 'thirteen fourteen fifteen sixteen seventeen eighteen nineteen').split() -_TENS = 'twenty thirty forty fifty sixty seventy eighty ninety'.split() - -_COLUMNS = { - 'number': 'int', - 'words': 'str', +_DATASET_MAP = { + 'sequencedataset': 'SequenceDataset', + 'numberandsaydataset': 'NumberAndSayDataset', + 'imagedataset': 'ImageDataset', } -def parse_args() -> Namespace: - """Parse command-line arguments. - - Returns: - Namespace: Command-line arguments. - """ - args = ArgumentParser() - args.add_argument('--cloud_url', type=str) - args.add_argument('--create', default=False, action='store_true') - args.add_argument('--delete', default=False, action='store_true') - args.add_argument( - '--compression', - type=str, - help='Compression or compression:level for MDSWriter.', - ) - args.add_argument( - '--hashes', - type=str, - nargs='+', - help='List of hash algorithms to apply to shard files for MDSWriter.', - ) - args.add_argument( - '--size_limit', - type=int, - default=1 << 26, - help=('Shard size limit, after which point to start a new shard for ' - 'MDSWriter. If ``None``, puts everything in one shard.'), - ) - args.add_argument('--num_samples', - type=int, - default=10000, - help='Number of samples to generate') - return args.parse_args() - - -def say(i: int) -> list[str]: - """Get the word form of a number. +class SequenceDataset: + """A Sequence dataset with incremental ID and a value with a multiple of 3. Args: - i (int): The number. - - Returns: - List[str]: The number in word form. + num_samples (int): number of samples. Defaults to 100. + column_names List[str]: A list of features' and target name. Defaults to ['id', 'sample']. """ - if i < 0: - return ['negative'] + say(-i) - elif i <= 19: - return [_ONES[i]] - elif i < 100: - return [_TENS[i // 10 - 2]] + ([_ONES[i % 10]] if i % 10 else []) - elif i < 1_000: - return [_ONES[i // 100], 'hundred'] + (say(i % 100) if i % 100 else []) - elif i < 1_000_000: - return (say(i // 1_000) + ['thousand'] + (say(i % 1_000) if i % 1_000 else [])) - elif i < 1_000_000_000: - return (say(i // 1_000_000) + ['million'] + (say(i % 1_000_000) if i % 1_000_000 else [])) - else: - assert False - - -def get_dataset(num_samples: int) -> list[dict[str, Union[int, str]]]: - """Generate a number-saying dataset of the given size. - Args: - num_samples (int): Number of samples. - - Returns: - list[dict[str, int | str]]: The two generated splits. 
- """ - numbers = [((np.random.random() < 0.8) * 2 - 1) * i for i in range(num_samples)] - samples = [] - for num in numbers: - words = ' '.join(say(num)) - sample = {'number': num, 'words': words} - samples.append(sample) - for num in range(num_samples): - sample = { - 'number': num, - 'words': ''.join([random.choice(string.ascii_lowercase) for _ in range(num_samples)]) + def __init__(self, num_samples: int = 100, column_names: List[str] = ['id', 'sample']) -> None: + self.num_samples = num_samples + self.column_encodings = ['str', 'int'] + self.column_sizes = [None, 8] + self.column_names = column_names + self._index = 0 + + def __len__(self) -> int: + return self.num_samples + + def __getitem__(self, index: int) -> Dict[str, Any]: + if index < self.num_samples: + return { + self.column_names[0]: f'{index:06}', + self.column_names[1]: 3 * index, + } + raise IndexError('Index out of bound') + + def __iter__(self): + return self + + def __next__(self) -> Dict[str, Any]: + if self._index >= self.num_samples: + raise StopIteration + id = f'{self._index:06}' + data = 3 * self._index + self._index += 1 + return { + self.column_names[0]: id, + self.column_names[1]: data, } - samples.append(sample) - return samples + def get_sample_in_bytes(self, index: int) -> Dict[str, Any]: + sample = self.__getitem__(index) + sample[self.column_names[0]] = sample[self.column_names[0]].encode('utf-8') + sample[self.column_names[1]] = np.int64(sample[self.column_names[1]]).tobytes() + return sample + + +class NumberAndSayDataset: + """Generate a synthetic number-saying dataset. -def delete_gcs(remote_dir: str) -> None: - """Delete a remote directory from gcs. + Converting a numbers from digits to words, for example, number 123 would spell as + `one hundred twenty three`. The numbers are generated randomly and it supports a number + up-to positive/negative approximately 99 Millions. Args: - remote_dir (str): Location of the remote directory. + num_samples (int): number of samples. Defaults to 100. + column_names List[str]: A list of features' and target name. Defaults to ['number', + 'words']. + seed (int): seed value for deterministic randomness. 
""" - from google.cloud.storage import Bucket, Client - service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] - gcs_client = Client.from_service_account_json(service_account_path) - obj = urllib.parse.urlparse(remote_dir) + ones = ( + 'zero one two three four five six seven eight nine ten eleven twelve thirteen fourteen ' + + 'fifteen sixteen seventeen eighteen nineteen').split() + + tens = 'twenty thirty forty fifty sixty seventy eighty ninety'.split() + + def __init__(self, + num_samples: int = 100, + column_names: List[str] = ['number', 'words'], + seed: int = 987) -> None: + self.num_samples = num_samples + self.column_encodings = ['int', 'str'] + self.column_sizes = [8, None] + self.column_names = column_names + self._index = 0 + self.seed = seed + + def __len__(self) -> int: + return self.num_samples + + def _say(self, i: int) -> List[str]: + if i < 0: + return ['negative'] + self._say(-i) + elif i <= 19: + return [self.ones[i]] + elif i < 100: + return [self.tens[i // 10 - 2]] + ([self.ones[i % 10]] if i % 10 else []) + elif i < 1_000: + return [self.ones[i // 100], 'hundred'] + (self._say(i % 100) if i % 100 else []) + elif i < 1_000_000: + return self._say(i // 1_000) + ['thousand' + ] + (self._say(i % 1_000) if i % 1_000 else []) + elif i < 1_000_000_000: + return self._say( + i // 1_000_000) + ['million'] + (self._say(i % 1_000_000) if i % 1_000_000 else []) + else: + assert False + + def _get_number(self) -> int: + sign = (np.random.random() < 0.8) * 2 - 1 + mag = 10**np.random.uniform(1, 4) - 10 + return sign * int(mag**2) + + def __iter__(self): + return self + + def __next__(self) -> Dict[str, Any]: + if self._index >= self.num_samples: + raise StopIteration + number = self._get_number() + words = ' '.join(self._say(number)) + self._index += 1 + return { + self.column_names[0]: number, + self.column_names[1]: words, + } - bucket = Bucket(gcs_client, obj.netloc) - blobs = bucket.list_blobs(prefix=obj.path.lstrip('/')) + @property + def seed(self) -> int: + return self._seed - for blob in blobs: - blob.delete() + @seed.setter + def seed(self, value: int) -> None: + self._seed = value # pyright: ignore + np.random.seed(self._seed) -def delete_s3(remote_dir: str) -> None: - """Delete a remote directory from s3. +class ImageDataset: + """An Image dataset with values drawn from a normal distribution. Args: - remote_dir (str): Location of the remote directory. + num_samples (int): number of samples. Defaults to 100. + column_names List[str]: A list of features' and target name. Defaults to ['x']. + seed (int): seed value for deterministic randomness. + shape (Sequence[int]): shape of the image. Defaults to (3, 32, 32). 
""" - import boto3 - obj = urllib.parse.urlparse(remote_dir) + def __init__( + self, + num_samples: int = 100, + column_names: List[str] = ['x'], + seed: int = 987, + shape: Sequence[int] = (3, 32, 32), + ) -> None: + self.shape = shape + self.num_samples = num_samples + self.column_encodings = ['pkl'] + self.column_sizes = [None] + self.column_names = column_names + self.seed = seed + self._index = 0 + + def __len__(self) -> int: + return self.num_samples + + def __getitem__(self, index: int) -> Dict[str, Any]: + if index < self.num_samples: + return { + self.column_names[0]: torch.randn(self.num_samples, *self.shape), + } + raise IndexError(f'Index {index} out of bound for size {self.num_samples}') + + def __iter__(self): + return self + + def __next__(self) -> Dict[str, Any]: + if self._index >= self.num_samples: + raise StopIteration + x = torch.randn(self.num_samples, *self.shape) + self._index += 1 + return { + self.column_names[0]: x, + } + + @property + def seed(self) -> int: + return self._seed - s3 = boto3.resource('s3') - bucket = s3.Bucket(obj.netloc) - bucket.objects.filter(Prefix=obj.path.lstrip('/')).delete() + @seed.setter + def seed(self, value: int) -> None: + self._seed = value # pyright: ignore + torch.manual_seed(self._seed) -def delete_oci(remote_dir: str) -> None: - """Delete a remote directory from oci. +def get_dataset_params(kwargs: Dict[str, str]) -> Dict[str, Any]: + """Get the dataset parameters from command-line arguments. Args: - remote_dir (str): Location of the remote directory. - """ - import oci + kwargs (Dict[str, str]): Command-line arguments. - obj = urllib.parse.urlparse(remote_dir) + Returns: + Dict[str, Any]: Dataset parameters. + """ + dataset_params = {} + if 'num_samples' in kwargs: + dataset_params['num_samples'] = int(kwargs['num_samples']) + if 'seed' in kwargs: + dataset_params['seed'] = int(kwargs['seed']) + if 'shape' in kwargs: + dataset_params['shape'] = ast.literal_eval(kwargs['shape']) + if 'column_names' in kwargs: + dataset_params['column_names'] = ast.literal_eval(kwargs['column_names']) + return dataset_params + + +def parse_args() -> tuple[Namespace, dict[str, str]]: + """Parse command-line arguments. - config = oci.config.from_file() - oci_client = oci.object_storage.ObjectStorageClient( - config=config, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY) - namespace = oci_client.get_namespace().data - objects = oci_client.list_objects(namespace, obj.netloc, prefix=obj.path.lstrip('/')) + Returns: + tuple[Namespace, dict[str, str]]: Command-line arguments and named arguments. + """ + args = ArgumentParser() + args.add_argument( + '--name', + type=str, + default='SequenceDataset', + help='Dataset name. Supported: SequenceDataset, NumberAndSayDataset, ImageDataset', + ) + args.add_argument( + '--out', + type=str, + required=True, + help='Output dataset directory to store MDS shard files (local or remote)', + ) + # Create a mutually exclusive group to ensure only one can be specified at a time. 
+ me_group = args.add_mutually_exclusive_group() + me_group.add_argument('--create', default=False, action='store_true', help='Create dataset') + me_group.add_argument('--delete', default=False, action='store_true', help='Delete dataset') - for filenames in objects.data.objects: - oci_client.delete_object(namespace, obj.netloc, filenames.name) + args, runtime_args = args.parse_known_args() + kwargs = {get_kwargs(k): v for k, v in zip(runtime_args[::2], runtime_args[1::2])} + return args, kwargs -def main(args: Namespace) -> None: - """Benchmark time taken to generate the epoch for a given dataset. +def main(args: Namespace, kwargs: Dict[str, str]) -> None: + """Create and delete a dataset. Args: - args (Namespace): Command-line arguments. + args (Namespace): Arguments. + kwargs (Dict[str, str]): Named arguments. """ - remote_dir = args.cloud_url if args.cloud_url is not None else utils.get_local_remote_dir() + dataset_params = get_dataset_params(kwargs) + writer_params = get_writer_params(kwargs) + if args.name.lower() not in _DATASET_MAP: + raise ValueError(f'Unsupported dataset {args.name}. Supported: {_DATASET_MAP.keys()}') + dataset = getattr(sys.modules[__name__], _DATASET_MAP[args.name.lower()])(**dataset_params) + columns = {name: dtype for name, dtype in zip(dataset.column_names, dataset.column_encodings)} + if args.create: - dataset = get_dataset(args.num_samples) - with MDSWriter( - out=remote_dir, - columns=_COLUMNS, - compression=args.compression, - hashes=args.hashes, - size_limit=args.size_limit, - ) as out: + with MDSWriter(out=args.out, columns=columns, **writer_params) as out: for sample in dataset: out.write(sample) if args.delete: - obj = urllib.parse.urlparse(remote_dir) + shutil.rmtree(args.out, ignore_errors=True) + obj = urllib.parse.urlparse(args.out) cloud = obj.scheme if cloud == '': - shutil.rmtree(remote_dir, ignore_errors=True) + shutil.rmtree(args.out, ignore_errors=True) elif cloud == 'gs': - delete_gcs(remote_dir) + delete_gcs(args.out) elif cloud == 's3': - delete_s3(remote_dir) + delete_s3(args.out) elif cloud == 'oci': - delete_oci(remote_dir) + delete_oci(args.out) if __name__ == '__main__': - main(parse_args()) + args, kwargs = parse_args() + main(args, kwargs) diff --git a/regression/utils.py b/regression/utils.py index efed4c349..30ac6b4d8 100644 --- a/regression/utils.py +++ b/regression/utils.py @@ -3,12 +3,183 @@ """Utility and helper functions for regression testing.""" +import logging import os -import tempfile +import urllib.parse +from typing import Any, Dict +logger = logging.getLogger(__name__) -def get_local_remote_dir() -> str: - """Get a local remote directory.""" - tmp_dir = tempfile.gettempdir() - tmp_remote_dir = os.path.join(tmp_dir, 'regression_remote') - return tmp_remote_dir + +def get_kwargs(kwargs: str) -> str: + """Parse key of named command-line arguments. + + Args: + kwargs (str): Command-line arguments. + + Returns: + str: Key of named arguments. + """ + if kwargs.startswith('--'): + kwargs = kwargs[2:] + kwargs = kwargs.replace('-', '_') + return kwargs + + +def get_streaming_dataset_params(kwargs: Dict[str, str]) -> Dict[str, Any]: + """Get the streaming dataset parameters from command-line arguments. + + Args: + kwargs (Dict[str, str]): Command-line arguments. + + Returns: + Dict[str, Any]: Dataset parameters. 
+ """ + dataset_params = {} + if 'remote' in kwargs: + dataset_params['remote'] = kwargs['remote'] + if 'local' in kwargs: + dataset_params['local'] = kwargs['local'] + if 'split' in kwargs: + dataset_params['split'] = kwargs['split'] + if 'download_retry' in kwargs: + dataset_params['download_retry'] = int(kwargs['download_retry']) + if 'download_timeout' in kwargs: + dataset_params['download_timeout'] = float(kwargs['download_timeout']) + if 'validate_hash' in kwargs: + dataset_params['validate_hash'] = kwargs['validate_hash'] + if 'keep_zip' in kwargs: + dataset_params['keep_zip'] = kwargs['keep_zip'].lower().capitalize() == 'True' + if 'epoch_size' in kwargs: + dataset_params['epoch_size'] = kwargs['epoch_size'] + if 'predownload' in kwargs: + dataset_params['predownload'] = int(kwargs['predownload']) + if 'cache_limit' in kwargs: + dataset_params['cache_limit'] = kwargs['cache_limit'] + if 'partition_algo' in kwargs: + dataset_params['partition_algo'] = kwargs['partition_algo'] + if 'num_canonical_nodes' in kwargs: + dataset_params['num_canonical_nodes'] = int(kwargs['num_canonical_nodes']) + if 'batch_size' in kwargs: + dataset_params['batch_size'] = int(kwargs['batch_size']) + if 'shuffle' in kwargs: + dataset_params['shuffle'] = kwargs['shuffle'].lower().capitalize() == 'True' + if 'shuffle_algo' in kwargs: + dataset_params['shuffle_algo'] = kwargs['shuffle_algo'] + if 'shuffle_seed' in kwargs: + dataset_params['shuffle_seed'] = int(kwargs['shuffle_seed']) + if 'shuffle_block_size' in kwargs: + dataset_params['shuffle_block_size'] = int(kwargs['shuffle_block_size']) + if 'sampling_method' in kwargs: + dataset_params['sampling_method'] = kwargs['sampling_method'] + if 'proportion' in kwargs: + dataset_params['proportion'] = float(kwargs['proportion']) + if 'repeat' in kwargs: + dataset_params['repeat'] = float(kwargs['repeat']) + if 'choose' in kwargs: + dataset_params['choose'] = int(kwargs['choose']) + logger.debug(f'dataset_params: {dataset_params}') + return dataset_params + + +def get_dataloader_params(kwargs: Dict[str, str]) -> Dict[str, Any]: + """Get the dataloader parameters from command-line arguments. + + Args: + kwargs (Dict[str, str]): Command-line arguments. + + Returns: + Dict[str, Any]: Dataloader parameters. + """ + dataloader_params = {} + if 'num_workers' in kwargs: + dataloader_params['num_workers'] = int(kwargs['num_workers']) + if 'batch_size' in kwargs: + dataloader_params['batch_size'] = int(kwargs['batch_size']) + if 'pin_memory' in kwargs: + dataloader_params['pin_memory'] = kwargs['pin_memory'].lower().capitalize() == 'True' + if 'persistent_workers' in kwargs: + dataloader_params['persistent_workers'] = kwargs['persistent_workers'].lower().capitalize( + ) == 'True' + logger.debug(f'dataloader_params: {dataloader_params}') + return dataloader_params + + +def get_writer_params(kwargs: Dict[str, str]) -> Dict[str, Any]: + """Get the writer parameters from command-line arguments. + + Args: + kwargs (Dict[str, str]): Command-line arguments. + + Returns: + Dict[str, Any]: Writer parameters. 
+ """ + writer_params = {} + if 'keep_local' in kwargs: + writer_params['keep_local'] = kwargs['keep_local'].lower().capitalize() == 'True' + if 'compression' in kwargs: + writer_params['compression'] = kwargs['compression'] + if 'hashes' in kwargs: + writer_params['hashes'] = kwargs['hashes'].split(',') + if 'size_limit' in kwargs: + writer_params['size_limit'] = str(kwargs['size_limit']) + if 'progress_bar' in kwargs: + writer_params['progress_bar'] = kwargs['progress_bar'].lower().capitalize() == 'True' + if 'max_workers' in kwargs: + writer_params['max_workers'] = int(kwargs['max_workers']) + logger.debug(f'writer_params: {writer_params}') + return writer_params + + +def delete_gcs(remote_dir: str) -> None: + """Delete a remote directory from gcs. + + Args: + remote_dir (str): Location of the remote directory. + """ + from google.cloud.storage import Bucket, Client + + service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] + gcs_client = Client.from_service_account_json(service_account_path) + obj = urllib.parse.urlparse(remote_dir) + + bucket = Bucket(gcs_client, obj.netloc) + blobs = bucket.list_blobs(prefix=obj.path.lstrip('/')) + + for blob in blobs: + blob.delete() + + +def delete_s3(remote_dir: str) -> None: + """Delete a remote directory from s3. + + Args: + remote_dir (str): Location of the remote directory. + """ + import boto3 + + obj = urllib.parse.urlparse(remote_dir) + + s3 = boto3.resource('s3') + bucket = s3.Bucket(obj.netloc) + bucket.objects.filter(Prefix=obj.path.lstrip('/')).delete() + + +def delete_oci(remote_dir: str) -> None: + """Delete a remote directory from oci. + + Args: + remote_dir (str): Location of the remote directory. + """ + import oci + + obj = urllib.parse.urlparse(remote_dir) + + config = oci.config.from_file() + oci_client = oci.object_storage.ObjectStorageClient( + config=config, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY) + namespace = oci_client.get_namespace().data + objects = oci_client.list_objects(namespace, obj.netloc, prefix=obj.path.lstrip('/')) + + for filenames in objects.data.objects: + oci_client.delete_object(namespace, obj.netloc, filenames.name) diff --git a/streaming/base/stream.py b/streaming/base/stream.py index 6458b145a..6ba897b17 100644 --- a/streaming/base/stream.py +++ b/streaming/base/stream.py @@ -146,8 +146,9 @@ def __init__(self, if get_local_rank() == 0: if os.path.exists(self.local): raise ValueError( - f'Could not create a local directory. Specify a local directory with the `local` value.' - ) + f'Could not create a temporary local directory {self.local} . 
Either ' + + f'delete the directory or specify a unique local directory with the ' + + f'`local` value.') os.makedirs(self.local) barrier() else: diff --git a/tests/test_laziness.py b/tests/test_laziness.py index 2b0570bcd..dfd7d4643 100644 --- a/tests/test_laziness.py +++ b/tests/test_laziness.py @@ -1,7 +1,6 @@ # Copyright 2023 MosaicML Streaming authors # SPDX-License-Identifier: Apache-2.0 -from shutil import rmtree from typing import Any, Tuple import pytest @@ -36,7 +35,6 @@ def three(remote: str, local: str): for i in range(dataset.num_samples): sample = dataset[i] assert sample['value'] == i - rmtree(local) def four(remote: str, local: str): diff --git a/tests/test_stream.py b/tests/test_stream.py index da406fc07..4172fddde 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -51,10 +51,6 @@ def test_local_exists(split: Optional[str]) -> None: def test_existing_local_raises_exception(monkeypatch: MonkeyPatch) -> None: local = tempfile.mkdtemp() monkeypatch.setattr(tempfile, 'gettempdir', lambda: local) - with pytest.raises( - ValueError, - match= - f'Could not create a local directory. Specify a local directory with the `local` value.' - ): + with pytest.raises(ValueError, match=f'Could not create a temporary local directory.*'): _ = Stream() shutil.rmtree(local, ignore_errors=True) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 9511d8547..268138739 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -382,7 +382,7 @@ def test_multiple_dataset_instantiation(local_remote_dir: Any, shuffle_seed: tup for batch in train_dataloader: train_sample_order.extend(batch['id'][:]) - shutil.rmtree(local_dir) + shutil.rmtree(local_dir, ignore_errors=True) del train_dataloader del train_dataset diff --git a/tests/test_upload.py b/tests/test_upload.py index 6216a1470..59154f8a7 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -100,7 +100,7 @@ def test_instantiation(self, mocked_requests: Mock, out: Any): mocked_requests.side_effect = None _ = S3Uploader(out=out) if not isinstance(out, str): - shutil.rmtree(out[0]) + shutil.rmtree(out[0], ignore_errors=True) @pytest.mark.parametrize('out', ['ss4://bucket/dir']) def test_invalid_remote_str(self, out: str): @@ -166,7 +166,7 @@ def test_instantiation(self, mocked_requests: Mock, out: Any): mocked_requests.side_effect = None _ = GCSUploader(out=out) if not isinstance(out, str): - shutil.rmtree(out[0]) + shutil.rmtree(out[0], ignore_errors=True) @pytest.mark.parametrize('out', ['gcs://bucket/dir']) @pytest.mark.usefixtures('gcs_hmac_credentials') @@ -257,7 +257,7 @@ def test_instantiation(self, mocked_requests: Mock, out: Any): mocked_requests.side_effect = None _ = AzureUploader(out=out) if not isinstance(out, str): - shutil.rmtree(out[0]) + shutil.rmtree(out[0], ignore_errors=True) @pytest.mark.parametrize('out', ['ss4://bucket/dir']) def test_invalid_remote_str(self, out: str): @@ -290,7 +290,7 @@ def test_instantiation(self, mocked_requests: Mock, out: Any): mocked_requests.side_effect = None _ = AzureDataLakeUploader(out=out) if not isinstance(out, str): - shutil.rmtree(out[0]) + shutil.rmtree(out[0], ignore_errors=True) @pytest.mark.parametrize('out', ['ss4://container/dir']) def test_invalid_remote_str(self, out: str): From 72367635713ba80648e87565c10a02dd025c82bd Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 7 Aug 2023 23:20:53 +0000 Subject: [PATCH 2/5] Bump pydantic from 1.10.11 to 2.1.1 (#366) --- setup.py | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 39735ef9f..d165e7dd5 100644 --- a/setup.py +++ b/setup.py @@ -75,7 +75,7 @@ 'yamllint==1.32.0', 'moto>=4.0,<5', 'fastapi==0.100.0', - 'pydantic==1.10.11', + 'pydantic==2.1.1', 'uvicorn==0.23.1', 'pytest-split==0.8.1', ] From a7f44695bb2725822cd3ea265ba655147997dc77 Mon Sep 17 00:00:00 2001 From: Karan Jariwala Date: Mon, 7 Aug 2023 18:19:07 -0700 Subject: [PATCH 3/5] Fixed CI test to perform proper directory cleanup (#369) * Fixed CI test to perform proper directory cleanup * Replaced cleanup with shutil.rmtree since python 3.9 does not support ignore_cleanup_errors args --- tests/common/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/common/utils.py b/tests/common/utils.py index 5c95ce1ea..135fa531a 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -24,7 +24,7 @@ def local_remote_dir() -> Any: mock_remote_dir = os.path.join(mock_dir.name, 'remote') yield mock_local_dir, mock_remote_dir finally: - mock_dir.cleanup() # pyright: ignore + shutil.rmtree(mock_dir.name, ignore_errors=True) # pyright: ignore @pytest.fixture(scope='function') @@ -37,7 +37,7 @@ def compressed_local_remote_dir() -> Any: mock_remote_dir = os.path.join(mock_dir.name, 'remote') yield mock_compressed_dir, mock_local_dir, mock_remote_dir finally: - mock_dir.cleanup() # pyright: ignore + shutil.rmtree(mock_dir.name, ignore_errors=True) # pyright: ignore def convert_to_mds(**kwargs: Any): From a301cd01b032d890549854faa0f61b3a017ed033 Mon Sep 17 00:00:00 2001 From: snarayan21 Date: Mon, 7 Aug 2023 19:43:18 -0700 Subject: [PATCH 4/5] version bump to 0.5.2 (#370) --- streaming/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/_version.py b/streaming/_version.py index 367c89fa7..199b46fcb 100644 --- a/streaming/_version.py +++ b/streaming/_version.py @@ -3,4 +3,4 @@ """The Streaming Version.""" -__version__ = '0.5.1' +__version__ = '0.5.2' From 46ea47a2e04faedceef8c3b3965bafd8e3896308 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 8 Aug 2023 21:06:20 +0000 Subject: [PATCH 5/5] Bump fastapi from 0.100.0 to 0.101.0 (#367) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index d165e7dd5..aaae20f7a 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,7 @@ 'toml==0.10.2', 'yamllint==1.32.0', 'moto>=4.0,<5', - 'fastapi==0.100.0', + 'fastapi==0.101.0', 'pydantic==2.1.1', 'uvicorn==0.23.1', 'pytest-split==0.8.1',
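
A note on the argument-forwarding pattern introduced by patch 1/5: the regression scripts declare only a handful of flags and let `parse_known_args()` collect every other `--key value` pair, which `get_kwargs()` normalizes and the helpers in `regression/utils.py` cast into `StreamingDataset`, `DataLoader`, or `MDSWriter` keyword arguments. The following is a minimal, self-contained sketch of that flow; `demo_parse()` and the sample values are hypothetical and only illustrate the pattern, they are not part of the patch.

# Sketch of the "--key value" forwarding used by regression/iterate_data.py and
# regression/synthetic_dataset.py. Only the pattern is taken from the patch.
from argparse import ArgumentParser
from typing import Any, Dict, List


def get_kwargs(key: str) -> str:
    """Normalize a raw CLI flag ('--shuffle-seed') to a kwargs key ('shuffle_seed')."""
    if key.startswith('--'):
        key = key[2:]
    return key.replace('-', '_')


def demo_parse(argv: List[str]) -> Dict[str, Any]:
    parser = ArgumentParser()
    parser.add_argument('--epochs', type=int, default=2)
    # Declared flags land in `args`; every unknown token is returned as a flat list.
    args, runtime_args = parser.parse_known_args(argv)  # args.epochs drives the epoch loop
    # Pair flags with values positionally. This assumes every forwarded flag takes
    # exactly one value; a bare boolean switch would shift the pairing.
    kwargs = {get_kwargs(k): v for k, v in zip(runtime_args[::2], runtime_args[1::2])}
    # Cast only the keys that were supplied, mirroring get_streaming_dataset_params()
    # in regression/utils.py.
    dataset_params: Dict[str, Any] = {}
    if 'batch_size' in kwargs:
        dataset_params['batch_size'] = int(kwargs['batch_size'])
    if 'shuffle_seed' in kwargs:
        dataset_params['shuffle_seed'] = int(kwargs['shuffle_seed'])
    return dataset_params


if __name__ == '__main__':
    # Equivalent to: python demo.py --epochs 2 --batch_size 16 --shuffle_seed 12
    print(demo_parse(['--epochs', '2', '--batch_size', '16', '--shuffle_seed', '12']))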
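
The `--validate-iter-time` path added to `regression/iterate_data.py` deliberately excludes epoch 0 from timing, since the first pass absorbs shard download and cache warm-up, and averages only the later epochs. Below is a small stand-alone sketch of that timing loop, assuming a plain re-iterable Python object stands in for the `DataLoader`; `make_loader()` is a hypothetical placeholder, not an API from the patch.

# Sketch of the warm-up-aware timing loop from regression/iterate_data.py.
import time
from typing import Iterable, List


def make_loader() -> Iterable[int]:
    # Hypothetical stand-in; in the regression test this is
    # DataLoader(StreamingDataset(**dataset_params), **dataloader_params).
    return range(1_000_000)


def average_epoch_time(epochs: int = 3) -> float:
    loader = make_loader()
    iter_time: List[float] = []
    for epoch in range(epochs):
        start = time.time()
        for _ in loader:  # drain one full epoch
            pass
        if epoch > 0:  # epoch 0 absorbs shard download / cache warm-up, so skip it
            iter_time.append(time.time() - start)
    # Needs epochs >= 2 so that at least one timed epoch exists.
    return sum(iter_time) / max(len(iter_time), 1)


if __name__ == '__main__':
    print(f'Average iter time: {average_epoch_time():.3f} secs.')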