From ea935f9c29c1d6554a92dbdb89b53978f8332433 Mon Sep 17 00:00:00 2001 From: jxy <865526875@qq.com> Date: Mon, 3 Jun 2024 03:20:09 +0000 Subject: [PATCH 1/4] add feature_store & graph_store --- gammagl/data/feature_store.py | 516 ++++++++++++++++++++++++++++++++++ gammagl/data/graph_store.py | 360 ++++++++++++++++++++++++ gammagl/utils/mixin.py | 22 ++ 3 files changed, 898 insertions(+) create mode 100644 gammagl/data/feature_store.py create mode 100644 gammagl/data/graph_store.py create mode 100644 gammagl/utils/mixin.py diff --git a/gammagl/data/feature_store.py b/gammagl/data/feature_store.py new file mode 100644 index 00000000..bacc83a2 --- /dev/null +++ b/gammagl/data/feature_store.py @@ -0,0 +1,516 @@ +r"""This class defines the abstraction for a backend-agnostic feature store. +The goal of the feature store is to abstract away all node and edge feature +memory management so that varying implementations can allow for independent +scale-out. + +This particular feature store abstraction makes a few key assumptions: +* The features we care about storing are node and edge features of a graph. + To this end, the attributes that the feature store supports include a + `group_name` (e.g. a heterogeneous node name or a heterogeneous edge type), + an `attr_name` (e.g. `x` or `edge_attr`), and an index. +* A feature can be uniquely identified from any associated attributes specified + in `TensorAttr`. + +It is the job of a feature store implementor class to handle these assumptions +properly. For example, a simple in-memory feature store implementation may +concatenate all metadata values with a feature index and use this as a unique +index in a KV store. More complicated implementations may choose to partition +features in interesting manners based on the provided metadata. + +Major TODOs for future implementation: +* Async `put` and `get` functionality +""" +import copy +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import Any, List, Optional, Tuple, Union + +import numpy as np +import tensorlayerx as tlx + +from gammagl.typing import NodeType +from gammagl.utils.mixin import CastMixin + +# Tensorlayerx does not have a Tensor type name. For each backend, the corresponding +# Tensor type is used. Here, use an empty Tensor class instead. +class Tensor: + pass + +# We allow indexing with a tensor, numpy array, Python slicing, or a single +# integer index. +IndexType = Union[Tensor, np.ndarray, slice, int] + +# A representation of a feature tensor +FeatureTensorType = Union[Tensor, np.ndarray] + +class _FieldStatus(Enum): + UNSET = None + +@dataclass +class TensorAttr(CastMixin): + r"""Defines the attributes of a :class:`FeatureStore` tensor. + It holds all the parameters necessary to uniquely identify a tensor from + the :class:`FeatureStore`. + + Note that the order of the attributes is important; this is the order in + which attributes must be provided for indexing calls. :class:`FeatureStore` + implementations can define a different ordering by overriding + :meth:`TensorAttr.__init__`. + """ + + # The group name that the tensor corresponds to. Defaults to UNSET. + group_name: Optional[NodeType] = _FieldStatus.UNSET + + # The name of the tensor within its group. Defaults to UNSET. + attr_name: Optional[str] = _FieldStatus.UNSET + + # The node indices the rows of the tensor correspond to. Defaults to UNSET. 
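+    # An :obj:`index` of :obj:`None` is interpreted as selecting all rows of
+    # the tensor (see :meth:`fully_specify`).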
+    index: Optional[IndexType] = _FieldStatus.UNSET
+
+    # Convenience methods #####################################################
+
+    def is_set(self, key: str) -> bool:
+        r"""Whether an attribute is set in :obj:`TensorAttr`."""
+        assert key in self.__dataclass_fields__
+        return getattr(self, key) != _FieldStatus.UNSET
+
+    def is_fully_specified(self) -> bool:
+        r"""Whether the :obj:`TensorAttr` has no unset fields."""
+        return all([self.is_set(key) for key in self.__dataclass_fields__])
+
+    def fully_specify(self) -> 'TensorAttr':
+        r"""Sets all :obj:`UNSET` fields to :obj:`None`."""
+        for key in self.__dataclass_fields__:
+            if not self.is_set(key):
+                setattr(self, key, None)
+        return self
+
+    def update(self, attr: 'TensorAttr') -> 'TensorAttr':
+        r"""Updates a :class:`TensorAttr` with the set attributes of another
+        :class:`TensorAttr`.
+        """
+        for key in self.__dataclass_fields__:
+            if attr.is_set(key):
+                setattr(self, key, getattr(attr, key))
+        return self
+
+class AttrView(CastMixin):
+    r"""Defines a view of a :class:`FeatureStore` that is obtained from a
+    specification of attributes on the feature store. The view stores a
+    reference to the backing feature store as well as a :class:`TensorAttr`
+    object that represents the view's state.
+
+    Users can create views either using the :class:`AttrView` constructor,
+    :meth:`FeatureStore.view`, or by incompletely indexing a feature store.
+    For example, the following calls all create views:
+
+    .. code-block:: python
+
+        store[group_name]
+        store[group_name].feat
+        store[group_name, feat]
+
+    While the following calls all materialize those views and produce tensors
+    by either calling the view or fully-specifying the view:
+
+    .. code-block:: python
+
+        store[group_name]()
+        store[group_name].feat[index]
+        store[group_name, feat][index]
+    """
+    def __init__(self, store: 'FeatureStore', attr: TensorAttr):
+        self.__dict__['_store'] = store
+        self.__dict__['_attr'] = attr
+
+    # Advanced indexing
+
+    def __getattr__(self, key: Any) -> Union['AttrView', FeatureTensorType]:
+        r"""Sets the first unset field of the backing :class:`TensorAttr`
+        object to the attribute.
+
+        This allows for :class:`AttrView` to be indexed by different values of
+        attributes, in order.
+        In particular, for a feature store that we want to index by
+        :obj:`group_name` and :obj:`attr_name`, the following code will do so:
+
+        .. code-block:: python
+
+            store[group, attr]
+            store[group].attr
+            store.group.attr
+        """
+        out = copy.copy(self)
+        attr_name: Optional[str] = None
+        # Find the first attribute name that is UNSET:
+        for field in out._attr.__dataclass_fields__:
+            if getattr(out._attr, field) == _FieldStatus.UNSET:
+                attr_name = field
+                break
+
+        if attr_name is None:
+            raise AttributeError(f"Cannot access attribute '{key}' on view "
+                                 f"'{out}' as all attributes have already "
+                                 f"been set in this view")
+
+        setattr(out._attr, attr_name, key)
+
+        if out._attr.is_fully_specified():
+            return out._store.get_tensor(out._attr)
+
+        return out
+
+    def __getitem__(self, key: Any) -> Union['AttrView', FeatureTensorType]:
+        r"""Sets the first unset field of the backing :class:`TensorAttr`
+        object to the attribute via indexing.
+
+        This allows for :class:`AttrView` to be indexed by different values of
+        attributes, in order.
+        In particular, for a feature store that we want to index by
+        :obj:`group_name` and :obj:`attr_name`, the following code will do so:
+
+        .. code-block:: python
+
+            store[group, attr]
+            store[group][attr]
+        """
+        return self.__getattr__(key)
+
+    # Setting attributes
+
+    def __setattr__(self, key: str, value: Any):
+        r"""Supports attribute assignment to the backing :class:`TensorAttr`
+        of an :class:`AttrView`.
+
+        This allows for :class:`AttrView` objects to set their backing
+        attribute values.
+        In particular, the following operation sets the :obj:`index` of an
+        :class:`AttrView`:
+
+        .. code-block:: python
+
+            view = store.view(group_name)
+            view.index = tlx.convert_to_tensor([1, 2, 3])
+        """
+        if key not in self._attr.__dataclass_fields__:
+            raise ValueError(f"Attempted to set nonexistent attribute '{key}' "
+                             f"(acceptable attributes are "
+                             f"{self._attr.__dataclass_fields__})")
+
+        setattr(self._attr, key, value)
+
+    def __setitem__(self, key: str, value: Any):
+        r"""Supports attribute assignment to the backing :class:`TensorAttr`
+        of an :class:`AttrView` via indexing.
+
+        This allows for :class:`AttrView` objects to set their backing
+        attribute values.
+        In particular, the following operation sets the :obj:`index` of an
+        :class:`AttrView`:
+
+        .. code-block:: python
+
+            view = store.view(TensorAttr(group_name))
+            view['index'] = tlx.convert_to_tensor([1, 2, 3])
+        """
+        self.__setattr__(key, value)
+
+    # Miscellaneous built-ins
+
+    def __call__(self) -> FeatureTensorType:
+        r"""Supports :class:`AttrView` as a callable to force retrieval from
+        the currently specified attributes.
+
+        In particular, this passes the current :class:`TensorAttr` object to a
+        GET call, regardless of whether all attributes have been specified,
+        and returns the result of this call.
+        For example, the following operation returns a tensor by performing a
+        GET operation on the backing feature store:
+
+        .. code-block:: python
+
+            store[group_name, attr_name]()
+        """
+        # Set all UNSET values to None:
+        out = copy.copy(self)
+        out._attr.fully_specify()
+        return out._store.get_tensor(out._attr)
+
+    def __copy__(self) -> 'AttrView':
+        out = self.__class__.__new__(self.__class__)
+        for key, value in self.__dict__.items():
+            out.__dict__[key] = value
+        out.__dict__['_attr'] = copy.copy(out.__dict__['_attr'])
+        return out
+
+    def __eq__(self, obj: Any) -> bool:
+        r"""Compares two :class:`AttrView` objects by checking equality of
+        their :class:`FeatureStore` references and :class:`TensorAttr`
+        attributes.
+        """
+        if not isinstance(obj, AttrView):
+            return False
+        return self._store == obj._store and self._attr == obj._attr
+
+    def __repr__(self) -> str:
+        return (f'{self.__class__.__name__}(store={self._store}, '
+                f'attr={self._attr})')
+
+class FeatureStore(ABC):
+    r"""An abstract base class to access features from a remote feature store.
+
+    Args:
+        tensor_attr_cls (TensorAttr, optional): A user-defined
+            :class:`TensorAttr` class to customize the required attributes and
+            their ordering to uniquely identify tensor values.
+            (default: :obj:`None`)
+    """
+    _tensor_attr_cls: TensorAttr
+
+    def __init__(self, tensor_attr_cls: Optional[Any] = None):
+        super().__init__()
+        self.__dict__['_tensor_attr_cls'] = tensor_attr_cls or TensorAttr
+
+    # Core (CRUD)
+
+    @abstractmethod
+    def _put_tensor(self, tensor: FeatureTensorType, attr: TensorAttr) -> bool:
+        r"""To be implemented by :class:`FeatureStore` subclasses."""
+        pass
+
+    def put_tensor(self, tensor: FeatureTensorType, *args, **kwargs) -> bool:
+        r"""Synchronously adds a :obj:`tensor` to the :class:`FeatureStore`.
+        Returns whether insertion was successful.
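+
+        For example (a sketch; ``'paper'`` and ``'x'`` are placeholder group
+        and attribute names, and ``feat`` is an existing feature tensor):
+
+        .. code-block:: python
+
+            store.put_tensor(feat, group_name='paper', attr_name='x',
+                             index=None)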
+
+        Args:
+            tensor (Tensor or np.ndarray): The feature tensor to be added.
+            *args: Arguments passed to :class:`TensorAttr`.
+            **kwargs: Keyword arguments passed to :class:`TensorAttr`.
+
+        Raises:
+            ValueError: If the input :class:`TensorAttr` is not fully
+                specified.
+        """
+        attr = self._tensor_attr_cls.cast(*args, **kwargs)
+        if not attr.is_fully_specified():
+            raise ValueError(f"The input TensorAttr '{attr}' is not fully "
+                             f"specified. Please fully-specify the input by "
+                             f"specifying all 'UNSET' fields")
+        return self._put_tensor(tensor, attr)
+
+    @abstractmethod
+    def _get_tensor(self, attr: TensorAttr) -> Optional[FeatureTensorType]:
+        r"""To be implemented by :class:`FeatureStore` subclasses."""
+        pass
+
+    def get_tensor(
+        self,
+        *args,
+        convert_type: bool = False,
+        **kwargs
+    ) -> FeatureTensorType:
+        r"""Synchronously obtains a :class:`Tensor` from the
+        :class:`FeatureStore`.
+
+        Args:
+            *args: Arguments passed to :class:`TensorAttr`.
+            convert_type (bool, optional): Whether to convert the type of the
+                output tensor to the type of the attribute index.
+                (default: :obj:`False`)
+            **kwargs: Keyword arguments passed to :class:`TensorAttr`.
+
+        Raises:
+            ValueError: If the input :class:`TensorAttr` is not fully
+                specified.
+        """
+        attr = self._tensor_attr_cls.cast(*args, **kwargs)
+        if not attr.is_fully_specified():
+            raise ValueError(f"The input TensorAttr '{attr}' is not fully "
+                             f"specified. Please fully-specify the input by "
+                             f"specifying all 'UNSET' fields.")
+
+        tensor = self._get_tensor(attr)
+        if convert_type:
+            tensor = self._to_type(attr, tensor)
+        return tensor
+
+    def _multi_get_tensor(
+        self,
+        attrs: List[TensorAttr],
+    ) -> List[Optional[FeatureTensorType]]:
+        r"""To be implemented by :class:`FeatureStore` subclasses."""
+        return [self._get_tensor(attr) for attr in attrs]
+
+    def multi_get_tensor(
+        self,
+        attrs: List[TensorAttr],
+        convert_type: bool = False
+    ) -> List[FeatureTensorType]:
+        r"""Synchronously obtains a list of tensors from the
+        :class:`FeatureStore` for each tensor associated with the attributes
+        in :obj:`attrs`.
+
+        .. note::
+            The default implementation simply iterates over all calls to
+            :meth:`get_tensor`. Implementor classes that can provide
+            additional, more performant functionality are recommended to
+            override this method.
+
+        Args:
+            attrs (List[TensorAttr]): A list of input :class:`TensorAttr`
+                objects that identify the tensors to obtain.
+            convert_type (bool, optional): Whether to convert the type of the
+                output tensor to the type of the attribute index.
+                (default: :obj:`False`)
+
+        Raises:
+            ValueError: If any input :class:`TensorAttr` is not fully
+                specified.
+        """
+        attrs = [self._tensor_attr_cls.cast(attr) for attr in attrs]
+        bad_attrs = [attr for attr in attrs if not attr.is_fully_specified()]
+        if len(bad_attrs) > 0:
+            raise ValueError(
+                f"The input TensorAttr(s) '{bad_attrs}' are not fully "
+                f"specified. Please fully-specify them by specifying all "
+                f"'UNSET' fields")
+
+        tensors = self._multi_get_tensor(attrs)
+        if convert_type:
+            tensors = [
+                self._to_type(attr, tensor)
+                for attr, tensor in zip(attrs, tensors)
+            ]
+        return tensors
+
+    @abstractmethod
+    def _remove_tensor(self, attr: TensorAttr) -> bool:
+        r"""To be implemented by :class:`FeatureStore` subclasses."""
+        pass
+
+    def remove_tensor(self, *args, **kwargs) -> bool:
+        r"""Removes a tensor from the :class:`FeatureStore`.
+        Returns whether deletion was successful.
+
+        Args:
+            *args: Arguments passed to :class:`TensorAttr`.
+            **kwargs: Keyword arguments passed to :class:`TensorAttr`.
+
+        Raises:
+            ValueError: If the input :class:`TensorAttr` is not fully
+                specified.
+        """
+        attr = self._tensor_attr_cls.cast(*args, **kwargs)
+        if not attr.is_fully_specified():
+            raise ValueError(f"The input TensorAttr '{attr}' is not fully "
+                             f"specified. Please fully-specify the input by "
+                             f"specifying all 'UNSET' fields.")
+        return self._remove_tensor(attr)
+
+    def update_tensor(self, tensor: FeatureTensorType, *args, **kwargs) -> bool:
+        r"""Updates a :obj:`tensor` in the :class:`FeatureStore` with a new
+        value. Returns whether the update was successful.
+
+        .. note::
+            Implementor classes can choose to define more efficient update
+            methods; the default performs a removal and insertion.
+
+        Args:
+            tensor (Tensor or np.ndarray): The feature tensor to be updated.
+            *args: Arguments passed to :class:`TensorAttr`.
+            **kwargs: Keyword arguments passed to :class:`TensorAttr`.
+        """
+        attr = self._tensor_attr_cls.cast(*args, **kwargs)
+        self.remove_tensor(attr)
+        return self.put_tensor(tensor, attr)
+
+    # Additional methods
+
+    @abstractmethod
+    def _get_tensor_size(self, attr: TensorAttr) -> Optional[Tuple[int, ...]]:
+        r"""To be implemented by :class:`FeatureStore` subclasses."""
+        pass
+
+    def get_tensor_size(self, *args, **kwargs) -> Optional[Tuple[int, ...]]:
+        r"""Obtains the size of a tensor given its :class:`TensorAttr`, or
+        :obj:`None` if the tensor does not exist.
+        """
+        attr = self._tensor_attr_cls.cast(*args, **kwargs)
+        if not attr.is_set('index'):
+            attr.index = None
+        return self._get_tensor_size(attr)
+
+    @abstractmethod
+    def get_all_tensor_attrs(self) -> List[TensorAttr]:
+        r"""Returns all registered tensor attributes."""
+        pass
+
+    # `AttrView` methods ######################################################
+
+    def view(self, *args, **kwargs) -> AttrView:
+        r"""Returns a view of the :class:`FeatureStore` given a not yet
+        fully-specified :class:`TensorAttr`.
+        """
+        attr = self._tensor_attr_cls.cast(*args, **kwargs)
+        return AttrView(self, attr)
+
+    # Helper functions ########################################################
+
+    @staticmethod
+    def _to_type(
+        attr: TensorAttr,
+        tensor: FeatureTensorType,
+    ) -> FeatureTensorType:
+        # Convert the output to match the type of the attribute index. Note
+        # that the local `Tensor` placeholder class cannot be used in
+        # `isinstance` checks, so `tlx.is_tensor` is used instead:
+        if tlx.is_tensor(attr.index) and isinstance(tensor, np.ndarray):
+            return tlx.convert_to_tensor(tensor)
+        if isinstance(attr.index, np.ndarray) and tlx.is_tensor(tensor):
+            return tlx.convert_to_numpy(tensor)
+        return tensor
+
+    # Python built-ins ########################################################
+
+    def __setitem__(self, key: TensorAttr, value: FeatureTensorType):
+        r"""Supports :obj:`store[tensor_attr] = tensor`."""
+        # CastMixin will handle the case of `key` being a tuple or TensorAttr object:
+        key = self._tensor_attr_cls.cast(key)
+        # We need to fully-specify the key for __setitem__ as it does not make
+        # sense to work with a view here:
+        key.fully_specify()
+        self.put_tensor(value, key)
+
+    def __getitem__(self, key: TensorAttr) -> Any:
+        r"""Supports pythonic indexing into the :class:`FeatureStore`.
+
+        In particular, the following rules are followed for indexing:
+
+        * A fully-specified :obj:`key` will produce a tensor output.
+
+        * A partially-specified :obj:`key` will produce an :class:`AttrView`
+          output, which is a view on the :class:`FeatureStore`. If a view is
+          called, it will produce a tensor output from the corresponding
+          (partially specified) attributes.
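+
+        For example (a sketch; ``'paper'``, ``'x'`` and ``index`` are
+        placeholders):
+
+        .. code-block:: python
+
+            feat = store['paper', 'x', index]  # Fully specified: a tensor.
+            view = store['paper', 'x']         # Partial: an AttrView.
+            feat = view[index]                 # Materializes the view.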
+ """ + # CastMixin will handle the case of key being a tuple or TensorAttr object: + attr = self._tensor_attr_cls.cast(key) + if attr.is_fully_specified(): + return self.get_tensor(attr) + # If the view is not fully-specified, return a :class:`AttrView`: + return self.view(attr) + + def __delitem__(self, key: TensorAttr): + r"""Supports :obj:`del store[tensor_attr]`.""" + # CastMixin will handle the case of key being a tuple or TensorAttr object: + key = self._tensor_attr_cls.cast(key) + key.fully_specify() + self.remove_tensor(key) + + def __iter__(self): + raise NotImplementedError + + def __eq__(self, obj: object) -> bool: + return id(self) == id(obj) + + def __repr__(self) -> str: + return f'{self.__class__.__name__}()' diff --git a/gammagl/data/graph_store.py b/gammagl/data/graph_store.py new file mode 100644 index 00000000..1a265e90 --- /dev/null +++ b/gammagl/data/graph_store.py @@ -0,0 +1,360 @@ +r"""This class defines the abstraction for a backend-agnostic graph store. +Thr goal of the graph store is to abstract away alla graph edge index memory +management so that vbarying implementations can allow for independent scale-out. + +This particular graph store abstraction makes a few key assumptions: +* The edge indices we care about storing are represented either in COO, CSC, or + CSR format. They can be uniquely identified by an edge type (in GammaGL, + this is a tuple of the form (source node type, edge type, destination node)). +* The edge indices are static once they are stored in the graph. That is, we do not + support dynamic modification of edge indices once they have been inserted + into the graph store. + +It is the job of a graph store implementor class to handle these assumptions +properly. For example, a simple in-memory graph store implementation may +concatenate all metadata values with an edge index and use this as a unique +index in a KV store. More complicated implementations may choose to partition +the graph in interesting manners based on the provided metadata. +""" +import copy +from abc import ABC, abstractmethod +from collections import defaultdict +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple + +import tensorlayerx as tlx +from gammagl.typing import EdgeType +from gammagl.utils.mixin import CastMixin +from gammagl.ops.sparse import ind2ptr, ptr2ind + +# Tensorlayerx does not have a Tensor type name. For each backend, the corresponding +# Tensor type is used. Here, use an empty Tensor class instead. +class Tensor: + pass + +# The output of converting between two types in the GraphStore is a Tuple of +# dictionaries: row, col, and perm. The dictionaries are keyed by the edge +# type of the input edge attribute. +# * The row dictionary contains the row tensor for COO, the row pointer for +# CSR, or the row tensor for CSC +# * The col dictionary contains the col tensor for COO, the col tensor for +# CSR, or the col pointer for CSC +# * The perm dictionary contains the permutation of edges that was applied +# in converting between formats, if applicable. +ConversionOutputType = Tuple[Dict[EdgeType, Tensor], Dict[EdgeType, Tensor], Dict[EdgeType, Tensor]] + +class EdgeLayout(Enum): + COO = 'coo' + CSC = 'csc' + CSR = 'csr' + +# A representation of an edge index, following the possible formats: +# * COO: (row, col) +# * CSC: (row, colptr) +# * CSR: (rowptr, col) +EdgeTensorType = Tuple[Tensor, Tensor] + +@dataclass +class EdgeAttr(CastMixin): + r"""Defines the attributes of a :obj:`GraphStore` edge. 
+    It holds all the parameters necessary to uniquely identify an edge from
+    the :class:`GraphStore`.
+
+    Note that the order of the attributes is important; this is the order in
+    which attributes must be provided for indexing calls. :class:`GraphStore`
+    implementations can define a different ordering by overriding
+    :meth:`EdgeAttr.__init__`.
+    """
+
+    # The type of the edge:
+    edge_type: EdgeType
+
+    # The layout of the edge representation:
+    layout: EdgeLayout
+
+    # Whether the edge index is sorted by destination node. Useful for
+    # avoiding sorting costs when performing neighbor sampling, and only
+    # meaningful for COO (CSC is sorted and CSR is unsorted by definition):
+    is_sorted: bool = False
+
+    # The number of source and destination nodes in this edge type:
+    size: Optional[Tuple[int, int]] = None
+
+    # NOTE: we define __init__ to force-cast `layout`:
+    def __init__(
+        self,
+        edge_type: EdgeType,
+        layout: EdgeLayout,
+        is_sorted: bool = False,
+        size: Optional[Tuple[int, int]] = None,
+    ):
+        layout = EdgeLayout(layout)
+
+        if layout == EdgeLayout.CSR and is_sorted:
+            raise ValueError("Cannot create a 'CSR' edge attribute with "
+                             "option 'is_sorted=True'")
+
+        if layout == EdgeLayout.CSC:
+            is_sorted = True
+
+        self.edge_type = edge_type
+        self.layout = layout
+        self.is_sorted = is_sorted
+        self.size = size
+
+class GraphStore(ABC):
+    r"""An abstract base class to access edges from a remote graph store.
+
+    Args:
+        edge_attr_cls (EdgeAttr, optional): A user-defined
+            :class:`EdgeAttr` class to customize the required attributes and
+            their ordering to uniquely identify edges. (default: :obj:`None`)
+    """
+    def __init__(self, edge_attr_cls: Optional[Any] = None):
+        super().__init__()
+        self.__dict__['_edge_attr_cls'] = edge_attr_cls or EdgeAttr
+
+    # Core (CRUD) #############################################################
+
+    @abstractmethod
+    def _put_edge_index(self, edge_index: EdgeTensorType,
+                        edge_attr: EdgeAttr) -> bool:
+        r"""To be implemented by :class:`GraphStore` subclasses."""
+        pass
+
+    def put_edge_index(self, edge_index: EdgeTensorType, *args,
+                       **kwargs) -> bool:
+        r"""Synchronously adds an :obj:`edge_index` tuple to the
+        :class:`GraphStore`.
+        Returns whether insertion was successful.
+
+        Args:
+            edge_index (Tuple[Tensor, Tensor]): The :obj:`edge_index` tuple
+                in a format specified in :class:`EdgeAttr`.
+            *args: Arguments passed to :class:`EdgeAttr`.
+            **kwargs: Keyword arguments passed to :class:`EdgeAttr`.
+        """
+        edge_attr = self._edge_attr_cls.cast(*args, **kwargs)
+        return self._put_edge_index(edge_index, edge_attr)
+
+    @abstractmethod
+    def _get_edge_index(self, edge_attr: EdgeAttr) -> Optional[EdgeTensorType]:
+        r"""To be implemented by :class:`GraphStore` subclasses."""
+        pass
+
+    def get_edge_index(self, *args, **kwargs) -> EdgeTensorType:
+        r"""Synchronously obtains an :obj:`edge_index` tuple from the
+        :class:`GraphStore`.
+
+        Args:
+            *args: Arguments passed to :class:`EdgeAttr`.
+            **kwargs: Keyword arguments passed to :class:`EdgeAttr`.
+
+        Raises:
+            KeyError: If the :obj:`edge_index` corresponding to the input
+                :class:`EdgeAttr` was not found.
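+
+        For example (a sketch; the edge type below is a placeholder):
+
+        .. code-block:: python
+
+            row, col = store.get_edge_index(('paper', 'cites', 'paper'),
+                                            layout='coo')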
+ """ + edge_attr = self._edge_attr_cls.cast(*args, **kwargs) + edge_index = self._get_edge_index(edge_attr) + if edge_index is None: + raise KeyError(f"'edge_index' for '{edge_attr}' not found") + return edge_index + + @abstractmethod + def _remove_edge_index(self, edge_attr: EdgeAttr) -> bool: + r"""To be implemented by :class:`GraphStore` subclasses.""" + pass + + def remove_edge_index(self, *args, **kwargs) -> bool: + r"""Synchronously deletes an :obj:`edge_index` tuple from the + :class:`GraphStore`. + Returns whether deletion was successful. + + Args: + *args: Arguments passed to :class:`EdgeAttr`. + **kwargs: Keyword arguments passed to :class:`EdgeAttr`. + """ + edge_attr = self._edge_attr_cls.cast(*args, **kwargs) + return self._remove_edge_index(edge_attr) + + @abstractmethod + def get_all_edge_attrs(self) -> List[EdgeAttr]: + r"""Returns all registered edge attributes.""" + pass + + # Layout Conversion ####################################################### + + def coo( + self, + edge_types: Optional[List[Any]] = None, + store: bool = False + ) -> ConversionOutputType: + r"""Returns the edge indices in the :class:`GraphStore` in COO format. + + Args: + edge_types (List[Any], optional): The edge types of edge indices + to obtain. If set to :obj:`None`, will return the edge indices + of all existing edge types. (default: :obj:`None`) + store (bool, optional): Whether to store converted edge indices in + the :class:`GraphStore`. (default: :obj:`False`) + """ + return self._edges_to_layout(EdgeLayout.COO, edge_types, store) + + def csr( + self, + edge_types: Optional[List[Any]] = None, + store: bool = False + ) -> ConversionOutputType: + r"""Returns the edge indices in the :class:`GraphStore` in CSR format. + + Args: + edge_types (List[Any], optional): The edge types of edge indices + to obtain. If set to :obj:`None`, will return the edge indices + of all existing edge types. (default: :obj:`None`) + store (bool, optional): Whether to store converted edge indices in + the :class:`GraphStore`. (default: :obj:`False`) + """ + return self._edges_to_layout(EdgeLayout.CSR, edge_types, store) + + def csc( + self, + edge_types: Optional[List[Any]] = None, + store: bool = False + ) -> ConversionOutputType: + r"""Returns the edge indices in the :class:`GraphStore` in CSC format. + + Args: + edge_types (List[Any], optional): The edge types of edge indices + to obtain. If set to :obj:`None`, will return the edge indices + of all existing edge types. (default: :obj:`None`) + store (bool, optional): Whether to store converted edge indices in + the :class:`GraphStore`. 
+                (default: :obj:`False`)
+        """
+        return self._edges_to_layout(EdgeLayout.CSC, edge_types, store)
+
+    # Python built-ins ########################################################
+
+    def __setitem__(self, key: EdgeAttr, value: EdgeTensorType):
+        self.put_edge_index(value, key)
+
+    def __getitem__(self, key: EdgeAttr) -> Optional[EdgeTensorType]:
+        return self.get_edge_index(key)
+
+    def __delitem__(self, key: EdgeAttr):
+        return self.remove_edge_index(key)
+
+    def __repr__(self) -> str:
+        return f'{self.__class__.__name__}()'
+
+    # Helper methods ##########################################################
+
+    def _edge_to_layout(
+        self,
+        attr: EdgeAttr,
+        layout: EdgeLayout,
+        store: bool = False,
+    ) -> Tuple[Tensor, Tensor, Optional[Tensor]]:
+
+        (row, col), perm = self.get_edge_index(attr), None
+
+        if layout == EdgeLayout.COO:  # COO output requested:
+            if attr.layout == EdgeLayout.CSR:  # CSR->COO
+                row = ptr2ind(row)
+            elif attr.layout == EdgeLayout.CSC:  # CSC->COO
+                col = ptr2ind(col)
+
+        elif layout == EdgeLayout.CSR:  # CSR output requested:
+            if attr.layout == EdgeLayout.CSC:  # CSC->COO
+                col = ptr2ind(col)
+
+            if attr.layout != EdgeLayout.CSR:  # COO->CSR
+                # `tlx.reduce_max` and `tlx.gather` keep this backend-agnostic:
+                num_rows = attr.size[0] if attr.size else int(tlx.reduce_max(row)) + 1
+                perm = tlx.argsort(row)
+                row = tlx.gather(row, perm)
+                col = tlx.gather(col, perm)
+                row = ind2ptr(row, num_rows)
+
+        else:  # CSC output requested:
+            if attr.layout == EdgeLayout.CSR:  # CSR->COO
+                row = ptr2ind(row)
+
+            if attr.layout != EdgeLayout.CSC:  # COO->CSC
+                if hasattr(self, 'meta') and self.meta.get('is_hetero', False):
+                    # Hotfix for `LocalGraphStore`, where in heterogeneous
+                    # graphs, edge indices for different edge types have
+                    # continuous indices not starting at 0.
+                    num_cols = int(tlx.reduce_max(col)) + 1
+                elif attr.size is not None:
+                    num_cols = attr.size[1]
+                else:
+                    num_cols = int(tlx.reduce_max(col)) + 1
+
+                if not attr.is_sorted:  # Not sorted by destination.
+                    perm = tlx.argsort(col)
+                    col = tlx.gather(col, perm)
+                    row = tlx.gather(row, perm)
+                col = ind2ptr(col, num_cols)
+
+        if attr.layout != layout and store:
+            attr = copy.copy(attr)
+            attr.layout = layout
+            if perm is not None:
+                attr.is_sorted = False
+            self.put_edge_index((row, col), attr)
+
+        return row, col, perm
+
+    def _edges_to_layout(
+        self,
+        layout: EdgeLayout,
+        edge_types: Optional[List[Any]] = None,
+        store: bool = False,
+    ) -> ConversionOutputType:
+
+        edge_attrs: List[EdgeAttr] = self.get_all_edge_attrs()
+
+        if hasattr(self, 'meta'):  # `LocalGraphStore` hack.
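+            # `LocalGraphStore` records heterogeneity in its metadata
+            # dictionary; otherwise, fall back to inspecting the edge types: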
+ is_hetero = self.meta.get('is_hetero', False) + else: + is_hetero = all(attr.edge_type is not None for attr in edge_attrs) + + if not is_hetero: + return self._edge_to_layout(edge_attrs[0], layout, store) + + # Obtain all edge attributes, grouped by type: + edge_type_attrs: Dict[EdgeType, List[EdgeAttr]] = defaultdict(list) + for attr in self.get_all_edge_attrs(): + edge_type_attrs[attr.edge_type].append(attr) + + # Check that requested edge types exist and filter: + if edge_types is not None: + for edge_type in edge_types: + if edge_type not in edge_type_attrs: + raise ValueError(f"The 'edge_index' of type '{edge_type}' " + f"was not found in the graph store.") + + edge_type_attrs = { + key: attr + for key, attr in edge_type_attrs.items() if key in edge_types + } + + # Convert layout from its most favorable original layout: + row_dict, col_dict, perm_dict = {}, {}, {} + for edge_type, attrs in edge_type_attrs.items(): + layouts = [attr.layout for attr in attrs] + + if layout in layouts: # No conversion needed. + attr = attrs[layouts.index(layout)] + elif EdgeLayout.COO in layouts: # Prefer COO for conversion. + attr = attrs[layouts.index(EdgeLayout.COO)] + elif EdgeLayout.CSC in layouts: + attr = attrs[layouts.index(EdgeLayout.CSC)] + elif EdgeLayout.CSR in layouts: + attr = attrs[layouts.index(EdgeLayout.CSR)] + + row_dict[edge_type], col_dict[edge_type], perm_dict[edge_type] = ( + self._edge_to_layout(attr, layout, store)) + + return row_dict, col_dict, perm_dict diff --git a/gammagl/utils/mixin.py b/gammagl/utils/mixin.py new file mode 100644 index 00000000..7bda5baa --- /dev/null +++ b/gammagl/utils/mixin.py @@ -0,0 +1,22 @@ +from typing import Any, Iterator, TypeVar + +T = TypeVar('T') + + +class CastMixin: + @classmethod + def cast(cls: T, *args: Any, **kwargs: Any) -> T: + if len(args) == 1 and len(kwargs) == 0: + elem = args[0] + if elem is None: + return None # type: ignore + if isinstance(elem, CastMixin): + return elem # type: ignore + if isinstance(elem, tuple): + return cls(*elem) # type: ignore + if isinstance(elem, dict): + return cls(**elem) # type: ignore + return cls(*args, **kwargs) # type: ignore + + def __iter__(self) -> Iterator: + return iter(self.__dict__.values()) From 84b53e8eb5e6904f1542c1fc7bbbc7b606687f91 Mon Sep 17 00:00:00 2001 From: jxy <865526875@qq.com> Date: Wed, 5 Jun 2024 07:00:21 +0000 Subject: [PATCH 2/4] update __init__.py --- gammagl/data/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/gammagl/data/__init__.py b/gammagl/data/__init__.py index 91c01c15..15b64fa6 100644 --- a/gammagl/data/__init__.py +++ b/gammagl/data/__init__.py @@ -6,6 +6,8 @@ from .in_memory_dataset import InMemoryDataset from .extract import extract_zip, extract_tar from .utils import global_config_init +from .feature_store import FeatureStore, TensorAttr +from .graph_store import GraphStore, EdgeAttr __all__ = [ 'BaseGraph', @@ -18,6 +20,10 @@ 'extract_zip', 'extract_tar', 'global_config_init', + 'FeatureStore', + 'TensorAttr', + 'GraphStore', + 'EdgeAttr', ] classes = __all__ From e229d2447a6888e723808fdcfbea017143746128 Mon Sep 17 00:00:00 2001 From: jxy <865526875@qq.com> Date: Fri, 12 Jul 2024 18:45:00 +0800 Subject: [PATCH 3/4] add example of feature_store & graph_store --- examples/database/graph_store/prepare_data.py | 21 +++ .../graph_store/reddit_sage_trainer.py | 130 ++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 examples/database/graph_store/prepare_data.py create mode 100644 
examples/database/graph_store/reddit_sage_trainer.py diff --git a/examples/database/graph_store/prepare_data.py b/examples/database/graph_store/prepare_data.py new file mode 100644 index 00000000..bb6e8c88 --- /dev/null +++ b/examples/database/graph_store/prepare_data.py @@ -0,0 +1,21 @@ +from gammagl.utils import mask_to_index +from gammagl.datasets import Reddit +import tensorlayerx as tlx +import numpy as np +from gdbi.ggl import Neo4jFeatureStore, Neo4jGraphStore + +dataset = Reddit('') +graph = dataset[0] + +train_idx = tlx.convert_to_numpy(mask_to_index(graph.train_mask)) +val_idx = tlx.convert_to_numpy(mask_to_index(graph.val_mask)) +test_idx = tlx.convert_to_numpy(mask_to_index(graph.test_mask)) + +np.savez('idx.npz', train_idx=train_idx, val_idx=val_idx, test_idx=test_idx) + +feature_store = Neo4jFeatureStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456') +graph_store = Neo4jGraphStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456') + +feature_store['reddit_node', 'x', tlx.arange(start=0, limit=graph.x.shape[0])] = graph.x +feature_store['reddit_node', 'y', tlx.arange(start=0, limit=graph.x.shape[0])] = graph.y +graph_store['reddit_edge', 'coo'] = graph.edge_index \ No newline at end of file diff --git a/examples/database/graph_store/reddit_sage_trainer.py b/examples/database/graph_store/reddit_sage_trainer.py new file mode 100644 index 00000000..e003bced --- /dev/null +++ b/examples/database/graph_store/reddit_sage_trainer.py @@ -0,0 +1,130 @@ +import os +# os.environ['CUDA_VISIBLE_DEVICES'] = '0' +# os.environ['TL_BACKEND'] = 'torch' +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' + +from gammagl.utils import mask_to_index +from tensorlayerx.model import WithLoss, TrainOneStep +from tqdm import tqdm +import tensorlayerx as tlx +import argparse +from gammagl.loader.neighbor_sampler import NeighborSampler +from gammagl.models import GraphSAGE_Sample_Model +import numpy as np +from gdbi.ggl import Neo4jFeatureStore, Neo4jGraphStore + +class SemiSpvzLoss(WithLoss): + def __init__(self, net, loss_fn): + super(SemiSpvzLoss, self).__init__(backbone=net, loss_fn=loss_fn) + + def forward(self, data, y): + logits = self.backbone_network(data['x'], data['subgs']) + loss = self._loss_fn(logits, tlx.gather(data['y'], data['dst_node'])) + return loss + + +def calculate_acc(logits, y, metrics): + """ + Args: + logits: node logits + y: node labels + metrics: tensorlayerx.metrics + + Returns: + rst + """ + + metrics.update(logits, y) + rst = metrics.result() + metrics.reset() + return rst + + +def main(args): + feature_store = Neo4jFeatureStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456') + graph_store = Neo4jGraphStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456') + + edge_index = graph_store['reddit_edge', 'coo'] + + with np.load('idx.npz') as data: + train_idx = tlx.convert_to_tensor(data['train_idx']) + val_idx = tlx.convert_to_tensor(data['val_idx']) + test_idx = tlx.convert_to_tensor(data['test_idx']) + train_loader = NeighborSampler(edge_index=edge_index, + node_idx=train_idx, + sample_lists=[25, 10], batch_size=2048, shuffle=True, num_workers=0) + + val_loader = NeighborSampler(edge_index=edge_index, + node_idx=val_idx, + sample_lists=[-1], batch_size=2048 * 2, shuffle=False, num_workers=0) + test_loader = NeighborSampler(edge_index=edge_index, + node_idx=test_idx, + sample_lists=[-1], batch_size=2048 * 2, shuffle=False, num_workers=0) + + + net = GraphSAGE_Sample_Model(in_feat=602, + 
+                                 hid_feat=args.hidden_dim,
+                                 out_feat=41,
+                                 drop_rate=args.drop_rate,
+                                 num_layers=args.num_layers)
+    optimizer = tlx.optimizers.Adam(args.lr)
+    metrics = tlx.metrics.Accuracy()
+    train_weights = net.trainable_weights
+
+    loss_func = SemiSpvzLoss(net, tlx.losses.softmax_cross_entropy_with_logits)
+    train_one_step = TrainOneStep(loss_func, optimizer, train_weights)
+
+    # Fetch the full feature matrix and label vector from the feature store
+    # once (the Reddit graph has 232965 nodes); `x` is required for the
+    # full-graph inference calls below:
+    x = feature_store['reddit_node', 'x', tlx.arange(start=0, limit=232965)]
+    y = feature_store['reddit_node', 'y', tlx.arange(start=0, limit=232965)]
+    y = tlx.reshape(tlx.cast(y, dtype=tlx.int64), (-1,))
+
+    for epoch in range(args.n_epoch):
+        pbar = tqdm(total=int(len(train_loader.dataset)))
+        pbar.set_description(f'Epoch {epoch:02d}')
+        for dst_node, n_id, adjs in train_loader:
+            net.set_train()
+            # Input: sampled subgraphs and the sampled nodes' features:
+            data = {"x": feature_store['reddit_node', 'x', n_id],
+                    "y": y,
+                    "dst_node": dst_node,
+                    "subgs": adjs}
+            # The label argument is not used:
+            train_loss = train_one_step(data, tlx.convert_to_tensor([0]))
+            pbar.update(len(dst_node))
+        print("Epoch [{:0>3d}] ".format(epoch + 1) + "train loss: {:.4f}".format(train_loss.item()))
+
+        logits = net.inference(x, val_loader, data['x'])
+        if tlx.BACKEND == 'torch':
+            val_idx = val_idx.to(data['x'].device)
+        val_logits = tlx.gather(logits, val_idx)
+        val_y = tlx.gather(data['y'], val_idx)
+        val_acc = calculate_acc(val_logits, val_y, metrics)
+
+        logits = net.inference(x, test_loader, data['x'])
+        test_logits = tlx.gather(logits, test_idx)
+        test_y = tlx.gather(data['y'], test_idx)
+        test_acc = calculate_acc(test_logits, test_y, metrics)
+
+        print("val acc: {:.4f} || test acc: {:.4f}".format(val_acc, test_acc))
+
+
+if __name__ == '__main__':
+    # Parameter settings:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--lr", type=float, default=0.0005, help="learning rate")
+    parser.add_argument("--n_epoch", type=int, default=50, help="number of epochs")
+    parser.add_argument("--hidden_dim", type=int, default=256, help="dimension of hidden layers")
+    parser.add_argument("--drop_rate", type=float, default=0.8, help="drop_rate")
+    parser.add_argument("--num_layers", type=int, default=2)
+    parser.add_argument("--l2_coef", type=float, default=0., help="l2 loss coefficient")
+    parser.add_argument('--dataset', type=str, default='reddit', help='dataset')
+    parser.add_argument("--dataset_path", type=str, default=r'', help="path to save dataset")
+    # parser.add_argument("--best_model_path", type=str, default=r'./', help="path to save best model")
+    parser.add_argument("--gpu", type=int, default=0)
+
+    args = parser.parse_args()
+    if args.gpu >= 0:
+        tlx.set_device("GPU", args.gpu)
+    else:
+        tlx.set_device("CPU")
+
+    main(args)
\ No newline at end of file

From ab2afbbfa1daace0630f05d6cfa46d7d1f3a7ae2 Mon Sep 17 00:00:00 2001
From: jxy <865526875@qq.com>
Date: Fri, 12 Jul 2024 18:59:40 +0800
Subject: [PATCH 4/4] update login information

---
 examples/database/graph_store/prepare_data.py        |  8 ++++++--
 examples/database/graph_store/reddit_sage_trainer.py | 10 +++++++---
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/examples/database/graph_store/prepare_data.py b/examples/database/graph_store/prepare_data.py
index bb6e8c88..47e489c3 100644
--- a/examples/database/graph_store/prepare_data.py
+++ b/examples/database/graph_store/prepare_data.py
@@ -13,8 +13,12 @@
 
 np.savez('idx.npz', train_idx=train_idx, val_idx=val_idx, test_idx=test_idx)
 
-feature_store = Neo4jFeatureStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456')
-graph_store = Neo4jGraphStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456')
+uri = 'bolt://localhost:7687'
+user_name = 'neo4j'
+password = 'neo4j'
+
+feature_store = Neo4jFeatureStore(uri=uri, user_name=user_name, password=password)
+graph_store = Neo4jGraphStore(uri=uri, user_name=user_name, password=password)
 
 feature_store['reddit_node', 'x', tlx.arange(start=0, limit=graph.x.shape[0])] = graph.x
 feature_store['reddit_node', 'y', tlx.arange(start=0, limit=graph.x.shape[0])] = graph.y
 graph_store['reddit_edge', 'coo'] = graph.edge_index
diff --git a/examples/database/graph_store/reddit_sage_trainer.py b/examples/database/graph_store/reddit_sage_trainer.py
index e003bced..d31f8de4 100644
--- a/examples/database/graph_store/reddit_sage_trainer.py
+++ b/examples/database/graph_store/reddit_sage_trainer.py
@@ -41,8 +41,12 @@ def calculate_acc(logits, y, metrics):
 
 
 def main(args):
-    feature_store = Neo4jFeatureStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456')
-    graph_store = Neo4jGraphStore(uri='bolt://10.129.91.49:7687', user_name='neo4j', password='root123456')
+    uri = 'bolt://localhost:7687'
+    user_name = 'neo4j'
+    password = 'neo4j'
+
+    feature_store = Neo4jFeatureStore(uri=uri, user_name=user_name, password=password)
+    graph_store = Neo4jGraphStore(uri=uri, user_name=user_name, password=password)
 
     edge_index = graph_store['reddit_edge', 'coo']
 
@@ -119,7 +123,7 @@ def main(args):
     parser.add_argument('--dataset', type=str, default='reddit', help='dataset')
     parser.add_argument("--dataset_path", type=str, default=r'', help="path to save dataset")
     # parser.add_argument("--best_model_path", type=str, default=r'./', help="path to save best model")
-    parser.add_argument("--gpu", type=int, default=0)
+    parser.add_argument("--gpu", type=int, default=-1)
 
     args = parser.parse_args()
     if args.gpu >= 0: