Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added Apply/Get/List/Delete methods to project_metadata to BaseRegistry #4441

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protos/feast/registry/RegistryServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ message GetInfraRequest {
}

message ListProjectMetadataRequest {
string project = 1;
optional string project = 1;
bool allow_cache = 2;
}

Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class FeastObjectNotFoundException(Exception):
pass


class ProjectMetadataNotFoundException(FeastObjectNotFoundException):
def __init__(self, project: str):
super().__init__(f"Project Metadata does not exist in project {project}")


class EntityNotFoundException(FeastObjectNotFoundException):
def __init__(self, name, project=None):
if project:
Expand Down
56 changes: 53 additions & 3 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,22 +545,72 @@ def list_validation_references(
"""
raise NotImplementedError

@abstractmethod
def apply_project_metadata(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make more sense for this to also accept ProjectMetadata object as an input similar to other registry resources? I know the object doesn't hold much information right now, but if we plan for it to be "updatable", an update should happen through this method, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not envisioning this as a updatable. At this point its create only. Fields we have for this are UUID and LastUpdateTimestamp. UUID should be constant. LastUpdateTimestamp should be update when any changes happen to any objects associated to the project.

self,
project: str,
commit: bool = True,
):
"""
Persist a project metadata with a new uuid

Args:
project: Feast project name
commit: Whether the change should be persisted immediately
"""
raise NotImplementedError

@abstractmethod
def delete_project_metadata(
self,
project: str,
commit: bool = True,
):
"""
Deletes a project metadata or raises ProjectMetadataNotFoundException exception if not found.

Args:
project: Feast project name
commit: Whether the change should be persisted immediately
"""
raise NotImplementedError

@abstractmethod
def list_project_metadata(
self, project: str, allow_cache: bool = False
self,
project: Optional[str],
allow_cache: bool = False,
) -> List[ProjectMetadata]:
"""
Retrieves project metadata
Retrieves project metadata if given project name otherwise all project metadata

Args:
project: Filter metadata based on project name
project: Filter metadata based on project name or None to retrieve all project metadata
allow_cache: Allow returning feature views from the cached registry

Returns:
List of project metadata
"""
raise NotImplementedError

@abstractmethod
def get_project_metadata(
self,
project: str,
allow_cache: bool = False,
) -> Optional[ProjectMetadata]:
"""
Retrieves project metadata if present otherwise None

Args:
project: Filter metadata based on project name
allow_cache: Allow returning feature views from the cached registry

Returns:
Get project metadata if project exists otherwise None
"""
raise NotImplementedError

@abstractmethod
def update_infra(self, infra: Infra, project: str, commit: bool = True):
"""
Expand Down
22 changes: 20 additions & 2 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ def list_validation_references(
return self._list_validation_references(project, tags)

@abstractmethod
def _list_project_metadata(self, project: str) -> List[ProjectMetadata]:
def _list_project_metadata(self, project: Optional[str]) -> List[ProjectMetadata]:
pass

def list_project_metadata(
self, project: str, allow_cache: bool = False
self, project: Optional[str], allow_cache: bool = False
) -> List[ProjectMetadata]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
Expand All @@ -315,6 +315,24 @@ def list_project_metadata(
)
return self._list_project_metadata(project)

@abstractmethod
def _get_project_metadata(self, project: str) -> Optional[ProjectMetadata]:
pass

def get_project_metadata(
self, project: str, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
project_metadata_proto = proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
if project_metadata_proto is None:
return None
else:
return ProjectMetadata.from_proto(project_metadata_proto)
return self._get_project_metadata(project)

@abstractmethod
def _get_infra(self, project: str) -> Infra:
pass
Expand Down
18 changes: 12 additions & 6 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,19 @@ def list_validation_references(

@registry_proto_cache
def list_project_metadata(
registry_proto: RegistryProto, project: str
registry_proto: RegistryProto, project: Optional[str]
) -> List[ProjectMetadata]:
return [
ProjectMetadata.from_proto(project_metadata)
for project_metadata in registry_proto.project_metadata
if project_metadata.project == project
]
if project is None:
return [
ProjectMetadata.from_proto(project_metadata)
for project_metadata in registry_proto.project_metadata
]
else:
return [
ProjectMetadata.from_proto(project_metadata)
for project_metadata in registry_proto.project_metadata
if project_metadata.project == project
]


@registry_proto_cache_with_tags
Expand Down
77 changes: 73 additions & 4 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
FeatureServiceNotFoundException,
FeatureViewNotFoundException,
PermissionNotFoundException,
ProjectMetadataNotFoundException,
ValidationReferenceNotFound,
)
from feast.feature_service import FeatureService
Expand Down Expand Up @@ -214,9 +215,11 @@ def __init__(

self._registry_store = cls(registry_config, repo_path)
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
seconds=(
registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)
)

def clone(self) -> "Registry":
Expand Down Expand Up @@ -787,14 +790,80 @@ def delete_validation_reference(self, name: str, project: str, commit: bool = Tr
return
raise ValidationReferenceNotFound(name, project=project)

def apply_project_metadata(self, project: str, commit: bool = True):
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto
if commit:
self.commit()

def list_project_metadata(
self, project: str, allow_cache: bool = False
self, project: Optional[str], allow_cache: bool = False
) -> List[ProjectMetadata]:
registry_proto = self._get_registry_proto(
project=project, allow_cache=allow_cache
)
return proto_registry_utils.list_project_metadata(registry_proto, project)

def get_project_metadata(
self, project: str, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
registry_proto = self._get_registry_proto(
project=project, allow_cache=allow_cache
)
return proto_registry_utils.list_project_metadata(registry_proto, project)

def delete_project_metadata(self, project: str, commit: bool = True):
self._prepare_registry_for_changes(project)
assert self.cached_registry_proto

for idx, project_metadata_proto in enumerate(
self.cached_registry_proto.project_metadata
):
if project_metadata_proto.project == project:
list_entities = self.list_entities(project)
list_feature_views = self.list_feature_views(project)
list_on_demand_feature_views = self.list_on_demand_feature_views(
project
)
list_stream_feature_views = self.list_stream_feature_views(project)
list_feature_services = self.list_feature_services(project)
list_data_sources = self.list_data_sources(project)
list_saved_datasets = self.list_saved_datasets(project)
list_validation_references = self.list_validation_references(project)
list_permissions = self.list_permissions(project)
for entity in list_entities:
self.delete_entity(entity.name, project, commit=False)
for feature_view in list_feature_views:
self.delete_feature_view(feature_view.name, project, commit=False)
for on_demand_feature_view in list_on_demand_feature_views:
self.delete_feature_view(
on_demand_feature_view.name, project, commit=False
)
for stream_feature_view in list_stream_feature_views:
self.delete_feature_view(
stream_feature_view.name, project, commit=False
)
for feature_service in list_feature_services:
self.delete_feature_service(
feature_service.name, project, commit=False
)
for data_source in list_data_sources:
self.delete_data_source(data_source.name, project, commit=False)
for saved_dataset in list_saved_datasets:
self.delete_saved_dataset(saved_dataset.name, project, commit=False)
for validation_reference in list_validation_references:
self.delete_validation_reference(
validation_reference.name, project, commit=False
)
for permission in list_permissions:
self.delete_permission(permission.name, project, commit=False)
del self.cached_registry_proto.project_metadata[idx]
if commit:
self.commit()
return

raise ProjectMetadataNotFoundException(project)

def commit(self):
"""Commits the state of the registry cache to the remote registry store."""
if self.cached_registry_proto:
Expand Down
41 changes: 27 additions & 14 deletions sdk/python/feast/infra/registry/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.permissions.auth.auth_type import AuthType
from feast.permissions.auth_model import (
AuthConfig,
NoAuthConfig,
)
from feast.permissions.auth_model import AuthConfig, NoAuthConfig
from feast.permissions.client.grpc_client_auth_interceptor import (
GrpcClientAuthHeaderInterceptor,
)
Expand Down Expand Up @@ -173,15 +170,17 @@ def apply_feature_view(
arg_name = "on_demand_feature_view"

request = RegistryServer_pb2.ApplyFeatureViewRequest(
feature_view=feature_view.to_proto()
if arg_name == "feature_view"
else None,
stream_feature_view=feature_view.to_proto()
if arg_name == "stream_feature_view"
else None,
on_demand_feature_view=feature_view.to_proto()
if arg_name == "on_demand_feature_view"
else None,
feature_view=(
feature_view.to_proto() if arg_name == "feature_view" else None
),
stream_feature_view=(
feature_view.to_proto() if arg_name == "stream_feature_view" else None
),
on_demand_feature_view=(
feature_view.to_proto()
if arg_name == "on_demand_feature_view"
else None
),
project=project,
commit=commit,
)
Expand Down Expand Up @@ -375,14 +374,28 @@ def list_validation_references(
]

def list_project_metadata(
self, project: str, allow_cache: bool = False
self, project: Optional[str], allow_cache: bool = False
) -> List[ProjectMetadata]:
request = RegistryServer_pb2.ListProjectMetadataRequest(
project=project, allow_cache=allow_cache
)
response = self.stub.ListProjectMetadata(request)
return [ProjectMetadata.from_proto(pm) for pm in response.project_metadata]

def apply_project_metadata(self, project: StrictStr, commit: bool = True):
# TODO: Add logic for applying project metadata
pass

def get_project_metadata(
self, project: StrictStr, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
# TODO: Add logic for getting project metadata
pass

def delete_project_metadata(self, project: StrictStr, commit: bool = True):
# TODO: Add logic for deleting project metadata
pass

def update_infra(self, infra: Infra, project: str, commit: bool = True):
request = RegistryServer_pb2.UpdateInfraRequest(
infra=infra.to_proto(), project=project, commit=commit
Expand Down
Loading
Loading