Skip to content

Commit

Permalink
Merge pull request #60 from kostaleonard/republication
Browse files Browse the repository at this point in the history
Republication
  • Loading branch information
kostaleonard authored Apr 10, 2022
2 parents 875077d + fc6923b commit 37bdab7
Show file tree
Hide file tree
Showing 14 changed files with 364 additions and 300 deletions.
1 change: 1 addition & 0 deletions mlops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

__all__ = [
'__version__',
'artifact',
'dataset',
'errors',
'model',
Expand Down
118 changes: 118 additions & 0 deletions mlops/artifact/versioned_artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,32 @@

from abc import ABC, abstractmethod
from typing import Any
import os
import shutil
import json
from s3fs import S3FileSystem
from mlops.artifact.versioned_artifact_builder import VersionedArtifactBuilder


class VersionedArtifact(ABC):
"""Represents a versioned artifact (e.g., a dataset or model)."""

@property
@abstractmethod
def name(self) -> str:
    """Returns the artifact's name.

    This is an abstract property; every concrete artifact class must
    provide its own implementation.

    :return: The artifact's name.
    """

@property
@abstractmethod
def path(self) -> str:
    """Returns the local or remote path to the artifact.

    This is an abstract property; every concrete artifact class must
    provide its own implementation. Republication reads the artifact's
    files from this path, and treats a path starting with "s3://" as
    remote.

    :return: The local or remote path to the artifact.
    """

@property
@abstractmethod
def metadata_path(self) -> str:
Expand All @@ -17,6 +36,38 @@ def metadata_path(self) -> str:
:return: The local or remote path to the artifact's metadata.
"""

@property
@abstractmethod
def version(self) -> str:
    """Returns the artifact's version.

    This is an abstract property; every concrete artifact class must
    provide its own implementation. Republication appends this value to
    the target path as a subdirectory.

    :return: The artifact's version.
    """

@property
@abstractmethod
def md5(self) -> str:
    """Returns the artifact's MD5 hash.

    This is an abstract property; every concrete artifact class must
    provide its own implementation. Equality and hashing of artifacts
    are both derived from this value.

    :return: The artifact's MD5 hash.
    """

def __eq__(self, other: 'VersionedArtifact') -> bool:
"""Returns True if the two objects have the same loaded MD5 hash code,
False otherwise.
:param other: The artifact with which to compare this object.
:return: True if the object MD5 hashes match.
"""
return self.md5 == other.md5

def __hash__(self) -> int:
"""Returns this object's hashcode based on the loaded MD5 hashcode.
:return: The object's hashcode based on the loaded MD5 hashcode.
"""
return hash(self.md5)

def update_metadata(self, updates: dict[str, Any]) -> None:
"""Updates the artifact's metadata with the new values.
Expand Down Expand Up @@ -48,3 +99,70 @@ def update_metadata(self, updates: dict[str, Any]) -> None:
else:
with open(self.metadata_path, 'w', encoding='utf-8') as outfile:
outfile.write(json.dumps(updated_metadata))

def republish(self, republication_path: str) -> str:
    """Copies the versioned artifact files to a new location, which may
    be local or in S3.

    The destination kind is chosen from the path alone: anything starting
    with "s3://" is written to S3, everything else to the local
    filesystem. If the path and appended version already exists, this
    operation will raise a PublicationPathAlreadyExistsError.

    :param republication_path: The path, either on the local filesystem or
        in a cloud store such as S3, to which the artifact should be saved.
        The version will be appended to this path as a subdirectory. An S3
        path should be a URL of the form "s3://bucket-name/path/to/dir". It
        is recommended to use this same path to publish all artifacts,
        since it will prevent the user from creating two different
        artifacts with the same version.
    :return: The versioned artifact's publication path.
    """
    targets_s3 = republication_path.startswith('s3://')
    publisher = (self._republish_to_s3 if targets_s3
                 else self._republish_to_local)
    return publisher(republication_path)

def _republish_to_local(self, republication_path: str) -> str:
    """Copies the versioned artifact files to a local directory.

    The artifact's version is appended to the given path as a
    subdirectory, and that directory is created before any file is
    copied. If it already exists, this operation will raise a
    PublicationPathAlreadyExistsError.

    :param republication_path: The local path to which to publish the
        versioned object.
    :return: The versioned object's republication path.
    """
    # pylint: disable=protected-access
    destination = os.path.join(republication_path, self.version)
    VersionedArtifactBuilder._make_publication_path_local(destination)
    if not self.path.startswith('s3://'):
        # The destination directory was created just above, so copytree
        # must be told to accept an existing target.
        shutil.copytree(self.path, destination, dirs_exist_ok=True)
        return destination
    # The artifact lives in S3: download the whole tree.
    remote_fs = S3FileSystem()
    remote_fs.get(self.path, destination, recursive=True)
    return destination

def _republish_to_s3(self, republication_path: str) -> str:
    """Copies the versioned artifact files to an S3 location.

    The artifact's version is appended to the given path as a
    subdirectory. If that location already exists, this operation will
    raise a PublicationPathAlreadyExistsError.

    :param republication_path: The S3 path to which to publish the
        versioned object.
    :return: The versioned object's republication path.
    """
    # pylint: disable=protected-access
    # S3 keys always use forward slashes, so join them with posixpath;
    # os.path.join would insert backslashes on Windows.
    import posixpath  # pylint: disable=import-outside-toplevel
    publication_path = posixpath.join(republication_path, self.version)
    fs = S3FileSystem()
    VersionedArtifactBuilder._make_publication_path_s3(
        publication_path, fs)
    if not self.path.startswith('s3://'):
        # Local artifact: upload the whole tree.
        fs.put(self.path, publication_path, recursive=True)
        return publication_path
    # S3-to-S3 copy. The walked paths are matched against scheme-less
    # prefixes, as the original code did. Trailing slashes are dropped
    # from both prefixes so that a path such as "s3://bucket/dir/" still
    # matches the walked root and files are never copied onto themselves
    # (presumably walk yields the root without a trailing slash; the
    # stripping is harmless if it does not).
    source_root = self.path.replace('s3://', '', 1).rstrip('/')
    target_root = publication_path.replace('s3://', '', 1).rstrip('/')
    for current_path, _, filenames in fs.walk(self.path):
        outfile_prefix = current_path.replace(source_root, target_root, 1)
        for filename in filenames:
            fs.copy(posixpath.join(current_path, filename),
                    posixpath.join(outfile_prefix, filename))
    return publication_path
56 changes: 56 additions & 0 deletions mlops/artifact/versioned_artifact_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Contains the VersionedArtifactBuilder class."""

from abc import ABC, abstractmethod
from typing import Any
from pathlib import Path
from s3fs import S3FileSystem
from mlops.errors import PublicationPathAlreadyExistsError


class VersionedArtifactBuilder(ABC):
    """Represents a versioned artifact builder.

    Besides the abstract publish method, the class provides two static
    helpers that create a publication directory (locally or in S3) and
    refuse to reuse one that already exists.
    """
    # pylint: disable=too-few-public-methods

    @abstractmethod
    def publish(self, path: str, *args: Any, **kwargs: Any) -> str:
        """Saves the versioned artifact files to the given path. If the path
        and appended version already exists, this operation will raise a
        PublicationPathAlreadyExistsError.

        :param path: The path, either on the local filesystem or in a cloud
            store such as S3, to which the artifact should be saved. The
            version will be appended to this path as a subdirectory. An S3
            path should be a URL of the form "s3://bucket-name/path/to/dir".
            It is recommended to use this same path to publish all artifacts
            of a given type (e.g., datasets, models), since it will prevent
            the user from creating two different artifacts with the same
            version.
        :param args: Additional positional args.
        :param kwargs: Keyword args.
        :return: The versioned artifact's publication path.
        """

    @staticmethod
    def _make_publication_path_local(publication_path: str) -> None:
        """Creates the directories that compose the publication path.

        :param publication_path: The path to which to publish the artifact.
        :raises PublicationPathAlreadyExistsError: If the publication path
            already exists on the local filesystem.
        """
        try:
            # exist_ok=False makes an existing final directory an error,
            # while parents=True still creates missing ancestors.
            Path(publication_path).mkdir(parents=True, exist_ok=False)
        except FileExistsError as err:
            raise PublicationPathAlreadyExistsError from err

    @staticmethod
    def _make_publication_path_s3(publication_path: str,
                                  fs: 'S3FileSystem') -> None:
        """Creates the directories that compose the publication path.

        :param publication_path: The path to which to publish the artifact.
        :param fs: The S3 filesystem object to interface with S3.
        :raises PublicationPathAlreadyExistsError: If anything is already
            stored under the publication path.
        """
        # fs.mkdirs with exist_ok=False does not raise an error, so use ls.
        # Some s3fs releases signal a missing prefix by raising
        # FileNotFoundError rather than returning an empty listing; both
        # mean the path is free to use.
        try:
            existing = fs.ls(publication_path)
        except FileNotFoundError:
            existing = []
        if existing:
            raise PublicationPathAlreadyExistsError
        fs.mkdirs(publication_path)
6 changes: 4 additions & 2 deletions mlops/dataset/pathless_versioned_dataset_builder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Contains the PathlessVersionedDatasetBuilder class."""

from typing import List, Optional, Dict
from typing import List, Optional, Dict, Any
import numpy as np
from mlops.dataset.versioned_dataset_builder import VersionedDatasetBuilder, \
STRATEGY_LINK
Expand All @@ -27,10 +27,12 @@ def __init__(self,

def publish(self,
path: str,
*args: Any,
name: str = 'dataset',
version: Optional[str] = None,
dataset_copy_strategy: str = STRATEGY_LINK,
tags: Optional[List[str]] = None) -> str:
tags: Optional[List[str]] = None,
**kwargs: Any) -> str:
"""Saves the versioned dataset files to the given path. If the path and
appended version already exists, this operation will raise a
PublicationPathAlreadyExistsError.
Expand Down
64 changes: 30 additions & 34 deletions mlops/dataset/versioned_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import dill as pickle
import numpy as np
from s3fs import S3FileSystem
from mlops.republication import republication
from mlops.artifact.versioned_artifact import VersionedArtifact


Expand All @@ -19,7 +18,7 @@ def __init__(self, path: str) -> None:
store such as S3, from which the dataset should be loaded. An S3
path should be a URL of the form "s3://bucket-name/path/to/dir".
"""
self.path = path
self._path = path
self._metadata_path = os.path.join(path, 'meta.json')
if path.startswith('s3://'):
fs = S3FileSystem()
Expand All @@ -37,9 +36,6 @@ def __init__(self, path: str) -> None:
'r',
encoding='utf-8') as infile:
metadata = json.loads(infile.read())
self.name = metadata['name']
self.version = metadata['version']
self.md5 = metadata['hash']
# Get data processor.
with fs.open(os.path.join(path, 'data_processor.pkl'),
'rb') as infile:
Expand All @@ -60,13 +56,29 @@ def __init__(self, path: str) -> None:
'r',
encoding='utf-8') as infile:
metadata = json.loads(infile.read())
self.name = metadata['name']
self.version = metadata['version']
self.md5 = metadata['hash']
# Get data processor.
with open(os.path.join(path, 'data_processor.pkl'), 'rb') as infile:
processor = pickle.loads(infile.read(), ignore=True)
self.data_processor = processor
self._name = metadata['name']
self._version = metadata['version']
self._md5 = metadata['hash']

