From 9691f8fa7d7894431accdda270b9c9845501017f Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Tue, 9 Aug 2022 20:26:13 +0200 Subject: [PATCH 1/7] [#1570] add degree range operator --- .../metric/DegreeRangeEvolution.java | 81 ++++++++++ .../metric/functions/AggregateType.java | 23 +++ .../GroupDegreeTreesToAggregateDegrees.java | 135 ++++++++++++++++ .../metric/DegreeRangeEvolutionTest.java | 146 ++++++++++++++++++ 4 files changed, 385 insertions(+) create mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java create mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java create mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java create mode 100644 gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java new file mode 100644 index 000000000000..65ee1262a2ab --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java @@ -0,0 +1,81 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; +import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.model.impl.operators.metric.functions.*; +import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce; + +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeMap; + +/** + * Operator that calculates the degree range evolution of a temporal graph for the + * whole lifetime of the graph. + */ +public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator>> { + /** + * The time dimension that will be considered. + */ + private final TimeDimension dimension; + + /** + * The degree type (IN, OUT, BOTH); + */ + private final VertexDegree degreeType; + + /** + * Creates an instance of this average degree evolution operator. + * + * @param degreeType the degree type to use (IN, OUT, BOTH). + * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). + */ + public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { + this.degreeType = Objects.requireNonNull(degreeType); + this.dimension = Objects.requireNonNull(dimension); + } + + @Override + public DataSet> execute(TemporalGraph graph) { + DataSet>> absoluteDegreeTrees = graph.getEdges() + // 1) Extract vertex id(s) and corresponding time intervals + .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) + // 2) Group them by the vertex id + .groupBy(0) + // 3) For each vertex id, build a degree tree data structure + .reduceGroup(new BuildTemporalDegreeTree()) + // 4) Transform each tree to aggregated evolution + .map(new TransformDeltaToAbsoluteDegreeTree()); + + DataSet> timePoints = absoluteDegreeTrees + // 5) extract all timestamps where degree of any vertex changes + .reduceGroup(new ExtractAllTimePointsReduce()) + .distinct(); + + return absoluteDegreeTrees + // join with interval degree mappings + // 6) Merge trees together and calculate aggregation + .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE, timePoints)); + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java new file mode 100644 index 000000000000..131e999a93e7 --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java @@ -0,0 +1,23 @@ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +/** + * Enum for defining an aggregate type. + */ +public enum AggregateType { + /** + * Minimum aggregation. + */ + MIN, + /** + * Maximum aggregation. + */ + MAX, + /** + * Average aggregation. + */ + AVG, + /** + * Degree Range aggregation. + */ + RANGE +} \ No newline at end of file diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java new file mode 100644 index 000000000000..2f41036ee5b0 --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java @@ -0,0 +1,135 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; + +import java.util.*; +import java.util.stream.Stream; + +/** + * A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree) + * that represents the aggregated degree value for the whole graph at the given time. + */ +public class GroupDegreeTreesToAggregateDegrees + implements GroupReduceFunction>, Tuple2> { + + /** + * The aggregate type to use (min,max,avg). + */ + private final AggregateType aggregateType; + /** + * The timestamps where at least one vertex degree changes. + */ + private final SortedSet timePoints; + + /** + * Creates an instance of this group reduce function. + * + * @param aggregateType the aggregate type to use (min,max,avg). + */ + public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType, DataSet> timePoints) { + this.aggregateType = aggregateType; + + List> tuples; + try { + tuples = timePoints.collect(); + this.timePoints = new TreeSet<>(); + + for (int i = 0; i < timePoints.count(); i = i + 1) { + this.timePoints.add(tuples.get(i).getField(0)); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + @Override + public void reduce(Iterable>> iterable, + Collector> collector) throws Exception { + + // init necessary maps and set + HashMap> degreeTrees = new HashMap<>(); + HashMap vertexDegrees = new HashMap<>(); + + // convert the iterables to a hashmap and remember all possible timestamps + for (Tuple2> tuple : iterable) { + degreeTrees.put(tuple.f0, tuple.f1); + } + + int numberOfVertices = degreeTrees.size(); + + // Add default times + timePoints.add(Long.MIN_VALUE); + + for (Long timePoint : timePoints) { + // skip last default time + if (Long.MAX_VALUE == timePoint) { + continue; + } + // Iterate over all vertices + for (Map.Entry> entry : degreeTrees.entrySet()) { + // Make sure the vertex is registered in the current vertexDegrees capture + if (!vertexDegrees.containsKey(entry.getKey())) { + vertexDegrees.put(entry.getKey(), 0); + } + + // Check if timestamp is in tree, if not, take the lower key + if (entry.getValue().containsKey(timePoint)) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); + } else { + Long lowerKey = entry.getValue().lowerKey(timePoint); + if (lowerKey != null) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); + } + } + } + + // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. + Optional opt; + Optional opt2; + switch (aggregateType) { + case MIN: + opt = vertexDegrees.values().stream().reduce(Math::min); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case MAX: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case AVG: + opt = vertexDegrees.values().stream().reduce(Math::addExact); + opt.ifPresent(integer -> collector.collect( + new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); + break; + case RANGE: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt2 = vertexDegrees.values().stream().reduce(Math::min); + opt.flatMap(max -> opt2.map(min -> max - min)); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + default: + throw new IllegalArgumentException("Aggregate type not specified."); + } + } + } +} \ No newline at end of file diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java new file mode 100644 index 000000000000..b5c65ac28bc6 --- /dev/null +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java @@ -0,0 +1,146 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; +import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; +import org.gradoop.temporal.model.api.TimeDimension; +import org.gradoop.temporal.model.impl.TemporalGraph; +import org.gradoop.temporal.util.TemporalGradoopTestBase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { + /** + * The expected in-degrees for each vertex label. + */ + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + /** + * The expected out-degrees for each vertex label. + */ + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + /** + * The expected degrees for each vertex label. + */ + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + + static { + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + + // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); + + // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); //4,3 + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0)); //5,1 + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); + } + + /** + * The degree type to test. + */ + @Parameterized.Parameter(0) + public VertexDegree degreeType; + + /** + * The expected degree range evolution for the given type. + */ + @Parameterized.Parameter(1) + public List> expectedDegrees; + + /** + * The temporal graph to test the operator. + */ + TemporalGraph testGraph; + + /** + * The parameters to test the operator. + * + * @return three different vertex degree types with its corresponding expected degree evolution. + */ + @Parameterized.Parameters(name = "Test degree type {0}.") + public static Iterable parameters() { + return Arrays.asList( + new Object[] {VertexDegree.IN, EXPECTED_IN_DEGREES}, + new Object[] {VertexDegree.OUT, EXPECTED_OUT_DEGREES}, + new Object[] {VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); + } + + /** + * Set up the test graph and create the id-label mapping. + * + * @throws Exception in case of an error + */ + @Before + public void setUp() throws Exception { + testGraph = getTestGraphWithValues(); + Collection> idLabelCollection = new HashSet<>(); + testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) + .returns(new TypeHint>() { + }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); + getExecutionEnvironment().execute(); + } + + /** + * Test the degree range evolution operator. + * + * @throws Exception in case of an error. + */ + @Test + public void testDegreeVariance() throws Exception { + Collection> resultCollection = new ArrayList<>(); + + final DataSet> resultDataSet = testGraph + .callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME)); + + resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); + getExecutionEnvironment().execute(); + + System.out.println(resultCollection); + + assertTrue(resultCollection.containsAll(expectedDegrees)); + assertTrue(expectedDegrees.containsAll(resultCollection)); + } +} From 091a23d675ec29c97ba8e6709f402a3bfc31a920 Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Wed, 10 Aug 2022 11:11:32 +0200 Subject: [PATCH 2/7] [#1570] add help functions --- .../functions/ExtractAllTimePointsReduce.java | 49 +++++++++++++++++ .../TransformDeltaToAbsoluteDegreeTree.java | 55 +++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java create mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java new file mode 100644 index 000000000000..ece67f4a2efd --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java @@ -0,0 +1,49 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.gradoop.common.model.impl.id.GradoopId; + +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Reduce function to extract all timestamps where the degree of a vertex changes. + */ +public class ExtractAllTimePointsReduce implements GroupReduceFunction>, Tuple1> { + + public ExtractAllTimePointsReduce() { + } + + @Override + public void reduce(Iterable>> iterable, Collector> collector) throws Exception { + SortedSet timePoints = new TreeSet<>(); + + for (Tuple2> tuple : iterable) { + timePoints.addAll(tuple.f1.keySet()); + } + + for (Long timePoint: timePoints) { + collector.collect(new Tuple1<>(timePoint)); + } + + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java new file mode 100644 index 000000000000..85b8c1fadb2d --- /dev/null +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.gradoop.temporal.model.impl.operators.metric.functions; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.gradoop.common.model.impl.id.GradoopId; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Replaces the degree tree, that just stores the degree changes for each time, with a degree tree that + * stores the actual degree of the vertex at that time. + */ +@FunctionAnnotation.ForwardedFields("f0") +public class TransformDeltaToAbsoluteDegreeTree + implements MapFunction>, + Tuple2>> { + + /** + * To reduce object instantiations. + */ + private TreeMap absoluteDegreeTree; + + @Override + public Tuple2> map( + Tuple2> vIdTreeMapTuple) throws Exception { + // init the degree and the temporal tree + int degree = 0; + absoluteDegreeTree = new TreeMap<>(); + + // aggregate the degrees + for (Map.Entry entry : vIdTreeMapTuple.f1.entrySet()) { + degree += entry.getValue(); + absoluteDegreeTree.put(entry.getKey(), degree); + } + vIdTreeMapTuple.f1 = absoluteDegreeTree; + return vIdTreeMapTuple; + } +} \ No newline at end of file From 26024a351d2a1a7e061c1fbb7bc9bc0c25a73944 Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Wed, 10 Aug 2022 19:01:23 +0200 Subject: [PATCH 3/7] [#1570] reformat code --- .../metric/DegreeRangeEvolution.java | 83 +++---- .../metric/functions/AggregateType.java | 49 +++-- .../functions/ExtractAllTimePointsReduce.java | 30 +-- .../GroupDegreeTreesToAggregateDegrees.java | 200 +++++++++-------- .../TransformDeltaToAbsoluteDegreeTree.java | 36 +-- .../metric/DegreeRangeEvolutionTest.java | 208 +++++++++--------- 6 files changed, 317 insertions(+), 289 deletions(-) diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java index 65ee1262a2ab..c45559b4b68e 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java @@ -23,11 +23,14 @@ import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; import org.gradoop.temporal.model.impl.TemporalGraph; -import org.gradoop.temporal.model.impl.operators.metric.functions.*; import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; +import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; +import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; +import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; import java.util.Objects; -import java.util.SortedSet; import java.util.TreeMap; /** @@ -35,47 +38,47 @@ * whole lifetime of the graph. */ public class DegreeRangeEvolution implements UnaryBaseGraphToValueOperator>> { - /** - * The time dimension that will be considered. - */ - private final TimeDimension dimension; + /** + * The time dimension that will be considered. + */ + private final TimeDimension dimension; - /** - * The degree type (IN, OUT, BOTH); - */ - private final VertexDegree degreeType; + /** + * The degree type (IN, OUT, BOTH); + */ + private final VertexDegree degreeType; - /** - * Creates an instance of this average degree evolution operator. - * - * @param degreeType the degree type to use (IN, OUT, BOTH). - * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). - */ - public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { - this.degreeType = Objects.requireNonNull(degreeType); - this.dimension = Objects.requireNonNull(dimension); - } + /** + * Creates an instance of this average degree evolution operator. + * + * @param degreeType the degree type to use (IN, OUT, BOTH). + * @param dimension the time dimension to use (VALID_TIME, TRANSACTION_TIME). + */ + public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { + this.degreeType = Objects.requireNonNull(degreeType); + this.dimension = Objects.requireNonNull(dimension); + } - @Override - public DataSet> execute(TemporalGraph graph) { - DataSet>> absoluteDegreeTrees = graph.getEdges() - // 1) Extract vertex id(s) and corresponding time intervals - .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) - // 2) Group them by the vertex id - .groupBy(0) - // 3) For each vertex id, build a degree tree data structure - .reduceGroup(new BuildTemporalDegreeTree()) - // 4) Transform each tree to aggregated evolution - .map(new TransformDeltaToAbsoluteDegreeTree()); + @Override + public DataSet> execute(TemporalGraph graph) { + DataSet>> absoluteDegreeTrees = graph.getEdges() + // 1) Extract vertex id(s) and corresponding time intervals + .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) + // 2) Group them by the vertex id + .groupBy(0) + // 3) For each vertex id, build a degree tree data structure + .reduceGroup(new BuildTemporalDegreeTree()) + // 4) Transform each tree to aggregated evolution + .map(new TransformDeltaToAbsoluteDegreeTree()); - DataSet> timePoints = absoluteDegreeTrees - // 5) extract all timestamps where degree of any vertex changes - .reduceGroup(new ExtractAllTimePointsReduce()) - .distinct(); + DataSet> timePoints = absoluteDegreeTrees + // 5) extract all timestamps where degree of any vertex changes + .reduceGroup(new ExtractAllTimePointsReduce()) + .distinct(); - return absoluteDegreeTrees - // join with interval degree mappings - // 6) Merge trees together and calculate aggregation - .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE, timePoints)); - } + return absoluteDegreeTrees + // join with interval degree mappings + // 6) Merge trees together and calculate aggregation + .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE, timePoints)); + } } diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java index 131e999a93e7..ecb981707eef 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java @@ -1,23 +1,38 @@ +/* + * Copyright © 2014 - 2021 Leipzig University (Database Research Group) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.gradoop.temporal.model.impl.operators.metric.functions; /** * Enum for defining an aggregate type. */ public enum AggregateType { - /** - * Minimum aggregation. - */ - MIN, - /** - * Maximum aggregation. - */ - MAX, - /** - * Average aggregation. - */ - AVG, - /** - * Degree Range aggregation. - */ - RANGE -} \ No newline at end of file + /** + * Minimum aggregation. + */ + MIN, + /** + * Maximum aggregation. + */ + MAX, + /** + * Average aggregation. + */ + AVG, + /** + * Degree Range aggregation. + */ + RANGE +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java index ece67f4a2efd..2c66d59dee52 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java @@ -30,20 +30,24 @@ */ public class ExtractAllTimePointsReduce implements GroupReduceFunction>, Tuple1> { - public ExtractAllTimePointsReduce() { + /** + * Creates an instance of this group reduce function. + */ + public ExtractAllTimePointsReduce() { + } + + @Override + public void reduce(Iterable>> iterable, Collector> collector) + throws Exception { + SortedSet timePoints = new TreeSet<>(); + + for (Tuple2> tuple : iterable) { + timePoints.addAll(tuple.f1.keySet()); } - @Override - public void reduce(Iterable>> iterable, Collector> collector) throws Exception { - SortedSet timePoints = new TreeSet<>(); - - for (Tuple2> tuple : iterable) { - timePoints.addAll(tuple.f1.keySet()); - } - - for (Long timePoint: timePoints) { - collector.collect(new Tuple1<>(timePoint)); - } - + for (Long timePoint : timePoints) { + collector.collect(new Tuple1<>(timePoint)); } + + } } diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java index 2f41036ee5b0..0427690015c7 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java @@ -22,114 +22,120 @@ import org.apache.flink.util.Collector; import org.gradoop.common.model.impl.id.GradoopId; -import java.util.*; -import java.util.stream.Stream; +import java.util.TreeMap; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; /** * A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree) * that represents the aggregated degree value for the whole graph at the given time. */ public class GroupDegreeTreesToAggregateDegrees - implements GroupReduceFunction>, Tuple2> { - - /** - * The aggregate type to use (min,max,avg). - */ - private final AggregateType aggregateType; - /** - * The timestamps where at least one vertex degree changes. - */ - private final SortedSet timePoints; - - /** - * Creates an instance of this group reduce function. - * - * @param aggregateType the aggregate type to use (min,max,avg). - */ - public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType, DataSet> timePoints) { - this.aggregateType = aggregateType; - - List> tuples; - try { - tuples = timePoints.collect(); - this.timePoints = new TreeSet<>(); - - for (int i = 0; i < timePoints.count(); i = i + 1) { - this.timePoints.add(tuples.get(i).getField(0)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - + implements GroupReduceFunction>, Tuple2> { + + /** + * The aggregate type to use (min,max,avg). + */ + private final AggregateType aggregateType; + /** + * The timestamps where at least one vertex degree changes. + */ + private final SortedSet timePoints; + + /** + * Creates an instance of this group reduce function. + * + * @param aggregateType the aggregate type to use (min,max,avg). + * @param timePoints the time points were vertex degrees change. + */ + public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType, DataSet> timePoints) { + this.aggregateType = aggregateType; + + List> tuples; + try { + tuples = timePoints.collect(); + this.timePoints = new TreeSet<>(); + + for (int i = 0; i < timePoints.count(); i = i + 1) { + this.timePoints.add(tuples.get(i).getField(0)); + } + } catch (Exception e) { + throw new RuntimeException(e); } - @Override - public void reduce(Iterable>> iterable, - Collector> collector) throws Exception { + } + + @Override + public void reduce(Iterable>> iterable, + Collector> collector) throws Exception { - // init necessary maps and set - HashMap> degreeTrees = new HashMap<>(); - HashMap vertexDegrees = new HashMap<>(); + // init necessary maps and set + HashMap> degreeTrees = new HashMap<>(); + HashMap vertexDegrees = new HashMap<>(); + + // convert the iterables to a hashmap and remember all possible timestamps + for (Tuple2> tuple : iterable) { + degreeTrees.put(tuple.f0, tuple.f1); + } - // convert the iterables to a hashmap and remember all possible timestamps - for (Tuple2> tuple : iterable) { - degreeTrees.put(tuple.f0, tuple.f1); + int numberOfVertices = degreeTrees.size(); + + // Add default times + timePoints.add(Long.MIN_VALUE); + + for (Long timePoint : timePoints) { + // skip last default time + if (Long.MAX_VALUE == timePoint) { + continue; + } + // Iterate over all vertices + for (Map.Entry> entry : degreeTrees.entrySet()) { + // Make sure the vertex is registered in the current vertexDegrees capture + if (!vertexDegrees.containsKey(entry.getKey())) { + vertexDegrees.put(entry.getKey(), 0); } - int numberOfVertices = degreeTrees.size(); - - // Add default times - timePoints.add(Long.MIN_VALUE); - - for (Long timePoint : timePoints) { - // skip last default time - if (Long.MAX_VALUE == timePoint) { - continue; - } - // Iterate over all vertices - for (Map.Entry> entry : degreeTrees.entrySet()) { - // Make sure the vertex is registered in the current vertexDegrees capture - if (!vertexDegrees.containsKey(entry.getKey())) { - vertexDegrees.put(entry.getKey(), 0); - } - - // Check if timestamp is in tree, if not, take the lower key - if (entry.getValue().containsKey(timePoint)) { - vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); - } else { - Long lowerKey = entry.getValue().lowerKey(timePoint); - if (lowerKey != null) { - vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); - } - } - } - - // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. - Optional opt; - Optional opt2; - switch (aggregateType) { - case MIN: - opt = vertexDegrees.values().stream().reduce(Math::min); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - case MAX: - opt = vertexDegrees.values().stream().reduce(Math::max); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - case AVG: - opt = vertexDegrees.values().stream().reduce(Math::addExact); - opt.ifPresent(integer -> collector.collect( - new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); - break; - case RANGE: - opt = vertexDegrees.values().stream().reduce(Math::max); - opt2 = vertexDegrees.values().stream().reduce(Math::min); - opt.flatMap(max -> opt2.map(min -> max - min)); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - default: - throw new IllegalArgumentException("Aggregate type not specified."); - } + // Check if timestamp is in tree, if not, take the lower key + if (entry.getValue().containsKey(timePoint)) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint)); + } else { + Long lowerKey = entry.getValue().lowerKey(timePoint); + if (lowerKey != null) { + vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey)); + } } + } + + // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. + Optional opt; + Optional opt2; + switch (aggregateType) { + case MIN: + opt = vertexDegrees.values().stream().reduce(Math::min); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case MAX: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case AVG: + opt = vertexDegrees.values().stream().reduce(Math::addExact); + opt.ifPresent(integer -> collector.collect( + new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); + break; + case RANGE: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt2 = vertexDegrees.values().stream().reduce(Math::min); + opt.flatMap(max -> opt2.map(min -> max - min)); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + default: + throw new IllegalArgumentException("Aggregate type not specified."); + } } -} \ No newline at end of file + } +} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java index 85b8c1fadb2d..719e76dcaccd 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/TransformDeltaToAbsoluteDegreeTree.java @@ -32,24 +32,24 @@ public class TransformDeltaToAbsoluteDegreeTree implements MapFunction>, Tuple2>> { - /** - * To reduce object instantiations. - */ - private TreeMap absoluteDegreeTree; + /** + * To reduce object instantiations. + */ + private TreeMap absoluteDegreeTree; - @Override - public Tuple2> map( - Tuple2> vIdTreeMapTuple) throws Exception { - // init the degree and the temporal tree - int degree = 0; - absoluteDegreeTree = new TreeMap<>(); + @Override + public Tuple2> map( + Tuple2> vIdTreeMapTuple) throws Exception { + // init the degree and the temporal tree + int degree = 0; + absoluteDegreeTree = new TreeMap<>(); - // aggregate the degrees - for (Map.Entry entry : vIdTreeMapTuple.f1.entrySet()) { - degree += entry.getValue(); - absoluteDegreeTree.put(entry.getKey(), degree); - } - vIdTreeMapTuple.f1 = absoluteDegreeTree; - return vIdTreeMapTuple; + // aggregate the degrees + for (Map.Entry entry : vIdTreeMapTuple.f1.entrySet()) { + degree += entry.getValue(); + absoluteDegreeTree.put(entry.getKey(), degree); } -} \ No newline at end of file + vIdTreeMapTuple.f1 = absoluteDegreeTree; + return vIdTreeMapTuple; + } +} diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java index b5c65ac28bc6..4137a51877a8 100644 --- a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java @@ -39,108 +39,108 @@ @RunWith(Parameterized.class) public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { - /** - * The expected in-degrees for each vertex label. - */ - private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); - /** - * The expected out-degrees for each vertex label. - */ - private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); - /** - * The expected degrees for each vertex label. - */ - private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); - - static { - // IN DEGREES - EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); - - // OUT DEGREES - EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); - EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); - - // DEGREES - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); //4,3 - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0)); //5,1 - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); - } - - /** - * The degree type to test. - */ - @Parameterized.Parameter(0) - public VertexDegree degreeType; - - /** - * The expected degree range evolution for the given type. - */ - @Parameterized.Parameter(1) - public List> expectedDegrees; - - /** - * The temporal graph to test the operator. - */ - TemporalGraph testGraph; - - /** - * The parameters to test the operator. - * - * @return three different vertex degree types with its corresponding expected degree evolution. - */ - @Parameterized.Parameters(name = "Test degree type {0}.") - public static Iterable parameters() { - return Arrays.asList( - new Object[] {VertexDegree.IN, EXPECTED_IN_DEGREES}, - new Object[] {VertexDegree.OUT, EXPECTED_OUT_DEGREES}, - new Object[] {VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); - } - - /** - * Set up the test graph and create the id-label mapping. - * - * @throws Exception in case of an error - */ - @Before - public void setUp() throws Exception { - testGraph = getTestGraphWithValues(); - Collection> idLabelCollection = new HashSet<>(); - testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) - .returns(new TypeHint>() { - }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); - getExecutionEnvironment().execute(); - } - - /** - * Test the degree range evolution operator. - * - * @throws Exception in case of an error. - */ - @Test - public void testDegreeVariance() throws Exception { - Collection> resultCollection = new ArrayList<>(); - - final DataSet> resultDataSet = testGraph - .callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME)); - - resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); - getExecutionEnvironment().execute(); - - System.out.println(resultCollection); - - assertTrue(resultCollection.containsAll(expectedDegrees)); - assertTrue(expectedDegrees.containsAll(resultCollection)); - } + /** + * The expected in-degrees for each vertex label. + */ + private static final List> EXPECTED_IN_DEGREES = new ArrayList<>(); + /** + * The expected out-degrees for each vertex label. + */ + private static final List> EXPECTED_OUT_DEGREES = new ArrayList<>(); + /** + * The expected degrees for each vertex label. + */ + private static final List> EXPECTED_BOTH_DEGREES = new ArrayList<>(); + + static { + // IN DEGREES + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); // nov= 4 + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); // nov=4, min=0, max=1 + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); // + EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); + + // OUT DEGREES + EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 2)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 1)); + EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 1)); + + // DEGREES + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); //4,3 //flatmap does not work + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0)); //5,1 // calculates min as 0; the number of vertices not correct + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); + } + + /** + * The degree type to test. + */ + @Parameterized.Parameter(0) + public VertexDegree degreeType; + + /** + * The expected degree range evolution for the given type. + */ + @Parameterized.Parameter(1) + public List> expectedDegrees; + + /** + * The temporal graph to test the operator. + */ + TemporalGraph testGraph; + + /** + * The parameters to test the operator. + * + * @return three different vertex degree types with its corresponding expected degree evolution. + */ + @Parameterized.Parameters(name = "Test degree type {0}.") + public static Iterable parameters() { + return Arrays.asList( + new Object[]{VertexDegree.IN, EXPECTED_IN_DEGREES}, + new Object[]{VertexDegree.OUT, EXPECTED_OUT_DEGREES}, + new Object[]{VertexDegree.BOTH, EXPECTED_BOTH_DEGREES}); + } + + /** + * Set up the test graph and create the id-label mapping. + * + * @throws Exception in case of an error + */ + @Before + public void setUp() throws Exception { + testGraph = getTestGraphWithValues(); + Collection> idLabelCollection = new HashSet<>(); + testGraph.getVertices().map(v -> new Tuple2<>(v.getId(), v.getLabel())) + .returns(new TypeHint>() { + }).output(new LocalCollectionOutputFormat<>(idLabelCollection)); + getExecutionEnvironment().execute(); + } + + /** + * Test the degree range evolution operator. + * + * @throws Exception in case of an error. + */ + @Test + public void testDegreeRange() throws Exception { + Collection> resultCollection = new ArrayList<>(); + + final DataSet> resultDataSet = testGraph + .callForValue(new DegreeRangeEvolution(degreeType, TimeDimension.VALID_TIME)); + + resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); + getExecutionEnvironment().execute(); + + System.out.println(resultCollection); + + assertTrue(resultCollection.containsAll(expectedDegrees)); + assertTrue(expectedDegrees.containsAll(resultCollection)); + } } From e9fd33e81df8fd1eb5c8334f139bf2ec5bbbe78e Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Wed, 10 Aug 2022 21:01:13 +0200 Subject: [PATCH 4/7] [#1570] fix unit test and reformat code --- .../functions/ExtractAllTimePointsReduce.java | 4 +- .../GroupDegreeTreesToAggregateDegrees.java | 42 +++++++++---------- .../metric/DegreeRangeEvolutionTest.java | 12 +++--- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java index 2c66d59dee52..ae7335d9e2e3 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java @@ -37,8 +37,8 @@ public ExtractAllTimePointsReduce() { } @Override - public void reduce(Iterable>> iterable, Collector> collector) - throws Exception { + public void reduce(Iterable>> iterable, + Collector> collector) throws Exception { SortedSet timePoints = new TreeSet<>(); for (Tuple2> tuple : iterable) { diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java index 0427690015c7..b7a1bed2ca5b 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java @@ -114,27 +114,27 @@ public void reduce(Iterable>> iterable, Optional opt; Optional opt2; switch (aggregateType) { - case MIN: - opt = vertexDegrees.values().stream().reduce(Math::min); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - case MAX: - opt = vertexDegrees.values().stream().reduce(Math::max); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - case AVG: - opt = vertexDegrees.values().stream().reduce(Math::addExact); - opt.ifPresent(integer -> collector.collect( - new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); - break; - case RANGE: - opt = vertexDegrees.values().stream().reduce(Math::max); - opt2 = vertexDegrees.values().stream().reduce(Math::min); - opt.flatMap(max -> opt2.map(min -> max - min)); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - default: - throw new IllegalArgumentException("Aggregate type not specified."); + case MIN: + opt = vertexDegrees.values().stream().reduce(Math::min); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case MAX: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + case AVG: + opt = vertexDegrees.values().stream().reduce(Math::addExact); + opt.ifPresent(integer -> collector.collect( + new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); + break; + case RANGE: + opt = vertexDegrees.values().stream().reduce(Math::max); + opt2 = vertexDegrees.values().stream().reduce(Math::min); + opt.flatMap(max -> opt2.map(min -> max - min)); + opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); + break; + default: + throw new IllegalArgumentException("Aggregate type not specified."); } } } diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java index 4137a51877a8..49792fb68da0 100644 --- a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java @@ -54,9 +54,9 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { static { // IN DEGREES - EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); // nov= 4 - EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); // nov=4, min=0, max=1 - EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); // + EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 1)); + EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 2)); EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 1)); EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 1)); EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 1)); @@ -72,8 +72,8 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { // DEGREES EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); //4,3 //flatmap does not work - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0)); //5,1 // calculates min as 0; the number of vertices not correct + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 3)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1)); } @@ -138,8 +138,6 @@ public void testDegreeRange() throws Exception { resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection)); getExecutionEnvironment().execute(); - System.out.println(resultCollection); - assertTrue(resultCollection.containsAll(expectedDegrees)); assertTrue(expectedDegrees.containsAll(resultCollection)); } From 58b48173d0544634dc3a7e4a971009f64c457cab Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Thu, 11 Aug 2022 15:22:25 +0200 Subject: [PATCH 5/7] [#1570] put timestamp extraction inside the aggregation group reduce --- .../metric/DegreeRangeEvolution.java | 19 +++----------- .../GroupDegreeTreesToAggregateDegrees.java | 25 +++---------------- 2 files changed, 6 insertions(+), 38 deletions(-) diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java index c45559b4b68e..8c2fc90665d0 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java @@ -16,14 +16,11 @@ package org.gradoop.temporal.model.impl.operators.metric; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; -import org.gradoop.common.model.impl.id.GradoopId; import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator; import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; import org.gradoop.temporal.model.impl.TemporalGraph; -import org.gradoop.temporal.model.impl.operators.metric.functions.ExtractAllTimePointsReduce; import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; @@ -31,7 +28,6 @@ import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; import java.util.Objects; -import java.util.TreeMap; /** * Operator that calculates the degree range evolution of a temporal graph for the @@ -61,7 +57,7 @@ public DegreeRangeEvolution(VertexDegree degreeType, TimeDimension dimension) { @Override public DataSet> execute(TemporalGraph graph) { - DataSet>> absoluteDegreeTrees = graph.getEdges() + return graph.getEdges() // 1) Extract vertex id(s) and corresponding time intervals .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType)) // 2) Group them by the vertex id @@ -69,16 +65,7 @@ public DataSet> execute(TemporalGraph graph) { // 3) For each vertex id, build a degree tree data structure .reduceGroup(new BuildTemporalDegreeTree()) // 4) Transform each tree to aggregated evolution - .map(new TransformDeltaToAbsoluteDegreeTree()); - - DataSet> timePoints = absoluteDegreeTrees - // 5) extract all timestamps where degree of any vertex changes - .reduceGroup(new ExtractAllTimePointsReduce()) - .distinct(); - - return absoluteDegreeTrees - // join with interval degree mappings - // 6) Merge trees together and calculate aggregation - .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE, timePoints)); + .map(new TransformDeltaToAbsoluteDegreeTree()) + .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE)); } } diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java index b7a1bed2ca5b..a9e3b18ed3ea 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java @@ -16,8 +16,6 @@ package org.gradoop.temporal.model.impl.operators.metric.functions; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.gradoop.common.model.impl.id.GradoopId; @@ -25,7 +23,6 @@ import java.util.TreeMap; import java.util.SortedSet; import java.util.TreeSet; -import java.util.List; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -41,32 +38,14 @@ public class GroupDegreeTreesToAggregateDegrees * The aggregate type to use (min,max,avg). */ private final AggregateType aggregateType; - /** - * The timestamps where at least one vertex degree changes. - */ - private final SortedSet timePoints; /** * Creates an instance of this group reduce function. * * @param aggregateType the aggregate type to use (min,max,avg). - * @param timePoints the time points were vertex degrees change. */ - public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType, DataSet> timePoints) { + public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType) { this.aggregateType = aggregateType; - - List> tuples; - try { - tuples = timePoints.collect(); - this.timePoints = new TreeSet<>(); - - for (int i = 0; i < timePoints.count(); i = i + 1) { - this.timePoints.add(tuples.get(i).getField(0)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } @Override @@ -76,10 +55,12 @@ public void reduce(Iterable>> iterable, // init necessary maps and set HashMap> degreeTrees = new HashMap<>(); HashMap vertexDegrees = new HashMap<>(); + SortedSet timePoints = new TreeSet<>(); // convert the iterables to a hashmap and remember all possible timestamps for (Tuple2> tuple : iterable) { degreeTrees.put(tuple.f0, tuple.f1); + timePoints.addAll(tuple.f1.keySet()); } int numberOfVertices = degreeTrees.size(); From cac1c2d71ed359fcaca570406c9b325f463bb1ae Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Thu, 11 Aug 2022 19:34:49 +0200 Subject: [PATCH 6/7] [#1570] delete a not needed help function --- .../functions/ExtractAllTimePointsReduce.java | 53 ------------------- 1 file changed, 53 deletions(-) delete mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java deleted file mode 100644 index ae7335d9e2e3..000000000000 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/ExtractAllTimePointsReduce.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright © 2014 - 2021 Leipzig University (Database Research Group) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.gradoop.temporal.model.impl.operators.metric.functions; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.gradoop.common.model.impl.id.GradoopId; - -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -/** - * Reduce function to extract all timestamps where the degree of a vertex changes. - */ -public class ExtractAllTimePointsReduce implements GroupReduceFunction>, Tuple1> { - - /** - * Creates an instance of this group reduce function. - */ - public ExtractAllTimePointsReduce() { - } - - @Override - public void reduce(Iterable>> iterable, - Collector> collector) throws Exception { - SortedSet timePoints = new TreeSet<>(); - - for (Tuple2> tuple : iterable) { - timePoints.addAll(tuple.f1.keySet()); - } - - for (Long timePoint : timePoints) { - collector.collect(new Tuple1<>(timePoint)); - } - - } -} From 638270c0f80b87263494ee2b8e6ced77b85a97be Mon Sep 17 00:00:00 2001 From: Alwine Balfanz <100916390+alwba@users.noreply.github.com> Date: Fri, 9 Sep 2022 18:39:34 +0200 Subject: [PATCH 7/7] [1570] delete unused code and fix unit test --- .../metric/DegreeRangeEvolution.java | 5 +-- .../metric/functions/AggregateType.java | 38 ---------------- ...ava => GroupDegreeTreesToDegreeRange.java} | 43 +++---------------- .../metric/DegreeRangeEvolutionTest.java | 2 +- 4 files changed, 9 insertions(+), 79 deletions(-) delete mode 100644 gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java rename gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/{GroupDegreeTreesToAggregateDegrees.java => GroupDegreeTreesToDegreeRange.java} (67%) diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java index 8c2fc90665d0..fa3fd5c3a69b 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolution.java @@ -21,8 +21,7 @@ import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree; import org.gradoop.temporal.model.api.TimeDimension; import org.gradoop.temporal.model.impl.TemporalGraph; -import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees; -import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType; +import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToDegreeRange; import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree; import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree; import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval; @@ -66,6 +65,6 @@ public DataSet> execute(TemporalGraph graph) { .reduceGroup(new BuildTemporalDegreeTree()) // 4) Transform each tree to aggregated evolution .map(new TransformDeltaToAbsoluteDegreeTree()) - .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregateType.RANGE)); + .reduceGroup(new GroupDegreeTreesToDegreeRange()); } } diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java deleted file mode 100644 index ecb981707eef..000000000000 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/AggregateType.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright © 2014 - 2021 Leipzig University (Database Research Group) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.gradoop.temporal.model.impl.operators.metric.functions; - -/** - * Enum for defining an aggregate type. - */ -public enum AggregateType { - /** - * Minimum aggregation. - */ - MIN, - /** - * Maximum aggregation. - */ - MAX, - /** - * Average aggregation. - */ - AVG, - /** - * Degree Range aggregation. - */ - RANGE -} diff --git a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java similarity index 67% rename from gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java rename to gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java index a9e3b18ed3ea..ba8bc2b7ebb9 100644 --- a/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToAggregateDegrees.java +++ b/gradoop-temporal/src/main/java/org/gradoop/temporal/model/impl/operators/metric/functions/GroupDegreeTreesToDegreeRange.java @@ -25,27 +25,20 @@ import java.util.TreeSet; import java.util.HashMap; import java.util.Map; -import java.util.Optional; /** * A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree) * that represents the aggregated degree value for the whole graph at the given time. */ -public class GroupDegreeTreesToAggregateDegrees +public class GroupDegreeTreesToDegreeRange implements GroupReduceFunction>, Tuple2> { - /** - * The aggregate type to use (min,max,avg). - */ - private final AggregateType aggregateType; - /** * Creates an instance of this group reduce function. * - * @param aggregateType the aggregate type to use (min,max,avg). */ - public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType) { - this.aggregateType = aggregateType; + public GroupDegreeTreesToDegreeRange() { + } @Override @@ -63,8 +56,6 @@ public void reduce(Iterable>> iterable, timePoints.addAll(tuple.f1.keySet()); } - int numberOfVertices = degreeTrees.size(); - // Add default times timePoints.add(Long.MIN_VALUE); @@ -92,31 +83,9 @@ public void reduce(Iterable>> iterable, } // Here, every tree with this time point is iterated. Now we need to aggregate for the current time. - Optional opt; - Optional opt2; - switch (aggregateType) { - case MIN: - opt = vertexDegrees.values().stream().reduce(Math::min); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - case MAX: - opt = vertexDegrees.values().stream().reduce(Math::max); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - case AVG: - opt = vertexDegrees.values().stream().reduce(Math::addExact); - opt.ifPresent(integer -> collector.collect( - new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices)))); - break; - case RANGE: - opt = vertexDegrees.values().stream().reduce(Math::max); - opt2 = vertexDegrees.values().stream().reduce(Math::min); - opt.flatMap(max -> opt2.map(min -> max - min)); - opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer))); - break; - default: - throw new IllegalArgumentException("Aggregate type not specified."); - } + int maxDegree = vertexDegrees.values().stream().reduce(Math::max).orElse(0); + int minDegree = vertexDegrees.values().stream().reduce(Math::min).orElse(0); + collector.collect(new Tuple2<>(timePoint, maxDegree - minDegree)); } } } diff --git a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java index 49792fb68da0..bf831207a896 100644 --- a/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java +++ b/gradoop-temporal/src/test/java/org/gradoop/temporal/model/impl/operators/metric/DegreeRangeEvolutionTest.java @@ -72,7 +72,7 @@ public class DegreeRangeEvolutionTest extends TemporalGradoopTestBase { // DEGREES EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 1)); - EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 3)); + EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 2)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 1)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 2)); EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 1));