API calls for getting all versions of entities #472

Merged 3 commits on Sep 16, 2021
43 changes: 42 additions & 1 deletion java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java
@@ -96,6 +96,20 @@ public FeatureGroup getFeatureGroup(String name) throws FeatureStoreException, I
return getFeatureGroup(name, DEFAULT_VERSION);
}

/**
* Get a list of all versions of a feature group from the feature store.
*
* @param name the name of the feature group
* @return Seq of feature group metadata objects, one per existing version
* @throws FeatureStoreException
* @throws IOException
*/
public scala.collection.Seq<FeatureGroup> getFeatureGroups(@NonNull String name)
throws FeatureStoreException, IOException {
return JavaConverters.asScalaBufferConverter(featureGroupApi.getFeatureGroups(this, name))
.asScala().toSeq();
}

/**
* Get an on-demand feature group object from the feature store.
*
@@ -124,6 +138,20 @@ public OnDemandFeatureGroup getOnDemandFeatureGroup(String name) throws FeatureS
return getOnDemandFeatureGroup(name, DEFAULT_VERSION);
}

/**
* Get a list of all versions of an on-demand feature group from the feature store.
*
* @param name the name of the feature group
* @return Seq of on-demand feature group metadata objects, one per existing version
* @throws FeatureStoreException
* @throws IOException
*/
public scala.collection.Seq<OnDemandFeatureGroup> getOnDemandFeatureGroups(@NonNull String name)
throws FeatureStoreException, IOException {
return JavaConverters.asScalaBufferConverter(featureGroupApi.getOnDemandFeatureGroups(this, name))
.asScala().toSeq();
}

public Dataset<Row> sql(String query) {
return SparkEngine.getInstance().sql(query);
}
@@ -194,7 +222,7 @@ public Expectation.ExpectationBuilder createExpectation() {
*/
public TrainingDataset getTrainingDataset(@NonNull String name, @NonNull Integer version)
throws FeatureStoreException, IOException {
return trainingDatasetApi.get(this, name, version);
return trainingDatasetApi.getTrainingDataset(this, name, version);
}

/**
@@ -211,6 +239,19 @@ public TrainingDataset getTrainingDataset(String name) throws FeatureStoreExcept
return getTrainingDataset(name, DEFAULT_VERSION);
}

/**
* Get all versions of a training dataset object from the selected feature store.
*
* @param name name of the training dataset
* @return Seq of training dataset metadata objects, one per existing version
* @throws FeatureStoreException
* @throws IOException
*/
public scala.collection.Seq<TrainingDataset> getTrainingDatasets(@NonNull String name)
throws FeatureStoreException, IOException {
return JavaConverters.asScalaBufferConverter(trainingDatasetApi.get(this, name, null)).asScala().toSeq();
}

public scala.collection.Seq<Expectation> createExpectations(scala.collection.Seq<Expectation> expectations)
throws FeatureStoreException, IOException {
List<Expectation> newExpectations = new ArrayList<>();
FeatureGroupApi.java
@@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;
@@ -48,6 +49,14 @@ public class FeatureGroupApi {

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);

public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgName)
throws FeatureStoreException, IOException {
FeatureGroup[] offlineFeatureGroups =
getInternal(featureStore, fgName, null, FeatureGroup[].class);

return Arrays.asList(offlineFeatureGroups);
}

public FeatureGroup getFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion)
throws IOException, FeatureStoreException {
FeatureGroup[] offlineFeatureGroups =
@@ -60,6 +69,14 @@ public FeatureGroup getFeatureGroup(FeatureStore featureStore, String fgName, In
return resultFg;
}

public List<OnDemandFeatureGroup> getOnDemandFeatureGroups(FeatureStore featureStore, String fgName)
throws FeatureStoreException, IOException {
OnDemandFeatureGroup[] offlineFeatureGroups =
getInternal(featureStore, fgName, null, OnDemandFeatureGroup[].class);

return Arrays.asList(offlineFeatureGroups);
}

public OnDemandFeatureGroup getOnDemandFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion)
throws IOException, FeatureStoreException {
OnDemandFeatureGroup[] offlineFeatureGroups =
@@ -79,15 +96,18 @@ private <T> T getInternal(FeatureStore featureStore, String fgName, Integer fgVe
+ FeatureStoreApi.FEATURE_STORE_PATH
+ FEATURE_GROUP_PATH;

String uri = UriTemplate.fromTemplate(pathTemplate)
UriTemplate uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", featureStore.getProjectId())
.set("fsId", featureStore.getId())
.set("fgName", fgName)
.set("version", fgVersion)
.expand();
.set("fgName", fgName);

LOGGER.info("Sending metadata request: " + uri);
return hopsworksClient.handleRequest(new HttpGet(uri), fgType);
if (fgVersion != null) {
uri.set("version", fgVersion);
}
String uriString = uri.expand();

LOGGER.info("Sending metadata request: " + uriString);
return hopsworksClient.handleRequest(new HttpGet(uriString), fgType);
}

public OnDemandFeatureGroup save(OnDemandFeatureGroup onDemandFeatureGroup)
TrainingDatasetApi.java
@@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;
@@ -49,31 +50,40 @@ public class TrainingDatasetApi {

private static final Logger LOGGER = LoggerFactory.getLogger(TrainingDatasetApi.class);

public TrainingDataset get(FeatureStore featureStore, String tdName, Integer tdVersion)
public List<TrainingDataset> get(FeatureStore featureStore, String tdName, Integer tdVersion)
throws IOException, FeatureStoreException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = HopsworksClient.PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ TRAINING_DATASET_PATH;

String uri = UriTemplate.fromTemplate(pathTemplate)
UriTemplate uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", featureStore.getProjectId())
.set("fsId", featureStore.getId())
.set("tdName", tdName)
.set("version", tdVersion)
.expand();

LOGGER.info("Sending metadata request: " + uri);
TrainingDataset[] trainingDatasets = hopsworksClient.handleRequest(new HttpGet(uri), TrainingDataset[].class);
.set("tdName", tdName);

if (tdVersion != null) {
uri.set("version", tdVersion);
}
String uriString = uri.expand();

LOGGER.info("Sending metadata request: " + uriString);
TrainingDataset[] trainingDatasets = hopsworksClient.handleRequest(new HttpGet(uriString), TrainingDataset[].class);

for (TrainingDataset td : trainingDatasets) {
td.setFeatureStore(featureStore);
td.getFeatures().stream()
.filter(f -> f.getFeaturegroup() != null)
.forEach(f -> f.getFeaturegroup().setFeatureStore(featureStore));
}
return Arrays.asList(trainingDatasets);
}

public TrainingDataset getTrainingDataset(FeatureStore featureStore, String tdName, Integer tdVersion)
throws IOException, FeatureStoreException {
// There can be only one training dataset with a given name and version in a feature store.
// There has to be one, otherwise an exception would have been thrown.
TrainingDataset resultTd = trainingDatasets[0];
resultTd.setFeatureStore(featureStore);
resultTd.getFeatures().stream()
.filter(f -> f.getFeaturegroup() != null)
.forEach(f -> f.getFeaturegroup().setFeatureStore(featureStore));
return resultTd;
return get(featureStore, tdName, tdVersion).get(0);
}

public TrainingDataset createTrainingDataset(TrainingDataset trainingDataset)
13 changes: 9 additions & 4 deletions python/hsfs/core/feature_group_api.py
@@ -74,13 +74,18 @@ def get(self, name, version, fg_type):
"featuregroups",
name,
]
query_params = {"version": version}
fg_json = _client._send_request("GET", path_params, query_params)[0]
query_params = None if version is None else {"version": version}
json_list = _client._send_request("GET", path_params, query_params)

if fg_type == self.CACHED:
return feature_group.FeatureGroup.from_response_json(fg_json)
fg_list = feature_group.FeatureGroup.from_response_json(json_list)
else:
return feature_group.OnDemandFeatureGroup.from_response_json(fg_json)
fg_list = feature_group.OnDemandFeatureGroup.from_response_json(json_list)

if version is not None:
return fg_list[0]
else:
return fg_list

def delete_content(self, feature_group_instance):
"""Delete the content of a feature group.
10 changes: 7 additions & 3 deletions python/hsfs/core/training_dataset_api.py
@@ -52,10 +52,14 @@ def get(self, name, version):
"trainingdatasets",
name,
]
query_params = {"version": version}
return training_dataset.TrainingDataset.from_response_json(
_client._send_request("GET", path_params, query_params)[0],
query_params = None if version is None else {"version": version}
td_list = training_dataset.TrainingDataset.from_response_json(
_client._send_request("GET", path_params, query_params),
)
if version is not None:
return td_list[0]
else:
return td_list

def get_query(self, training_dataset_instance, with_label):
_client = client.get_instance()
12 changes: 7 additions & 5 deletions python/hsfs/feature_group.py
@@ -1049,8 +1049,9 @@ def compute_statistics(self, wallclock_time: Optional[str] = None):
@classmethod
def from_response_json(cls, json_dict):
json_decamelized = humps.decamelize(json_dict)
_ = json_decamelized.pop("type", None)
return cls(**json_decamelized)
for fg in json_decamelized:
_ = fg.pop("type", None)
return [cls(**fg) for fg in json_decamelized]

def update_from_response_json(self, json_dict):
json_decamelized = humps.decamelize(json_dict)
@@ -1361,9 +1362,10 @@ def validate(self): # noqa: F821
@classmethod
def from_response_json(cls, json_dict):
json_decamelized = humps.decamelize(json_dict)
_ = json_decamelized.pop("online_topic_name", None)
_ = json_decamelized.pop("type", None)
return cls(**json_decamelized)
for fg in json_decamelized:
_ = fg.pop("online_topic_name", None)
_ = fg.pop("type", None)
return [cls(**fg) for fg in json_decamelized]

def update_from_response_json(self, json_dict):
json_decamelized = humps.decamelize(json_dict)
58 changes: 58 additions & 0 deletions python/hsfs/feature_store.py
@@ -132,6 +132,27 @@ def get_feature_group(self, name: str, version: int = None):
name, version, feature_group_api.FeatureGroupApi.CACHED
)

def get_feature_groups(self, name: str):
"""Get a list of all versions of a feature group entity from the feature store.

Getting a feature group from the Feature Store means getting its metadata handle
so you can subsequently read the data into a Spark or Pandas DataFrame or use
the `Query`-API to perform joins between feature groups.

# Arguments
name: Name of the feature group to get.

# Returns
`List[FeatureGroup]`: List of feature group metadata objects, one object per version.

# Raises
`RestAPIError`: If unable to retrieve feature group from the feature store.

"""
return self._feature_group_api.get(
name, None, feature_group_api.FeatureGroupApi.CACHED
)
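A minimal usage sketch for this new method (illustrative only; the connection boilerplate and the feature group name "transactions" are assumptions, not part of this PR):

```python
import hsfs

# Standard hsfs connection flow: connect to the project and get a feature store handle.
connection = hsfs.connection()
fs = connection.get_feature_store()

# Without a version argument, one FeatureGroup metadata object is returned per existing version.
for fg in fs.get_feature_groups("transactions"):
    print(fg.name, fg.version)
```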

def get_on_demand_feature_group(self, name: str, version: int = None):
"""Get a on-demand feature group entity from the feature store.

@@ -164,6 +185,26 @@ def get_on_demand_feature_group(self, name: str, version: int = None):
name, version, feature_group_api.FeatureGroupApi.ONDEMAND
)

def get_on_demand_feature_groups(self, name: str):
"""Get a list of all versions of an on-demand feature group entity from the feature store.

Getting an on-demand feature group from the Feature Store means getting its
metadata handle so you can subsequently read the data into a Spark or
Pandas DataFrame or use the `Query`-API to perform joins between feature groups.

# Arguments
name: Name of the on-demand feature group to get.

# Returns
`List[OnDemandFeatureGroup]`: List of on-demand feature group metadata objects, one object per version.

# Raises
`RestAPIError`: If unable to retrieve the on-demand feature group from the feature store.
"""
return self._feature_group_api.get(
name, None, feature_group_api.FeatureGroupApi.ONDEMAND
)
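The on-demand variant behaves the same way; a short sketch (assuming the `fs` handle from the previous example and a hypothetical feature group name):

```python
# One OnDemandFeatureGroup metadata object per existing version.
on_demand_versions = fs.get_on_demand_feature_groups("web_logs")

# For example, pick the highest version to build a query against.
latest = max(on_demand_versions, key=lambda fg: fg.version)
print(latest.name, latest.version)
```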

def get_training_dataset(self, name: str, version: int = None):
"""Get a training dataset entity from the feature store.

@@ -192,6 +233,23 @@ def get_training_dataset(self, name: str, version: int = None):
version = self.DEFAULT_VERSION
return self._training_dataset_api.get(name, version)

def get_training_datasets(self, name: str):
"""Get a list of all versions of a training dataset entity from the feature store.

Getting a training dataset from the Feature Store means getting its metadata handle
so you can subsequently read the data into a Spark or Pandas DataFrame.

# Arguments
name: Name of the training dataset to get.

# Returns
`List[TrainingDataset]`: List of training dataset metadata objects, one object per version.

# Raises
`RestAPIError`: If unable to retrieve the training dataset from the feature store.
"""
return self._training_dataset_api.get(name, None)
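And a similar sketch for training datasets, e.g. listing all versions newest first (again assuming the `fs` handle from above and a hypothetical dataset name):

```python
# One TrainingDataset metadata object per existing version.
tds = fs.get_training_datasets("churn_model_dataset")

for td in sorted(tds, key=lambda td: td.version, reverse=True):
    print(td.name, td.version)
```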

def get_storage_connector(self, name: str):
"""Get a previously created storage connector from the feature store.

5 changes: 3 additions & 2 deletions python/hsfs/training_dataset.py
@@ -401,8 +401,9 @@ def delete(self):
@classmethod
def from_response_json(cls, json_dict):
json_decamelized = humps.decamelize(json_dict)
_ = json_decamelized.pop("type")
return cls(**json_decamelized)
for td in json_decamelized:
_ = td.pop("type")
return [cls(**td) for td in json_decamelized]

def update_from_response_json(self, json_dict):
json_decamelized = humps.decamelize(json_dict)
Expand Down