Temporal Graph Support
The temporal analysis of evolving graphs is an important requirement in many domains, but it is hardly supported by current graph databases and graph processing systems. We have therefore started extending Gradoop for temporal graph analysis by adding time properties to vertices, edges, and graphs and by using them within graph operators.
The key features of our model are:
- Bi-temporal time dimension support
- Backwards compatible with most EPGM operators
- Flexible time representation: can be empty, a timestamp, or a time-interval
See our publications:
- Exploration and Analysis of Temporal Property Graphs (Demo @ EDBT 2021)
- Distributed temporal graph analytics with GRADOOP (VLDB Journal 2021)
- Evolution Analysis of Large Graphs with Gradoop (LEG @ ECML PKDD 2019)
- Temporal Graph Analysis using Gradoop (Workshop @ BTW 2019)
The data model is an extension of the EPGM data model. Each graph element (i.e., logical graph, vertex, and edge) has two additional time intervals to enable bi-temporal semantics. One interval represents the transaction time, i.e., the time during which a fact is current in the graph; it captures rollback information that is maintained by Gradoop. The other interval represents the valid time (also referred to as application time), i.e., the time during which the information is valid in the real world; it captures historical information. Valid times are typically set within the context of the application before the data enters Gradoop.
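To make the two intervals concrete, here is a minimal plain-Java sketch of a bi-temporal element. It is not Gradoop's element class: the names Interval, BiTemporalElement, and BiTemporalDemo are hypothetical, timestamps are assumed to be epoch milliseconds, intervals are treated as half-open [from, to) to match the AsOf predicate listed further down, and Long.MIN_VALUE/Long.MAX_VALUE are assumed to stand for unbounded ends, as in the temporal CSV example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical half-open interval [from, to) in epoch milliseconds.
// Long.MIN_VALUE / Long.MAX_VALUE are assumed to mean "unbounded".
record Interval(long from, long to) {
  static final Interval UNBOUNDED = new Interval(Long.MIN_VALUE, Long.MAX_VALUE);

  Interval {
    if (from > to) throw new IllegalArgumentException("from must not be after to");
  }

  // Same condition as AsOf: from <= t && to > t
  boolean contains(long t) { return from <= t && t < to; }
}

// Hypothetical bi-temporal element: EPGM label and properties plus one interval per time dimension.
record BiTemporalElement(String label, Map<String, Object> properties,
                         Interval transactionTime, Interval validTime) { }

class BiTemporalDemo {
  public static void main(String[] args) {
    // Entered the graph at transaction time 1000 and is still current;
    // in the real world it was valid from 500 until 900.
    BiTemporalElement e = new BiTemporalElement("Person", new HashMap<>(),
        new Interval(1000L, Long.MAX_VALUE), new Interval(500L, 900L));
    System.out.println(e.validTime().contains(700L));       // true
    System.out.println(e.transactionTime().contains(700L)); // false
  }
}
```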
We added two new operators to the TPGM model that can only be applied to a TemporalGraph instance: Snapshot and Difference. In addition, we extended several EPGM operators, such as Transformation, Grouping, and Aggregation, with features that enable different temporal analyses.
The TPGM snapshot operator retrieves a snapshot of the temporal graph that is valid either at a specific point in time or during a given time range; the time condition is provided as a temporal predicate function. Besides the operator itself, several predefined predicate functions are available. They are adopted from SQL:2011, the revision of the SQL standard that added support for temporal databases.
Example code snippet:
// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();
// We need a UNIX timestamp in milliseconds since epoch.
long queryTimestamp = LocalDateTime
.of(2018, 12, 24, 20, 15, 0, 0)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
// Get the graph 'as of' our query timestamp (the valid time dimension is considered by default)
TemporalGraph historicalGraph = temporalGraph
.snapshot(new AsOf(queryTimestamp));
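Conceptually, a snapshot is a filter over the graph elements. The following sketch does not use Gradoop; it applies the AsOf condition (elementFrom <= queryTimestamp && elementTo > queryTimestamp) to a flat list of hypothetical Element records that hold only a valid-time interval. It ignores dangling edges, which a real snapshot can leave behind (the workflow at the end of this page calls verify() for that).

```java
import java.util.List;
import java.util.stream.Collectors;

class SnapshotSketch {
  // Hypothetical element with only a valid-time interval in epoch milliseconds.
  record Element(String id, long validFrom, long validTo) { }

  // AsOf condition from the predicate table.
  static boolean asOf(Element e, long t) {
    return e.validFrom() <= t && e.validTo() > t;
  }

  // A snapshot keeps every element that satisfies the predicate, preserving order.
  static List<Element> snapshotAsOf(List<Element> elements, long t) {
    return elements.stream().filter(e -> asOf(e, t)).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Element> graph = List.of(
        new Element("a", 0L, 100L),            // ends exactly at t, so it is excluded
        new Element("b", 50L, Long.MAX_VALUE), // still valid
        new Element("c", 100L, 200L));         // starts exactly at t, so it is included
    System.out.println(snapshotAsOf(graph, 100L)); // keeps b and c
  }
}
```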
The evolution of graphs over time can be represented by the difference of two graph snapshots, i.e., by a difference graph that is the union of both snapshots where each graph element is annotated as an added, deleted, or persistent element.
The TPGM diff operator consumes two graph snapshots defined by temporal predicate functions and calculates the difference graph. The annotations are stored in a property _diff on each graph element, whose value indicates whether the element is equal in both snapshots (0), added in the second snapshot (1), or removed in the second snapshot (-1).
Example code snippet:
// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();
// We need two UNIX timestamps in milliseconds since epoch that we want to compare
long firstQueryTimestamp = LocalDateTime
.of(2018, 12, 24, 20, 15, 0, 0)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
long secondQueryTimestamp = LocalDateTime
.of(2019, 12, 24, 20, 15, 0, 0)
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
// Get the difference of both historical graph versions (the valid time dimension is considered by default)
TemporalGraph differenceGraph = temporalGraph
.diff(new AsOf(firstQueryTimestamp), new AsOf(secondQueryTimestamp));
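The annotation rule of the diff operator can be expressed as a small function of two snapshot predicates. This plain-Java sketch is an illustration under simplifying assumptions: the Element record and the diff method are hypothetical, and only the valid time is considered. It returns the _diff value per element id and drops elements that appear in neither snapshot, since the difference graph is the union of both snapshots.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class DiffSketch {
  record Element(String id, long validFrom, long validTo) { }

  // Values of the _diff property as described above.
  static final int PERSISTENT = 0;
  static final int ADDED = 1;
  static final int REMOVED = -1;

  // AsOf condition from the predicate table.
  static Predicate<Element> asOf(long t) {
    return e -> e.validFrom() <= t && e.validTo() > t;
  }

  // Maps element id to its _diff value; elements in neither snapshot are omitted.
  static Map<String, Integer> diff(List<Element> elements,
                                   Predicate<Element> first, Predicate<Element> second) {
    Map<String, Integer> result = new LinkedHashMap<>();
    for (Element e : elements) {
      boolean inFirst = first.test(e);
      boolean inSecond = second.test(e);
      if (inFirst && inSecond) result.put(e.id(), PERSISTENT);
      else if (inSecond) result.put(e.id(), ADDED);
      else if (inFirst) result.put(e.id(), REMOVED);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Element> elements = List.of(
        new Element("old", 0L, 150L),    // only in the first snapshot
        new Element("kept", 0L, 300L),   // in both snapshots
        new Element("new", 150L, 300L)); // only in the second snapshot
    System.out.println(diff(elements, asOf(100L), asOf(200L)));
    // prints {old=-1, kept=0, new=1}
  }
}
```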
The transform operator defines a structure-preserving modification of graph, vertex, and edge data. User-defined transformation functions can be applied to a temporal graph, which results in a modified output graph. Within the TPGM, it is possible to (1) modify the temporal attributes, (2) define the time attributes from information stored in properties, or (3) create properties from the temporal information of the time attributes. For example, if the temporal attributes are not yet set or are calculated during a workflow, this operator makes it possible to define the valid times at runtime.
Example code snippet:
// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();
// Assign the valid-from attribute from a property "CreationDate" for all vertices
temporalGraph = temporalGraph.transform(
// Keep the graph heads
TransformationFunction.keep(),
// Extract timestamps for vertices
(origin, t) -> {
if (origin.hasProperty("CreationDate") && origin.getPropertyValue("CreationDate").isDateTime()) {
origin.setValidFrom(
origin.getPropertyValue("CreationDate")
.getDateTime()
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli());
}
return origin;
},
// Keep the edges
TransformationFunction.keep()
);
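The snippet above covers case (2). Case (3), deriving a property from a time attribute, can be sketched without Gradoop as follows. The Vertex class, the property name validFromYear, and the interpretation of timestamps in UTC are assumptions for illustration; in Gradoop the same logic would go into a transformation function passed to transform() as shown above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class TransformSketch {
  // Hypothetical mutable vertex with a valid-from timestamp (epoch millis) and properties.
  static final class Vertex {
    long validFrom;
    final Map<String, Object> properties = new HashMap<>();
    Vertex(long validFrom) { this.validFrom = validFrom; }
  }

  // Case (3): store the calendar year of valid-from (UTC) as a new property.
  static final UnaryOperator<Vertex> ADD_VALID_YEAR = v -> {
    int year = Instant.ofEpochMilli(v.validFrom).atZone(ZoneOffset.UTC).getYear();
    v.properties.put("validFromYear", year);
    return v;
  };

  public static void main(String[] args) {
    // 2018-12-24T20:15:00Z, the query timestamp used in the snapshot example
    Vertex v = ADD_VALID_YEAR.apply(new Vertex(1545682500000L));
    System.out.println(v.properties); // {validFromYear=2018}
  }
}
```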
A structural grouping of vertices and edges is an important task in temporal graph analytics. Since temporal graphs can become very large, condensing them can provide deeper insights into structures and patterns hidden in the graph. In the current EPGM implementation of the groupBy operator, grouping is based on vertex and edge grouping keys as well as vertex and edge aggregation functions.
For temporal grouping, TPGM provides three additional features:
- time-specific value transformation functions can be applied to compute time values on the desired granularity for grouping
- support for GROUP BY CUBE and GROUP BY ROLLUP similar to SQL (the output of the operator is a collection where each graph corresponds to a single combination of the given grouping keys)
- support for aggregations on the temporal properties by user-defined functions and predefined time-specific aggregation functions (e.g., MinFrom or MaxFrom)
Example code snippet:
// Get the temporal graph from a data source
TemporalGraph temporalGraph = dataSource.getTemporalGraph();
// Group vertices by their valid-time duration and edges by their label
TemporalGraph groupedGraph = temporalGraph.groupBy(
// Vertex grouping keys: Group vertices by their duration in months
Collections.singletonList(TemporalGroupingKeys.duration(VALID_TIME, ChronoUnit.MONTHS)),
// Vertex aggregate functions: calculate the average valid time duration of grouped vertices
Collections.singletonList(new AverageVertexDuration("avgVertexDurValid", VALID_TIME)),
// Edge grouping keys: Group edges by their type label
Collections.singletonList(GroupingKeys.label()),
// Edge aggregate functions: count the grouped edges and save the result in a property "cnt"
Collections.singletonList(new Count("cnt"))
);
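A time-specific value transformation maps a timestamp to the granularity used as a grouping key, for example the month of the year. The sketch below shows this idea in plain Java by grouping hypothetical edges by label and by the month of their valid-from time and counting each group, similar in spirit to combining a label key with a Count aggregate. Interpreting timestamps in UTC is an assumption, and the Edge record and the "label@month" key format are made up for the example.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class TemporalGroupingSketch {
  record Edge(String label, long validFrom) { }

  // Time-specific value transformation: timestamp (epoch millis, UTC) to a calendar field.
  static int timeKey(long epochMillis, ChronoField field) {
    return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).get(field);
  }

  // Group edges by (label, month of valid-from) and count the members of each group.
  static Map<String, Long> countByLabelAndMonth(List<Edge> edges) {
    return edges.stream().collect(Collectors.groupingBy(
        e -> e.label() + "@" + timeKey(e.validFrom(), ChronoField.MONTH_OF_YEAR),
        TreeMap::new,
        Collectors.counting()));
  }

  public static void main(String[] args) {
    long dec24 = 1545682500000L; // 2018-12-24T20:15:00Z
    long jan1 = 1546300800000L;  // 2019-01-01T00:00:00Z
    List<Edge> edges = List.of(
        new Edge("calls", dec24), new Edge("calls", dec24), new Edge("calls", jan1));
    System.out.println(countByLabelAndMonth(edges)); // {calls@1=1, calls@12=2}
  }
}
```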
The aggregation operator reduces information contained in a graph, such as the number of elements or the earliest valid-from time, to single values computed by the given aggregate functions. It is called via the aggregate() function, which is available on the LogicalGraph and TemporalGraph classes. It takes any number of aggregate functions as input and outputs the original graph with the result of each aggregate function stored as a new property of the graph head. Most aggregate functions require a string denoting the name of the property or label they are applied to.
The temporal extension of the aggregation operator introduces new aggregation functions, which are listed in the temporal aggregations table below.
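The contract described above (every aggregate function writes one value under its own key at the graph head) can be sketched as follows. This is a hypothetical plain-Java model, not Gradoop's aggregate function interface: AggregateFn, aggregate, and the property keys are illustrative names, and the graph head is reduced to a map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

class AggregationSketch {
  record Vertex(String label, long validFrom, long validTo) { }

  // Hypothetical aggregate function: a result property key plus a reduction over all vertices.
  record AggregateFn(String propertyKey, ToLongFunction<List<Vertex>> reduce) { }

  // Apply each function and store its result under its key in the "graph head" map.
  static Map<String, Long> aggregate(List<Vertex> vertices, List<AggregateFn> fns) {
    Map<String, Long> graphHead = new LinkedHashMap<>();
    for (AggregateFn fn : fns) {
      graphHead.put(fn.propertyKey(), fn.reduce().applyAsLong(vertices));
    }
    return graphHead;
  }

  public static void main(String[] args) {
    List<Vertex> vertices = List.of(new Vertex("Person", 10L, 50L), new Vertex("Person", 5L, 20L));
    AggregateFn count = new AggregateFn("vertexCount", vs -> vs.size());
    // Similar in spirit to a minimum over the vertices' valid-from values
    AggregateFn minFrom = new AggregateFn("minValidFrom",
        vs -> vs.stream().mapToLong(Vertex::validFrom).min().orElse(Long.MAX_VALUE));
    System.out.println(aggregate(vertices, List.of(count, minFrom)));
    // prints {vertexCount=2, minValidFrom=5}
  }
}
```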
Gradoop uses Temporal-GDL as its pattern matching language. Temporal-GDL is a fork of the Graph Definition Language (GDL) by s1ck and extends it with several temporal constraints to support bitemporal property graphs. In Gradoop, you can use it by calling the .temporalQuery() function on your temporal graph.
Please see the respective repository for further information: Temporal-GDL.
Example code snippet:
TemporalGraph matchResult = myTemporalGraph
// Pattern matching
.temporalQuery("MATCH (v1:Station {cellId: 2883})-[t1:Trip]->(v2:Station)-[t2:Trip]->(v3:Station) " +
"WHERE v2.id != v1.id " +
"AND v2.id != v3.id " +
"AND v3.id != v1.id " +
"AND t1.val.precedes(t2.val) " +
"AND t1.val.lengthAtLeast(Minutes(30)) " +
"AND t2.val.lengthAtLeast(Minutes(30))")
// Reduce collection to graph
.reduce(new ReduceCombination<>());
The following temporal predicates are available for the Snapshot and Difference operators. The placeholders queryTimestamp, queryFrom, and queryTo have to be specified as arguments by the user; elementFrom and elementTo represent the begin and end of an element's validity interval.
Name | Description | Predicate |
---|---|---|
All | A filter that returns all elements. | |
AsOf | Given a timestamp, this predicate will match all timestamps before or at that time and all time-intervals containing that time. | elementFrom <= queryTimestamp && elementTo > queryTimestamp |
Between | Given a time-interval, this predicate will match all intervals that start before or at that interval's end and end after the start of that interval. | elementFrom <= queryTo && elementTo > queryFrom |
ContainedIn | Given a time interval, this predicate will match all intervals that are a subset of that interval. | queryFrom <= elementFrom && elementTo <= queryTo |
CreatedIn | Given a time-interval, this predicate matches all intervals starting during that interval. | queryFrom <= elementFrom && elementFrom <= queryTo |
DeletedIn | Given a time-interval, this predicate will match all intervals ending during that interval. | queryFrom <= elementTo && elementTo <= queryTo |
FromTo | Given a time-interval, this predicate will match all intervals that were valid during that interval. | elementFrom < queryTo && elementTo > queryFrom |
ValidDuring | Given a time-interval, this predicate matches all intervals that contain that interval. | elementFrom <= queryFrom && elementTo >= queryTo |
The following aggregations are available for time-dependent Grouping and Aggregation. The two available time dimensions TRANSACTION_TIME and VALID_TIME can be specified with the enum TimeDimension. To specify the beginning or end of an interval of a time dimension, use the enum TimeDimension.Field.
Function | Description | Input |
---|---|---|
MinTime | Minimum of a specified time dimension's begin or end over all edges and vertices | (1) The property key where the aggregated value is stored. (2) The time dimension to consider. (3) The field of the time dimension (begin or end) to consider. |
MinVertexTime | Minimum of a specified time dimension's begin or end over all vertices | Same as MinTime |
MinEdgeTime | Minimum of a specified time dimension's begin or end over all edges | Same as MinTime |
MaxTime | Maximum of a specified time dimension's begin or end over all edges and vertices | Same as MinTime |
MaxVertexTime | Maximum of a specified time dimension's begin or end over all vertices | Same as MinTime |
MaxEdgeTime | Maximum of a specified time dimension's begin or end over all edges | Same as MinTime |
AverageDuration | Calculates the average duration of temporal elements of a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. | (1) The property key where the aggregated value is stored. (2) The time dimension to consider. |
AverageVertexDuration | Calculates the average duration of vertices of a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. | Same as AverageDuration |
AverageEdgeDuration | Calculates the average duration of edges of a specified time dimension. Time intervals with either the start or end time set to the respective default value are ignored. | Same as AverageDuration |
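As an illustration of how a time dimension and a field select an interval bound, the following plain-Java sketch implements a MinTime-like minimum and an AverageDuration-like mean. The enums and the Element record are local stand-ins named after the text, not imports from Gradoop. Treating Long.MIN_VALUE and Long.MAX_VALUE as the default begin and end values is an assumption based on the CSV example, and minTime applies no special handling to default values.

```java
import java.util.List;
import java.util.OptionalDouble;

class TemporalAggregationSketch {
  // Local stand-ins named after TimeDimension and TimeDimension.Field.
  enum TimeDimension {
    TRANSACTION_TIME, VALID_TIME;
    enum Field { FROM, TO }
  }

  // Assumed default values for unset bounds.
  static final long DEFAULT_FROM = Long.MIN_VALUE;
  static final long DEFAULT_TO = Long.MAX_VALUE;

  record Element(long txFrom, long txTo, long validFrom, long validTo) {
    long get(TimeDimension dim, TimeDimension.Field field) {
      if (dim == TimeDimension.TRANSACTION_TIME) {
        return field == TimeDimension.Field.FROM ? txFrom : txTo;
      }
      return field == TimeDimension.Field.FROM ? validFrom : validTo;
    }
  }

  // MinTime-like: plain minimum of the selected bound over all elements.
  static long minTime(List<Element> elements, TimeDimension dim, TimeDimension.Field field) {
    return elements.stream().mapToLong(e -> e.get(dim, field)).min().orElseThrow();
  }

  // AverageDuration-like: skips intervals whose begin or end is a default value.
  static OptionalDouble averageDuration(List<Element> elements, TimeDimension dim) {
    return elements.stream()
        .filter(e -> e.get(dim, TimeDimension.Field.FROM) != DEFAULT_FROM
            && e.get(dim, TimeDimension.Field.TO) != DEFAULT_TO)
        .mapToLong(e -> e.get(dim, TimeDimension.Field.TO) - e.get(dim, TimeDimension.Field.FROM))
        .average();
  }

  public static void main(String[] args) {
    List<Element> es = List.of(
        new Element(0L, DEFAULT_TO, 10L, 30L),
        new Element(0L, DEFAULT_TO, 20L, 60L),
        new Element(0L, DEFAULT_TO, 5L, DEFAULT_TO)); // open valid end, ignored by the average
    System.out.println(averageDuration(es, TimeDimension.VALID_TIME).getAsDouble());      // 30.0
    System.out.println(averageDuration(es, TimeDimension.TRANSACTION_TIME).isPresent());  // false
    System.out.println(minTime(es, TimeDimension.VALID_TIME, TimeDimension.Field.FROM));  // 5
  }
}
```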
The TPGM provides several data sources and sinks to read a TPGM graph from, and persist it to, a file system such as HDFS.
Data Sources: TemporalCSVDataSource, TemporalIndexedCSVDataSource, TemporalParquetDataSource, TemporalParquetProtobufDataSource
Data Sinks: TemporalCSVDataSink, TemporalIndexedCSVDataSink, TemporalParquetDataSink, TemporalParquetProtobufDataSink
The following is an example line of an edge.csv file written by the TemporalCSVDataSink:
5d777236ed08fd369d717ab2;[5d777162ed08fd369d6f2cd5];5d7771d5ed08fd369d6f77d7;5d7771bded08fd369d6f63c0;Owner;;(1568108898709,9223372036854775807),(-9223372036854775808,9223372036854775807)
with the respective signature:
EdgeId;[GraphId(s)];SourceVertexId;TargetVertexId;Label;Properties;(tx-from,tx-to),(val-from,val-to)
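The two time intervals always form the tail of a line, so they can be extracted without splitting the property field. This sketch uses a regular expression anchored at the end of the line; the class and method names are hypothetical, and this is not how TemporalCSVDataSource parses files. In the example line, the transaction time ends at Long.MAX_VALUE and the valid time spans Long.MIN_VALUE to Long.MAX_VALUE, which presumably marks open or unset bounds.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TemporalCsvSketch {
  // Matches "(tx-from,tx-to),(val-from,val-to)" at the very end of a line.
  private static final Pattern TIMES =
      Pattern.compile("\\((-?\\d+),(-?\\d+)\\),\\((-?\\d+),(-?\\d+)\\)$");

  // Returns {txFrom, txTo, validFrom, validTo} in epoch milliseconds.
  static long[] parseTimes(String line) {
    Matcher m = TIMES.matcher(line);
    if (!m.find()) {
      throw new IllegalArgumentException("no time intervals found: " + line);
    }
    return new long[] {
        Long.parseLong(m.group(1)), Long.parseLong(m.group(2)),
        Long.parseLong(m.group(3)), Long.parseLong(m.group(4))
    };
  }

  public static void main(String[] args) {
    String line = "5d777236ed08fd369d717ab2;[5d777162ed08fd369d6f2cd5];5d7771d5ed08fd369d6f77d7;"
        + "5d7771bded08fd369d6f63c0;Owner;;(1568108898709,9223372036854775807),"
        + "(-9223372036854775808,9223372036854775807)";
    long[] t = parseTimes(line);
    System.out.println(t[0]);                   // 1568108898709
    System.out.println(t[1] == Long.MAX_VALUE); // true: transaction time still open
    System.out.println(t[2] == Long.MIN_VALUE); // true: valid-from unbounded
  }
}
```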
The TPGM is compatible with most operators of the EPGM, i.e., an operator like Subgraph can be applied directly to a TemporalGraph instance.
If you want to use EPGM operators that are not available on a TPGM graph, call the .toLogicalGraph() or .toLogicalGraphCollection() function.
Below is an example workflow of a typical temporal graph analysis.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TemporalGradoopConfig temporalGradoopConfig = TemporalGradoopConfig.createConfig(env);
TemporalCSVDataSource dataSource = new TemporalCSVDataSource("path/to/graph", temporalGradoopConfig);
TemporalGraph graph = dataSource.getTemporalGraph();
graph
// Filter for necessary vertex and edge types
.subgraph(
new ByLabel<>("Agent").or(new ByLabel<>("Customer")
.and(new ByProperty<>("city", PropertyValue.create("Istanbul")))), new ByLabel<>("calls"))
// Extract a snapshot from historical graph information
.snapshot(new CreatedIn(
LocalDate.of(2018, Month.JANUARY, 1).atStartOfDay(),
LocalDate.of(2018, Month.DECEMBER, 31).atTime(LocalTime.MAX)))
// Remove dangling edges
.verify()
// Apply a grouping with ROLL UP feature
.groupEdgesByRollUp(
// Vertex grouping keys
Arrays.asList(GroupingKeys.label(), GroupingKeys.property("city")),
// Vertex aggregation functions
Collections.singletonList(new Count()),
// Edge grouping keys (month, week, and day of the valid-from time)
Arrays.asList(
TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.MONTH_OF_YEAR),
TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.ALIGNED_WEEK_OF_YEAR),
TemporalGroupingKeys.timeStamp(TimeDimension.VALID_TIME, TimeDimension.Field.FROM, ChronoField.DAY_OF_YEAR)),
// Edge aggregation functions: edge count and average valid-time duration
Arrays.asList(new Count("cnt"), new AverageEdgeDuration("avgEdgeDurValid", TimeDimension.VALID_TIME)))
.writeTo(new TemporalCSVDataSink("path/to/output", temporalGradoopConfig));
env.execute("My temporal analysis.");