Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: Optimize SQL Registry proto() method #4425

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ message Registry {
// Per-project metadata record declared in Registry.proto.
message ProjectMetadata {
// Name of the project this record describes.
string project = 1;
// NOTE(review): presumably a unique identifier assigned to the project; the code that
// generates it is not visible here — confirm.
string project_uuid = 2;
// NOTE(review): presumably the time the project's registry contents were last modified;
// the writer of this field is not visible here — confirm which operations update it.
google.protobuf.Timestamp last_updated_timestamp = 3;
}
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):

if len(feature_view.entities) != len(feature_view.entity_columns):
warnings.warn(
f"There are some mismatches in your feature view's registered entities. Please check if you have applied your entities correctly."
f"There are some mismatches in your feature view: {feature_view.name} registered entities. Please check if you have applied your entities correctly."
f"Entities: {feature_view.entities} vs Entity Columns: {feature_view.entity_columns}"
)

Expand Down
68 changes: 43 additions & 25 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now
Expand All @@ -24,14 +25,14 @@

class CachingRegistry(BaseRegistry):
def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str):
# Sets up the in-memory registry cache: the cached proto, its creation timestamp, its TTL,
# the refresh lock, and (when cache_mode == "thread") a call to _start_thread_async_refresh.
# NOTE(review): indentation was lost in this listing and several assignments appear more
# than once (cached_registry_proto, cached_registry_proto_created, cache_mode). This looks
# like removed and added diff lines shown together — confirm the real statement order and
# nesting against the actual file before relying on it.
self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
self.cached_registry_proto_created = _utc_now()
self.cache_mode = cache_mode
# Start from an empty RegistryProto; it is overwritten with self.proto() further down.
self.cached_registry_proto = RegistryProto()
# Lock used by _refresh_cached_registry_if_necessary around its expiry check.
self._refresh_lock = Lock()
# A missing (None) TTL is stored as a zero-length timedelta.
self.cached_registry_proto_ttl = timedelta(
seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0
)
self.cache_mode = cache_mode
# Populate the cache from proto() and record when that snapshot was taken.
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = _utc_now()
if cache_mode == "thread":
self._start_thread_async_refresh(cache_ttl_seconds)
# Registers _exit_handler to run at interpreter exit.
# NOTE(review): whether this sits inside the "thread" branch cannot be told from this listing.
atexit.register(self._exit_handler)
Expand Down Expand Up @@ -304,6 +305,25 @@ def list_project_metadata(
)
return self._list_project_metadata(project)

@abstractmethod
def _get_project_metadata(self, project: str) -> Optional[ProjectMetadata]:
    """Abstract hook that looks up the metadata of ``project``.

    ``get_project_metadata`` delegates to this method when ``allow_cache``
    is false. Subclasses provide the actual lookup and return ``None`` or a
    ``ProjectMetadata`` instance.
    """

# TODO: get_project_metadata() needs to be added to BaseRegistry class
def get_project_metadata(
    self, project: str, allow_cache: bool = False
) -> Optional[ProjectMetadata]:
    """Return the metadata for ``project``, optionally served from the cache.

    Args:
        project: Name of the project to look up.
        allow_cache: When true, the cached registry proto is consulted (after
            ``_refresh_cached_registry_if_necessary`` has run); otherwise the
            lookup is delegated to ``_get_project_metadata``.

    Returns:
        A ``ProjectMetadata`` built from the cached proto entry, ``None`` when
        the cache has no entry for ``project``, or whatever
        ``_get_project_metadata`` returns on the non-cached path.
    """
    # Non-cached path first: go straight to the subclass hook.
    if not allow_cache:
        return self._get_project_metadata(project)

    self._refresh_cached_registry_if_necessary()
    cached_entry = proto_registry_utils.get_project_metadata(
        self.cached_registry_proto, project
    )
    return None if cached_entry is None else ProjectMetadata.from_proto(cached_entry)

@abstractmethod
def _get_infra(self, project: str) -> Infra:
    """Abstract hook that returns the ``Infra`` object for ``project``.

    Called by ``get_infra``; subclasses provide the actual lookup.
    """
Expand All @@ -312,34 +332,32 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra:
return self._get_infra(project)

def refresh(self, project: Optional[str] = None):
# Rebuilds the cached registry proto from self.proto() and records the refresh time.
# NOTE(review): indentation was lost in this listing, and the `if project:` section below
# may be removed-side diff lines shown next to the surviving ones — confirm against the
# actual file which statements remain and how they nest.
if project:
# Look for this project's metadata in the currently cached proto ...
project_metadata = proto_registry_utils.get_project_metadata(
registry_proto=self.cached_registry_proto, project=project
)
# ... and initialise it in the cached proto when it is missing.
if not project_metadata:
proto_registry_utils.init_project_metadata(
self.cached_registry_proto, project
)
# Replace the cached snapshot and stamp it with the current UTC time.
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = _utc_now()

def _refresh_cached_registry_if_necessary(self):
# In "sync" cache mode, decides under self._refresh_lock whether the cached registry proto
# has expired and, if so, logs a message and calls self.refresh().
# NOTE(review): this listing contains two forms of the `expired` computation (one started
# right after the `with`, one inside the `else:` branch) and the first is never closed.
# It looks like removed and added diff lines shown together — confirm against the actual
# file which form is current.
if self.cache_mode == "sync":
with self._refresh_lock:
expired = (
self.cached_registry_proto is None
or self.cached_registry_proto_created is None
) or (
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
_utc_now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
# An empty RegistryProto is treated as "not populated yet" rather than as expired.
if self.cached_registry_proto == RegistryProto():
# Avoids the need to refresh the registry when cache is not populated yet
# Specially during the __init__ phase
# proto() will populate the cache with project metadata if no objects are registered
expired = False
else:
# Expired when there is no cached proto or creation time, or when a positive TTL has
# elapsed since the cached proto was created.
expired = (
self.cached_registry_proto is None
or self.cached_registry_proto_created is None
) or (
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
_utc_now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
)
)
)
)
if expired:
logger.info("Registry cache expired, so refreshing")
self.refresh()
Expand All @@ -351,7 +369,7 @@ def _start_thread_async_refresh(self, cache_ttl_seconds):
self.registry_refresh_thread = threading.Timer(
cache_ttl_seconds, self._start_thread_async_refresh, [cache_ttl_seconds]
)
self.registry_refresh_thread.setDaemon(True)
self.registry_refresh_thread.daemon = True
self.registry_refresh_thread.start()

def _exit_handler(self):
Expand Down
Loading
Loading