Add iteration time test as part of regression testing #358

Merged · 6 commits · Aug 7, 2023
22 changes: 12 additions & 10 deletions regression/cloud_providers.yaml
@@ -6,17 +6,19 @@ command: |-
   pip uninstall -y mosaicml-streaming
   cd streaming
   pip install -e '.[dev]'
-  composer -n 1 regression/synthetic_dataset.py --create --cloud_url $GS_CLOUD_URL --size_limit 4194304 # 4mb
-  composer -n 2 regression/iterate_data.py --cloud_url $GS_CLOUD_URL --check_download
-  composer -n 1 regression/synthetic_dataset.py --delete --cloud_url $GS_CLOUD_URL
-  composer -n 1 regression/synthetic_dataset.py --create --cloud_url $S3_URL --size_limit 134217728 # 128 mb
-  composer -n 4 regression/iterate_data.py --cloud_url $S3_URL --check_download --local
-  composer -n 1 regression/synthetic_dataset.py --delete --cloud_url $S3_URL
-  composer -n 1 regression/synthetic_dataset.py --create --cloud_url $OCI_URL --size_limit 268435456 # 256 mb
-  composer -n 8 regression/iterate_data.py --cloud_url $OCI_URL --check_download
-  composer -n 1 regression/synthetic_dataset.py --delete --cloud_url $OCI_URL
+  python regression/synthetic_dataset.py --create --name imagedataset --out $GCS_URL --size_limit 4194304 # 4mb
+  composer -n 2 regression/iterate_data.py --remote $GCS_URL --validate-files
+  python regression/synthetic_dataset.py --delete --out $GCS_URL
+
+  python regression/synthetic_dataset.py --create --name imagedataset --out $S3_URL --num_samples 500 --size_limit 134217728 # 128 mb
+  composer -n 4 regression/iterate_data.py --remote $S3_URL --local /tmp/local_dataset/ --validate-files
+  python regression/synthetic_dataset.py --delete --out $S3_URL
+
+  python regression/synthetic_dataset.py --create --name imagedataset --out $OCI_URL --num_samples 500 --size_limit 268435456 # 256 mb
+  composer -n 8 regression/iterate_data.py --remote $OCI_URL --validate-files
+  python regression/synthetic_dataset.py --delete --out $OCI_URL
 
-image: mosaicml/composer:0.15.0
+image: mosaicml/composer:latest
 scheduling:
   resumable: true
   priority: medium
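Note: the new `--validate-files` flag makes `iterate_data.py` compare the object count under the remote prefix against the file count in the local cache directory. A minimal sketch of that comparison, using the same listdir and boto3 patterns shown in the script's diff below; the bucket name and local path here are illustrative, not values this PR defines:

import os
import urllib.parse

import boto3


def count_local_files(local_dir: str) -> int:
    # Same pattern iterate_data.py uses for the local cache directory.
    return len([name for name in os.listdir(local_dir)
                if os.path.isfile(os.path.join(local_dir, name))])


def count_s3_objects(remote: str) -> int:
    # Count objects under an s3:// prefix, skipping directory placeholders.
    obj = urllib.parse.urlparse(remote)
    files = boto3.resource('s3').Bucket(obj.netloc).objects.filter(Prefix=obj.path.lstrip('/'))
    return sum(1 for f in files if f.key[-1] != '/')


if __name__ == '__main__':
    # Illustrative values; the real run uses $S3_URL and the dataset's local cache directory.
    assert count_s3_objects('s3://my-bucket/regression/') == count_local_files('/tmp/local_dataset/')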
107 changes: 41 additions & 66 deletions regression/iterate_data.py
@@ -4,29 +4,16 @@
 """Create a streaming dataset from toy data with various options for regression testing."""
 
 import os
-import tempfile
+import time
 import urllib.parse
 from argparse import ArgumentParser, Namespace
+from typing import Union
 
-import utils
 from torch.utils.data import DataLoader
+from utils import get_dataloader_params, get_kwargs, get_streaming_dataset_params
 
 from streaming import StreamingDataset
 
-_TRAIN_EPOCHS = 2
-
-
-def get_kwargs(key: str) -> str:
-    """Parse key of named command-line arguments.
-
-    Returns:
-        str: Key of named arguments.
-    """
-    if key.startswith('--'):
-        key = key[2:]
-    key = key.replace('-', '_')
-    return key
-
 
 def parse_args() -> tuple[Namespace, dict[str, str]]:
     """Parse command-line arguments.
@@ -35,38 +22,33 @@ def parse_args() -> tuple[Namespace, dict[str, str]]:
         tuple(Namespace, dict[str, str]): Command-line arguments and named arguments.
     """
     args = ArgumentParser()
-    args.add_argument('--cloud_url', type=str)
-    args.add_argument('--check_download', default=False, action='store_true')
-    args.add_argument('--local', default=False, action='store_true')
-    args.add_argument(
-        '--keep_zip',
-        default=False,
-        action='store_true',
-        help=('Whether to keep or delete the compressed form when decompressing'
-              ' downloaded shards for StreamingDataset.'),
-    )
-    args.add_argument(
-        '--shuffle',
-        default=False,
-        action='store_true',
-        help=('Whether to iterate over the samples in randomized order for'
-              ' StreamingDataset.'),
-    )
+    args.add_argument('--epochs',
+                      type=int,
+                      default=2,
+                      help='Number of epochs to iterate over the data')
+    args.add_argument('--validate-files', default=False, action='store_true', help='Verify files')
+    args.add_argument('--validate-iter-time',
+                      default=False,
+                      action='store_true',
+                      help='Test iter time')
 
     args, runtime_args = args.parse_known_args()
     kwargs = {get_kwargs(k): v for k, v in zip(runtime_args[::2], runtime_args[1::2])}
     return args, kwargs
 
 
-def get_file_count(cloud_url: str) -> int:
+def get_file_count(remote: str) -> Union[int, None]:
     """Get the number of files in a remote directory.
 
     Args:
-        cloud_url (str): Cloud provider url.
+        remote (str): Remote directory URL.
     """
-    obj = urllib.parse.urlparse(cloud_url)
+    obj = urllib.parse.urlparse(remote)
     cloud = obj.scheme
     files = []
+    if cloud == '':
+        return len(
+            [name for name in os.listdir(remote) if os.path.isfile(os.path.join(remote, name))])
     if cloud == 'gs':
         from google.cloud.storage import Bucket, Client
 
@@ -75,12 +57,14 @@ def get_file_count(cloud_url: str) -> int:
 
         bucket = Bucket(gcs_client, obj.netloc)
         files = bucket.list_blobs(prefix=obj.path.lstrip('/'))
+        return sum(1 for f in files if f.key[-1] != '/')
     elif cloud == 's3':
         import boto3
 
         s3 = boto3.resource('s3')
         bucket = s3.Bucket(obj.netloc)
         files = bucket.objects.filter(Prefix=obj.path.lstrip('/'))
+        return sum(1 for f in files if f.key[-1] != '/')
     elif cloud == 'oci':
         import oci
 
@@ -91,8 +75,9 @@ def get_file_count(cloud_url: str) -> int:
         objects = oci_client.list_objects(namespace, obj.netloc, prefix=obj.path.lstrip('/'))
 
         files = objects.data.objects
-
-    return sum(1 for _ in files)
+        return sum(1 for _ in files)
+    else:
+        raise ValueError(f'Unsupported remote directory prefix {cloud} in {remote}')
 
 
 def main(args: Namespace, kwargs: dict[str, str]) -> None:
@@ -102,41 +87,31 @@ def main(args: Namespace, kwargs: dict[str, str]) -> None:
         args (Namespace): Command-line arguments.
         kwargs (dict): Named arguments.
     """
-    tmp_dir = tempfile.gettempdir()
-    tmp_download_dir = os.path.join(tmp_dir, 'test_iterate_data_download')
-    dataset = StreamingDataset(
-        remote=args.cloud_url if args.cloud_url is not None else utils.get_local_remote_dir(),
-        local=tmp_download_dir if args.local else None,
-        split=kwargs.get('split'),
-        download_retry=int(kwargs.get('download_retry', 2)),
-        download_timeout=float(kwargs.get('download_timeout', 60)),
-        validate_hash=kwargs.get('validate_hash'),
-        keep_zip=args.keep_zip,
-        epoch_size=int(kwargs['epoch_size']) if 'epoch_size' in kwargs else None,
-        predownload=int(kwargs['predownload']) if 'predownload' in kwargs else None,
-        cache_limit=kwargs.get('cache_limit'),
-        partition_algo=kwargs.get('partition_algo', 'orig'),
-        num_canonical_nodes=int(kwargs['num_canonical_nodes'])
-        if 'num_canonical_nodes' in kwargs else None,
-        batch_size=int(kwargs['batch_size']) if 'batch_size' in kwargs else None,
-        shuffle=args.shuffle,
-        shuffle_algo=kwargs.get('shuffle_algo', 'py1s'),
-        shuffle_seed=int(kwargs.get('shuffle_seed', 9176)),
-        shuffle_block_size=int(kwargs.get('shuffle_block_size', 1 << 18)),
-    )
-
-    dataloader = DataLoader(dataset)
-    for _ in range(_TRAIN_EPOCHS):
+    dataset_params = get_streaming_dataset_params(kwargs)
+    dataloader_params = get_dataloader_params(kwargs)
+    dataset = StreamingDataset(**dataset_params)
+    dataloader = DataLoader(dataset=dataset, **dataloader_params)
+    iter_time = []
+    start_time = 0
+    for epoch in range(args.epochs):
+        if epoch > 0:
+            start_time = time.time()
         for _ in dataloader:
             pass
+        if epoch > 0:
+            iter_time.append(time.time() - start_time)
 
-    if args.check_download and args.cloud_url is not None:
-        num_cloud_files = get_file_count(args.cloud_url)
+    if args.validate_files and dataset.streams[0].remote is not None:
+        num_remote_files = get_file_count(dataset.streams[0].remote)
+        local_dir = dataset.streams[0].local
         num_local_files = len([
             name for name in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, name))
         ])
-        assert num_cloud_files == num_local_files
+        assert num_remote_files == num_local_files, f'Expected {num_remote_files} files, got {num_local_files}'
 
+    # TODO: Assert the iteration time is within a certain threshold
+    if args.validate_iter_time:
+        print(f'Average iter time: {sum(iter_time) / len(iter_time)} secs.')
 
 
 if __name__ == '__main__':
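Note on the pass-through arguments: parse_known_args() leaves any flag that parse_args() does not declare in runtime_args, and the dict comprehension pairs those tokens positionally as (flag, value) after get_kwargs strips the leading dashes and converts '-' to '_'. A small standalone sketch of that pairing, reusing the helper logic removed above (the flag values are illustrative):

def get_kwargs(key: str) -> str:
    # Same logic as the removed helper, now imported from regression/utils.py.
    if key.startswith('--'):
        key = key[2:]
    return key.replace('-', '_')


# e.g. the extra flags passed by iterate_data.yaml
runtime_args = ['--download_timeout', '120', '--validate_hash', 'sha1', '--shuffle_seed', '12']
kwargs = {get_kwargs(k): v for k, v in zip(runtime_args[::2], runtime_args[1::2])}
print(kwargs)  # {'download_timeout': '120', 'validate_hash': 'sha1', 'shuffle_seed': '12'}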
54 changes: 35 additions & 19 deletions regression/iterate_data.yaml
@@ -6,25 +6,41 @@ command: |-
   pip uninstall -y mosaicml-streaming
   cd streaming
   pip install -e '.[dev]'
-  composer -n 1 regression/synthetic_dataset.py --create --compression gz
-  composer -n 2 regression/iterate_data.py
-  composer -n 1 regression/synthetic_dataset.py --delete
-  composer -n 1 regression/synthetic_dataset.py --create --hashes sha1 xxh128
-  composer -n 2 regression/iterate_data.py
-  composer -n 4 regression/iterate_data.py --local
-  composer -n 8 regression/iterate_data.py --download_retry 10
-  composer -n 8 regression/iterate_data.py --download_timeout 120
-  composer -n 8 regression/iterate_data.py --validate_hash sha1
-  composer -n 8 regression/iterate_data.py --validate_hash xxh128
-  composer -n 8 regression/iterate_data.py --keep_zip
-  composer -n 8 regression/iterate_data.py --predownload 1000
-  composer -n 8 regression/iterate_data.py --cache_limit 64mb
-  composer -n 8 regression/iterate_data.py --num_canonical_nodes 16
-  composer -n 8 regression/iterate_data.py --batch_size 1000
-  composer -n 8 regression/iterate_data.py --shuffle --shuffle_algo py1b --shuffle_seed 12 --shuffle_block_size 10000
-  composer -n 1 regression/synthetic_dataset.py --delete
-
-image: mosaicml/composer:0.15.0
+  python regression/synthetic_dataset.py --create --name numberandsaydataset --out /tmp/streaming_dataset/ --num_samples 10_000 --size_limit 10240
+  composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset/ --batch_size 16
+  composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset/ --local /tmp/local_dataset/ --batch_size 32
+  composer -n 4 regression/iterate_data.py --local /tmp/streaming_dataset/
+  python regression/synthetic_dataset.py --delete --out /tmp/streaming_dataset/
+  rm -rf /tmp/local_dataset/
+
+  python regression/synthetic_dataset.py --create --name numberandsaydataset --out /tmp/streaming_dataset_gz/ --hashes sha1,xxh128 --compression gz --num_samples 10_000 --size_limit 10240
+  composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --batch_size 4
+  composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --batch_size 1
+  composer -n 4 regression/iterate_data.py --local /tmp/streaming_dataset_gz/
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --download_retry 4
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --download_timeout 120
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --validate_hash sha1
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --keep_zip
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --predownload 1000
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --num_canonical_nodes 16
+  rm -rf /tmp/local_dataset/
+
+  composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ --shuffle_algo py1b --shuffle_seed 12 --shuffle_block_size 10000
+  rm -rf /tmp/local_dataset/
+
+image: mosaicml/composer:latest
 scheduling:
   resumable: true
   priority: medium
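Note: flags such as --validate_hash and --keep_zip are not declared by parse_args(); they flow through kwargs into get_streaming_dataset_params and become StreamingDataset constructor arguments (the removed main() above shows the explicit form of those arguments). A minimal sketch of what one of the gz runs exercises, assuming the kwargs are forwarded that way; the paths are the /tmp locations used in the YAML and the comments paraphrase the removed help text:

from torch.utils.data import DataLoader

from streaming import StreamingDataset

# Roughly what `iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/
# --validate_hash sha1 --keep_zip` configures.
dataset = StreamingDataset(
    remote='/tmp/streaming_dataset_gz/',
    local='/tmp/local_dataset/',
    validate_hash='sha1',  # re-hash each downloaded shard and compare against the index
    keep_zip=True,         # keep the compressed form when decompressing downloaded shards
)

for _ in DataLoader(dataset):
    pass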
22 changes: 22 additions & 0 deletions regression/iteration_time.yaml
@@ -0,0 +1,22 @@
+name: streaming-regression-test-iteration-time
+compute:
+  gpus: 8 # Number of GPUs to use
+  # cluster: TODO # Name of the cluster to use for this run
+command: |-
+  pip uninstall -y mosaicml-streaming
+  cd streaming
+  pip install -e '.[dev]'
+  python regression/synthetic_dataset.py --create --name imagedataset --out /tmp/streaming_dataset/ --num_samples 300
+  composer -n 2 regression/iterate_data.py --remote /tmp/streaming_dataset/ --local /tmp/local_dataset/ --batch_size 32
+  python regression/synthetic_dataset.py --delete --out /tmp/streaming_dataset/
+  rm -rf /tmp/local_dataset/
+
+image: mosaicml/composer:latest
+scheduling:
+  resumable: true
+  priority: medium
+integrations:
+- integration_type: git_repo
+  git_repo: mosaicml/streaming
+  git_branch: main
+  ssh_clone: false
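Note: main() currently only prints the average per-epoch iteration time (the warm-up epoch is excluded), and the TODO in iterate_data.py marks the missing assertion. A sketch of how that threshold check could eventually look; the 60-second limit and the helper name are placeholders, not values this PR defines:

def check_iter_time(iter_time: list[float], max_avg_secs: float = 60.0) -> None:
    # Hypothetical follow-up to the TODO: fail the run if the average epoch time regresses.
    avg = sum(iter_time) / len(iter_time)
    print(f'Average iter time: {avg} secs.')
    assert avg <= max_avg_secs, f'Average iter time {avg:.2f}s exceeded {max_avg_secs}s'


# Example with made-up timings (seconds per epoch, after the warm-up epoch):
check_iter_time([41.2, 39.8], max_avg_secs=60.0)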