Skip to content

Commit

Permalink
API calls for getting all versions of entities (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
moritzmeister authored and SirOibaf committed Sep 16, 2021
1 parent e2997af commit 8ecbec5
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 35 deletions.
43 changes: 42 additions & 1 deletion java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,20 @@ public FeatureGroup getFeatureGroup(String name) throws FeatureStoreException, I
return getFeatureGroup(name, DEFAULT_VERSION);
}

/**
 * Get a list of all versions of a feature group from the feature store.
 *
 * @param name the name of the feature group
 * @return a Scala sequence containing one {@code FeatureGroup} metadata object per existing version
 * @throws FeatureStoreException if the feature store metadata operation fails
 * @throws IOException if the request to the backend cannot be completed
 */
public scala.collection.Seq<FeatureGroup> getFeatureGroups(@NonNull String name)
    throws FeatureStoreException, IOException {
  // Fetch all versions as a Java list, then convert to a Scala Seq for Scala-side callers.
  return JavaConverters.asScalaBufferConverter(featureGroupApi.getFeatureGroups(this, name))
      .asScala().toSeq();
}

/**
* Get an on-demand feature group object from the feature store.
*
Expand Down Expand Up @@ -124,6 +138,20 @@ public OnDemandFeatureGroup getOnDemandFeatureGroup(String name) throws FeatureS
return getOnDemandFeatureGroup(name, DEFAULT_VERSION);
}

/**
 * Get a list of all versions of an on-demand feature group from the feature store.
 *
 * @param name the name of the feature group
 * @return a Scala sequence containing one {@code OnDemandFeatureGroup} metadata object per existing version
 * @throws FeatureStoreException if the feature store metadata operation fails
 * @throws IOException if the request to the backend cannot be completed
 */
public scala.collection.Seq<OnDemandFeatureGroup> getOnDemandFeatureGroups(@NonNull String name)
    throws FeatureStoreException, IOException {
  // Fetch all versions as a Java list, then convert to a Scala Seq for Scala-side callers.
  return JavaConverters.asScalaBufferConverter(featureGroupApi.getOnDemandFeatureGroups(this, name))
      .asScala().toSeq();
}

/**
 * Execute a SQL query against the feature store through the Spark engine.
 *
 * @param query the SQL statement to run
 * @return the query result as a Spark {@code Dataset<Row>}
 */