@property
def name(self) -> str:
    """Returns the artifact's name.

    The value is read from the 'name' field of the dataset's metadata
    when the object is constructed.

    :return: The artifact's name.
    """
    return self._name

@property
def path(self) -> str:
    """Returns the local or remote path to the artifact.

    This is the path that was passed to the constructor, exposed
    read-only.

    :return: The local or remote path to the artifact.
    """
    return self._path

@property
def metadata_path(self) -> str:
Expand All @@ -76,34 +88,18 @@ def metadata_path(self) -> str:
"""
return self._metadata_path

def republish(self, path: str) -> str:
"""Saves the versioned dataset files to the given path. If the path and
appended version already exists, this operation will raise a
PublicationPathAlreadyExistsError.
:param path: The path, either on the local filesystem or in a cloud
store such as S3, to which the dataset should be saved. The version
will be appended to this path as a subdirectory. An S3 path
should be a URL of the form "s3://bucket-name/path/to/dir". It is
recommended to use this same path to publish all datasets, since it
will prevent the user from creating two different datasets with the
same version.
:return: The versioned dataset's publication path.
"""
return republication.republish(self.path, path, self.version)

def __eq__(self, other: 'VersionedDataset') -> bool:
"""Returns True if the two objects have the same loaded MD5 hash code,
False otherwise.
@property
def version(self) -> str:
"""Returns the artifact's version.
:param other: The dataset with which to compare this object.
:return: True if the object MD5 hashes match.
:return: The artifact's version.
"""
return self.md5 == other.md5
return self._version

def __hash__(self) -> int:
"""Returns this object's hashcode based on the loaded MD5 hashcode.
@property
def md5(self) -> str:
"""Returns the artifact's MD5 hash.
:return: The object's hashcode based on the loaded MD5 hashcode.
:return: The artifact's MD5 hash.
"""
return hash(self.md5)
return self._md5
Loading

0 comments on commit 37bdab7

Please sign in to comment.