feat(learning): feature_store & graph_store V1 #4237
Open
Yi-Eaaa wants to merge 23 commits into alibaba:main from Yi-Eaaa:PyG_Remote_Backend
+550
−1
Commits (23):
861680c feature_store & graph_store V1
9813f2d GLTNeighborLoader & PygNeighborSampler
e2b36a3 Update submodule to a valid commit
56260cb Code formatting
181ad99 Code formatting
c67648b Code formatting
b77e509 Code formatting
983f230 Code formatting
7a22983 Code formatting
6130280 Code formatting
b55250e feature_store function in Client
6cb7e2a Code formatting
976650c Code formatting
d488f63 Support GsGraphStore
f76f5bd Code formatting
ad1c7a5 Code formatting
1073a28 check feature_store & graph_store
c065dd0 Learning with PyG remote backend
ce3cc2f Add annotion & Code Formatting
6e8086c Revert "Add annotion & Code Formatting"
e10b6c8 Test & Learning with PyG remote backend
bba3a8d pyg loader with graphscope remote backend
d597602 Merge branch 'alibaba:main' into PyG_Remote_Backend
(all commits by Yi-Eaaa)
Submodule graphlearn-for-pytorch updated from 6d7f31 to c303a8
@@ -0,0 +1,148 @@
import base64
import json
from multiprocessing.reduction import ForkingPickler
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

import torch
from torch import Tensor
from torch_geometric.data import FeatureStore
from torch_geometric.data import TensorAttr
from torch_geometric.typing import FeatureTensorType

from graphscope.learning.graphlearn_torch.data import DeviceGroup
from graphscope.learning.graphlearn_torch.data import Feature

KeyType = Tuple[Optional[str], Optional[str]]


class GSFeatureStore(FeatureStore):
    def __init__(self, endpoints, handle=None, config=None, graph=None) -> None:
        super().__init__()
        self.handle = handle
        self.config = config

        # Default to None so lookups still work when no config is given.
        self.node_features = None
        self.edge_features = None
        self.node_labels = None
        self.edges = None

        if config is not None:
            # `config` arrives as a base64-encoded JSON string.
            config = json.loads(
                base64.b64decode(config.encode("utf-8", errors="ignore")).decode(
                    "utf-8", errors="ignore"
                )
            )
            self.edge_features = config["edge_features"]
            self.node_features = config["node_features"]
            self.node_labels = config["node_labels"]
            self.edges = config["edges"]

        self.endpoints = endpoints

    @staticmethod
    def key(attr: TensorAttr) -> KeyType:
        return (attr.group_name, attr.attr_name)

    def _put_tensor(self, tensor: FeatureTensorType, attr: TensorAttr) -> bool:
        r"""To be implemented by :class:`GSFeatureStore`."""
        raise NotImplementedError

    def _get_tensor(self, attr: TensorAttr) -> Optional[Tensor]:
        r"""To be implemented by :class:`GSFeatureStore`."""
        raise NotImplementedError

    def _remove_tensor(self, attr: TensorAttr) -> bool:
        r"""To be implemented by :class:`GSFeatureStore`."""
        raise NotImplementedError

    def _get_tensor_size(self, attr: TensorAttr) -> Optional[Tuple[int, ...]]:
        # Look up the group first among node features, then edge features.
        if self.node_features is not None:
            node_tensor = self.node_features.get(attr.group_name)
            if node_tensor is not None:
                return (len(node_tensor),)
        if self.edge_features is not None:
            edge_tensor = self.edge_features.get(attr.group_name)
            if edge_tensor is not None:
                return (len(edge_tensor),)
        return None

    def get_all_tensor_attrs(self) -> List[TensorAttr]:
        tensor_attrs = []
        if self.node_features is not None:
            for node_type, node_features in self.node_features.items():
                for idx, node_feature in enumerate(node_features):
                    tensor_attrs.append(
                        TensorAttr(node_type, node_feature, torch.tensor([idx]))
                    )
        if self.edge_features is not None:
            for edge_type, edge_features in self.edge_features.items():
                for idx, edge_feature in enumerate(edge_features):
                    tensor_attrs.append(
                        TensorAttr(edge_type, edge_feature, torch.tensor([idx]))
                    )
        return tensor_attrs

    def _build_features(
        self,
        feature_data,
        id2idx,
        split_ratio: Union[float, Dict[str, float]] = 0.0,
        device_group_list: Optional[List[DeviceGroup]] = None,
        device: Optional[int] = None,
        with_gpu: bool = True,
        dtype: Optional[torch.dtype] = None,
    ):
        r"""Build ``Feature``s for node/edge feature data."""
        if feature_data is not None:
            if isinstance(feature_data, dict):
                # Heterogeneous: one `Feature` per graph type.
                if not isinstance(split_ratio, dict):
                    split_ratio = {
                        graph_type: float(split_ratio)
                        for graph_type in feature_data.keys()
                    }

                if id2idx is not None:
                    assert isinstance(id2idx, dict)
                else:
                    id2idx = {}

                features = {}
                for graph_type, feat in feature_data.items():
                    features[graph_type] = Feature(
                        feat, id2idx.get(graph_type, None),
                        split_ratio.get(graph_type, 0.0),
                        device_group_list, device, with_gpu,
                        dtype if dtype is not None else feat.dtype
                    )
            else:
                # Homogeneous: a single `Feature`.
                features = Feature(
                    feature_data, id2idx, float(split_ratio),
                    device_group_list, device, with_gpu,
                    dtype if dtype is not None else feature_data.dtype
                )
        else:
            features = None

        return features

    @classmethod
    def from_ipc_handle(cls, ipc_handle):
        return cls(*ipc_handle)

    def share_ipc(self):
        ipc_handle = (
            list(self.endpoints), self.handle, self.config
        )
        return ipc_handle


## Pickling Registration

def rebuild_featurestore(ipc_handle):
    fs = GSFeatureStore.from_ipc_handle(ipc_handle)
    return fs


def reduce_featurestore(feature_store: GSFeatureStore):
    ipc_handle = feature_store.share_ipc()
    return (rebuild_featurestore, (ipc_handle, ))


ForkingPickler.register(GSFeatureStore, reduce_featurestore)
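Two pieces of this file can be exercised without torch or GraphScope installed: the base64/JSON config decoding in `__init__`, and the rebuild-function pickling pattern registered at the bottom. Below is a minimal stdlib-only sketch; `sample_config`, `MiniStore`, and the endpoint value are made-up stand-ins, not part of the PR.

```python
import base64
import copyreg
import json
import pickle

# Invented sample, shaped like the four keys GSFeatureStore.__init__ unpacks.
sample_config = {
    "node_features": {"paper": ["feat_0", "feat_1"]},
    "edge_features": {"cites": ["weight"]},
    "node_labels": {"paper": "label"},
    "edges": [["paper", "cites", "paper"]],
}

# Encode the way a producer presumably would: JSON -> UTF-8 -> base64 -> str.
encoded = base64.b64encode(json.dumps(sample_config).encode("utf-8")).decode("utf-8")

# Decode exactly as GSFeatureStore.__init__ does.
decoded = json.loads(base64.b64decode(encoded.encode("utf-8")).decode("utf-8"))
assert decoded == sample_config


# The IPC handoff at the bottom of the file is the standard
# rebuild-function-plus-args reduction pattern; with plain pickle/copyreg
# it looks like this (MiniStore is a hypothetical stand-in):
class MiniStore:
    def __init__(self, endpoints, handle=None, config=None):
        self.endpoints, self.handle, self.config = endpoints, handle, config

    def share_ipc(self):
        # Everything needed to rebuild the store in another process.
        return (list(self.endpoints), self.handle, self.config)

    @classmethod
    def from_ipc_handle(cls, ipc_handle):
        return cls(*ipc_handle)


def rebuild_store(ipc_handle):
    return MiniStore.from_ipc_handle(ipc_handle)


def reduce_store(store):
    # Pickle as (callable, args) so unpickling calls rebuild_store(handle).
    return (rebuild_store, (store.share_ipc(),))


copyreg.pickle(MiniStore, reduce_store)

clone = pickle.loads(pickle.dumps(MiniStore(["h1:8000"], config=encoded)))
assert clone.endpoints == ["h1:8000"] and clone.config == encoded
```

The real code registers with `ForkingPickler` instead of `copyreg` so the reduction applies when tensors and stores are handed to worker processes via `multiprocessing`.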
Reviewer: How do the methods _get_tensor_size, get_all_tensor_attrs, and _build_features work?
Yi-Eaaa: TensorAttr stores the attributes that uniquely identify a vertex/edge feature.
_get_tensor_size returns the length of the vertex feature list corresponding to a TensorAttr.
get_all_tensor_attrs retrieves all TensorAttrs that exist in the FeatureStore.
_build_features is what I referred to in dist_data earlier; it is not used here, and I forgot to delete it.
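The two lookup methods described in this reply can be illustrated with a small stdlib-only mock. TensorAttrMock is a hypothetical stand-in for PyG's TensorAttr, and the sample feature dicts are invented; only the iteration logic mirrors the PR.

```python
from collections import namedtuple

# Hypothetical stand-in for torch_geometric's TensorAttr.
TensorAttrMock = namedtuple("TensorAttrMock", ["group_name", "attr_name", "index"])

# Invented samples, shaped like the decoded node/edge feature dicts.
node_features = {"paper": ["year", "venue"]}
edge_features = {"cites": ["weight"]}


def get_all_tensor_attrs(node_features, edge_features):
    # Enumerate every (group, feature, index) triple, nodes first, then
    # edges, mirroring GSFeatureStore.get_all_tensor_attrs.
    attrs = []
    if node_features is not None:
        for node_type, feats in node_features.items():
            for idx, feat in enumerate(feats):
                attrs.append(TensorAttrMock(node_type, feat, idx))
    if edge_features is not None:
        for edge_type, feats in edge_features.items():
            for idx, feat in enumerate(feats):
                attrs.append(TensorAttrMock(edge_type, feat, idx))
    return attrs


def get_tensor_size(attr, node_features, edge_features):
    # Mirror _get_tensor_size: the length of the feature list for the
    # group, checking node features before edge features.
    for store in (node_features, edge_features):
        if store is not None:
            feats = store.get(attr.group_name)
            if feats is not None:
                return (len(feats),)
    return None


attrs = get_all_tensor_attrs(node_features, edge_features)
assert [a.attr_name for a in attrs] == ["year", "venue", "weight"]
assert get_tensor_size(attrs[0], node_features, edge_features) == (2,)
```

Note that, as the reply says, these sizes come from the decoded config (the number of named features per group), not from remotely stored tensors.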