public Dataset<Row> sql(String query) {
  SparkEngine engine = SparkEngine.getInstance();
  return engine.sql(query);
}
Expand Down Expand Up @@ -194,7 +222,7 @@ public Expectation.ExpectationBuilder createExpectation() {
*/
public TrainingDataset getTrainingDataset(@NonNull String name, @NonNull Integer version)
    throws FeatureStoreException, IOException {
  // Delegate to the single-result endpoint of the API; a (name, version) pair
  // identifies at most one training dataset in a feature store.
  return trainingDatasetApi.getTrainingDataset(this, name, version);
}

/**
Expand All @@ -211,6 +239,19 @@ public TrainingDataset getTrainingDataset(String name) throws FeatureStoreExcept
return getTrainingDataset(name, DEFAULT_VERSION);
}

/**
 * Get all versions of a training dataset object from the selected feature store.
 *
 * @param name name of the training dataset
 * @return a Scala sequence containing one {@code TrainingDataset} metadata object per existing version
 * @throws FeatureStoreException if the feature store metadata operation fails
 * @throws IOException if the request to the backend cannot be completed
 */
public scala.collection.Seq<TrainingDataset> getTrainingDatasets(@NonNull String name)
    throws FeatureStoreException, IOException {
  // A null version requests every version of the training dataset at once.
  return JavaConverters.asScalaBufferConverter(trainingDatasetApi.get(this, name, null)).asScala().toSeq();
}

public scala.collection.Seq<Expectation> createExpectations(scala.collection.Seq<Expectation> expectations)
throws FeatureStoreException, IOException {
List<Expectation> newExpectations = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;
Expand All @@ -48,6 +49,14 @@ public class FeatureGroupApi {

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);

/**
 * Fetch every version of a cached feature group by name.
 *
 * @param featureStore the feature store the feature group belongs to
 * @param fgName the name of the feature group
 * @return all versions of the feature group as a fixed-size list
 * @throws FeatureStoreException if the metadata operation fails
 * @throws IOException if the request cannot be completed
 */
public List<FeatureGroup> getFeatureGroups(FeatureStore featureStore, String fgName)
    throws FeatureStoreException, IOException {
  // A null version passed to getInternal requests all versions at once.
  return Arrays.asList(getInternal(featureStore, fgName, null, FeatureGroup[].class));
}

public FeatureGroup getFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion)
throws IOException, FeatureStoreException {
FeatureGroup[] offlineFeatureGroups =
Expand All @@ -60,6 +69,14 @@ public FeatureGroup getFeatureGroup(FeatureStore featureStore, String fgName, In
return resultFg;
}

/**
 * Fetch every version of an on-demand feature group by name.
 *
 * @param featureStore the feature store the feature group belongs to
 * @param fgName the name of the on-demand feature group
 * @return all versions of the on-demand feature group as a fixed-size list
 * @throws FeatureStoreException if the metadata operation fails
 * @throws IOException if the request cannot be completed
 */
public List<OnDemandFeatureGroup> getOnDemandFeatureGroups(FeatureStore featureStore, String fgName)
    throws FeatureStoreException, IOException {
  // A null version passed to getInternal requests all versions at once.
  return Arrays.asList(getInternal(featureStore, fgName, null, OnDemandFeatureGroup[].class));
}

public OnDemandFeatureGroup getOnDemandFeatureGroup(FeatureStore featureStore, String fgName, Integer fgVersion)
throws IOException, FeatureStoreException {
OnDemandFeatureGroup[] offlineFeatureGroups =
Expand All @@ -79,15 +96,18 @@ private <T> T getInternal(FeatureStore featureStore, String fgName, Integer fgVe
+ FeatureStoreApi.FEATURE_STORE_PATH
+ FEATURE_GROUP_PATH;

String uri = UriTemplate.fromTemplate(pathTemplate)
UriTemplate uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", featureStore.getProjectId())
.set("fsId", featureStore.getId())
.set("fgName", fgName)
.set("version", fgVersion)
.expand();
.set("fgName", fgName);

LOGGER.info("Sending metadata request: " + uri);
return hopsworksClient.handleRequest(new HttpGet(uri), fgType);
if (fgVersion != null) {
uri.set("version", fgVersion);
}
String uriString = uri.expand();

LOGGER.info("Sending metadata request: " + uriString);
return hopsworksClient.handleRequest(new HttpGet(uriString), fgType);
}

public OnDemandFeatureGroup save(OnDemandFeatureGroup onDemandFeatureGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;
Expand All @@ -49,31 +50,40 @@ public class TrainingDatasetApi {

private static final Logger LOGGER = LoggerFactory.getLogger(TrainingDatasetApi.class);

public TrainingDataset get(FeatureStore featureStore, String tdName, Integer tdVersion)
public List<TrainingDataset> get(FeatureStore featureStore, String tdName, Integer tdVersion)
throws IOException, FeatureStoreException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = HopsworksClient.PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ TRAINING_DATASET_PATH;

String uri = UriTemplate.fromTemplate(pathTemplate)
UriTemplate uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", featureStore.getProjectId())
.set("fsId", featureStore.getId())
.set("tdName", tdName)
.set("version", tdVersion)
.expand();

LOGGER.info("Sending metadata request: " + uri);
TrainingDataset[] trainingDatasets = hopsworksClient.handleRequest(new HttpGet(uri), TrainingDataset[].class);
.set("tdName", tdName);

if (tdVersion != null) {
uri.set("version", tdVersion);
}
String uriString = uri.expand();

LOGGER.info("Sending metadata request: " + uriString);
TrainingDataset[] trainingDatasets = hopsworksClient.handleRequest(new HttpGet(uriString), TrainingDataset[].class);

for (TrainingDataset td : trainingDatasets) {
td.setFeatureStore(featureStore);
td.getFeatures().stream()
.filter(f -> f.getFeaturegroup() != null)
.forEach(f -> f.getFeaturegroup().setFeatureStore(featureStore));
}
return Arrays.asList(trainingDatasets);
}

/**
 * Fetch a single training dataset identified by name and version.
 *
 * @param featureStore the feature store the training dataset belongs to
 * @param tdName the name of the training dataset
 * @param tdVersion the version of the training dataset
 * @return the matching training dataset
 * @throws FeatureStoreException if the metadata operation fails
 * @throws IOException if the request cannot be completed
 */
public TrainingDataset getTrainingDataset(FeatureStore featureStore, String tdName, Integer tdVersion)
    throws IOException, FeatureStoreException {
  // There can be only one training dataset with a specific name and version in a
  // feature store; there has to be one, otherwise an exception would have been
  // thrown by the request, so taking the first element is safe.
  return get(featureStore, tdName, tdVersion).get(0);
}

public TrainingDataset createTrainingDataset(TrainingDataset trainingDataset)
Expand Down
13 changes: 9 additions & 4 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,18 @@ def get(self, name, version, fg_type):
"featuregroups",
name,
]
query_params = {"version": version}
fg_json = _client._send_request("GET", path_params, query_params)[0]
query_params = None if version is None else {"version": version}
json_list = _client._send_request("GET", path_params, query_params)

if fg_type == self.CACHED:
return feature_group.FeatureGroup.from_response_json(fg_json)
fg_list = feature_group.FeatureGroup.from_response_json(json_list)
else:
return feature_group.OnDemandFeatureGroup.from_response_json(fg_json)
fg_list = feature_group.OnDemandFeatureGroup.from_response_json(json_list)

if version is not None:
return fg_list[0]
else:
return fg_list

def delete_content(self, feature_group_instance):
"""Delete the content of a feature group.
Expand Down
10 changes: 7 additions & 3 deletions python/hsfs/core/training_dataset_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ def get(self, name, version):
"trainingdatasets",
name,
]
query_params = {"version": version}
return training_dataset.TrainingDataset.from_response_json(
_client._send_request("GET", path_params, query_params)[0],
query_params = None if version is None else {"version": version}
td_list = training_dataset.TrainingDataset.from_response_json(
_client._send_request("GET", path_params, query_params),
)
if version is not None:
return td_list[0]
else:
return td_list

def get_query(self, training_dataset_instance, with_label):
_client = client.get_instance()
Expand Down
12 changes: 7 additions & 5 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,8 +1040,9 @@ def compute_statistics(self, wallclock_time: Optional[str] = None):
@classmethod
def from_response_json(cls, json_dict):
    """Construct `FeatureGroup` objects from a backend JSON response.

    # Arguments
        json_dict: JSON response containing one entry per feature group version.

    # Returns
        List of `FeatureGroup` instances, one per entry in the response.
    """
    json_decamelized = humps.decamelize(json_dict)
    # Drop the backend discriminator field; it is not a constructor argument.
    for fg in json_decamelized:
        _ = fg.pop("type", None)
    return [cls(**fg) for fg in json_decamelized]

def update_from_response_json(self, json_dict):
json_decamelized = humps.decamelize(json_dict)
Expand Down Expand Up @@ -1351,9 +1352,10 @@ def validate(self): # noqa: F821
@classmethod
def from_response_json(cls, json_dict):
    """Construct `OnDemandFeatureGroup` objects from a backend JSON response.

    # Arguments
        json_dict: JSON response containing one entry per feature group version.

    # Returns
        List of `OnDemandFeatureGroup` instances, one per entry in the response.
    """
    json_decamelized = humps.decamelize(json_dict)
    # Strip backend-only fields that the constructor does not accept.
    for fg in json_decamelized:
        _ = fg.pop("online_topic_name", None)
        _ = fg.pop("type", None)
    return [cls(**fg) for fg in json_decamelized]

def update_from_response_json(self, json_dict):
json_decamelized = humps.decamelize(json_dict)
Expand Down
58 changes: 58 additions & 0 deletions python/hsfs/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,27 @@ def get_feature_group(self, name: str, version: int = None):
name, version, feature_group_api.FeatureGroupApi.CACHED
)

def get_feature_groups(self, name: str):
    """Get a list of all versions of a feature group entity from the feature store.

    Getting a feature group from the Feature Store means getting its metadata handle
    so you can subsequently read the data into a Spark or Pandas DataFrame or use
    the `Query`-API to perform joins between feature groups.

    # Arguments
        name: Name of the feature group to get.

    # Returns
        `FeatureGroup`: List of feature group metadata objects, one per version.

    # Raises
        `RestAPIError`: If unable to retrieve the feature group versions from the
            feature store.
    """
    # A `None` version requests all versions; CACHED selects regular (non
    # on-demand) feature groups.
    return self._feature_group_api.get(
        name, None, feature_group_api.FeatureGroupApi.CACHED
    )

def get_on_demand_feature_group(self, name: str, version: int = None):
"""Get a on-demand feature group entity from the feature store.
Expand Down Expand Up @@ -164,6 +185,26 @@ def get_on_demand_feature_group(self, name: str, version: int = None):
name, version, feature_group_api.FeatureGroupApi.ONDEMAND
)

def get_on_demand_feature_groups(self, name: str):
    """Get a list of all versions of an on-demand feature group entity from the feature store.

    Getting an on-demand feature group from the Feature Store means getting its
    metadata handle so you can subsequently read the data into a Spark or
    Pandas DataFrame or use the `Query`-API to perform joins between feature groups.

    # Arguments
        name: Name of the on-demand feature group to get.

    # Returns
        `OnDemandFeatureGroup`: List of on-demand feature group metadata objects,
            one per version.

    # Raises
        `RestAPIError`: If unable to retrieve the feature group versions from the
            feature store.
    """
    # A `None` version requests all versions; ONDEMAND selects on-demand
    # feature groups.
    return self._feature_group_api.get(
        name, None, feature_group_api.FeatureGroupApi.ONDEMAND
    )

def get_training_dataset(self, name: str, version: int = None):
"""Get a training dataset entity from the feature store.
Expand Down Expand Up @@ -192,6 +233,23 @@ def get_training_dataset(self, name: str, version: int = None):
version = self.DEFAULT_VERSION
return self._training_dataset_api.get(name, version)

def get_training_datasets(self, name: str):
    """Get a list of all versions of a training dataset entity from the feature store.

    Getting a training dataset from the Feature Store means getting its metadata handle
    so you can subsequently read the data into a Spark or Pandas DataFrame.

    # Arguments
        name: Name of the training dataset to get.

    # Returns
        `TrainingDataset`: List of training dataset metadata objects, one per version.

    # Raises
        `RestAPIError`: If unable to retrieve the training dataset versions from the
            feature store.
    """
    # A `None` version requests every version of the training dataset.
    return self._training_dataset_api.get(name, None)

def get_storage_connector(self, name: str):
"""Get a previously created storage connector from the feature store.
Expand Down
5 changes: 3 additions & 2 deletions python/hsfs/training_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ def delete(self):
@classmethod
def from_response_json(cls, json_dict):
    """Construct `TrainingDataset` objects from a backend JSON response.

    # Arguments
        json_dict: JSON response containing one entry per training dataset version.

    # Returns
        List of `TrainingDataset` instances, one per entry in the response.
    """
    json_decamelized = humps.decamelize(json_dict)
    # Drop the backend discriminator field; it is not a constructor argument.
    for td in json_decamelized:
        _ = td.pop("type")
    return [cls(**td) for td in json_decamelized]

def update_from_response_json(self, json_dict):
json_decamelized = humps.decamelize(json_dict)
Expand Down

0 comments on commit 8ecbec5

Please sign in to comment.