Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1559 Add min max avg degree evolution operators #1583

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;

/**
 * Temporal graph operator that computes how the average degree of a graph evolves over the whole
 * lifetime of the graph.
 * <p>
 * The operator yields a {@link DataSet} of {@link Tuple3} elements in the form
 * {@code <Long, Long, Float>}: the first and second element describe a time interval, the third element
 * is the aggregated degree value for that interval.
 */
public class AvgDegreeEvolution extends BaseAggregateDegreeEvolution {

  /**
   * Builds the average degree evolution operator with the default configuration, i.e.
   * {@link TimeDimension#VALID_TIME} as time dimension and {@link VertexDegree#BOTH} as degree type.
   */
  public AvgDegreeEvolution() {
    // The implicitly invoked no-argument parent constructor applies the defaults.
  }

  /**
   * Builds the average degree evolution operator for an explicitly chosen degree type and time
   * dimension.
   *
   * @param degreeType the degree type (IN, OUT or BOTH)
   * @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
   */
  public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
    super(degreeType, dimension);
  }

  /**
   * Runs the shared pre-processing of the parent class on the given graph and reduces its result with
   * a {@link GroupDegreeTreesToAggregateDegrees} function configured with {@link AggregateType#AVG}.
   *
   * @param graph the temporal graph to analyze
   * @return a dataset of {@code <Long, Long, Float>} triples (interval start, interval end, aggregated
   *         degree value)
   */
  @Override
  public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
    GroupDegreeTreesToAggregateDegrees averageAggregator =
      new GroupDegreeTreesToAggregateDegrees(AggregateType.AVG);
    return preProcess(graph).reduceGroup(averageAggregator);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;

import java.util.Objects;
import java.util.TreeMap;

/**
 * Abstract parent class for the aggregated degree evolution operators.
 * <p>
 * It stores the configuration (time dimension and degree type) and offers the pre-processing pipeline
 * {@link #preProcess(TemporalGraph)} that subclasses build their aggregation on. The configuration is
 * fixed at construction time.
 */
abstract class BaseAggregateDegreeEvolution
  implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple3<Long, Long, Float>>> {

  /**
   * The time dimension that will be considered.
   */
  private final TimeDimension dimension;

  /**
   * The degree type (IN, OUT or BOTH).
   */
  private final VertexDegree degreeType;

  /**
   * Default constructor using {@link TimeDimension#VALID_TIME} as default time dimension and
   * {@link VertexDegree#BOTH} as default degree type.
   */
  protected BaseAggregateDegreeEvolution() {
    this(VertexDegree.BOTH, TimeDimension.VALID_TIME);
  }

  /**
   * Abstract constructor for the aggregated degree evolution of a graph.
   *
   * @param degreeType the degree type (IN, OUT or BOTH)
   * @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
   * @throws NullPointerException if {@code degreeType} or {@code dimension} is {@code null}
   */
  protected BaseAggregateDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
    // Name the offending argument so a failing call site is easy to diagnose.
    this.degreeType = Objects.requireNonNull(degreeType, "degreeType must not be null");
    this.dimension = Objects.requireNonNull(dimension, "dimension must not be null");
  }

  /**
   * A pre-process function to prevent duplicate code for min, max and avg aggregation. The result is an
   * absolute degree tree for each vertex (id).
   *
   * @param graph the temporal graph as input
   * @return a dataset containing an absolute degree tree for each vertex identifier
   */
  public DataSet<Tuple2<GradoopId, TreeMap<Long, Integer>>> preProcess(TemporalGraph graph) {
    return graph.getEdges()
      // 1) Extract vertex id(s) and corresponding time intervals
      .flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
      // 2) Group them by the vertex id
      .groupBy(0)
      // 3) For each vertex id, build a degree tree data structure
      .reduceGroup(new BuildTemporalDegreeTree())
      // 4) Transform each delta degree tree into an absolute degree tree
      .map(new TransformDeltaToAbsoluteDegreeTree());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;

/**
 * Temporal graph operator that computes how the maximum degree of a graph evolves over the whole
 * lifetime of the graph.
 * <p>
 * The operator yields a {@link DataSet} of {@link Tuple3} elements in the form
 * {@code <Long, Long, Float>}: the first and second element describe a time interval, the third element
 * is the aggregated degree value for that interval.
 */
public class MaxDegreeEvolution extends BaseAggregateDegreeEvolution {

  /**
   * Builds the maximum degree evolution operator with the default configuration, i.e.
   * {@link TimeDimension#VALID_TIME} as time dimension and {@link VertexDegree#BOTH} as degree type.
   */
  public MaxDegreeEvolution() {
    // The implicitly invoked no-argument parent constructor applies the defaults.
  }

  /**
   * Builds the maximum degree evolution operator for an explicitly chosen degree type and time
   * dimension.
   *
   * @param degreeType the degree type (IN, OUT or BOTH)
   * @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
   */
  public MaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
    super(degreeType, dimension);
  }

  /**
   * Runs the shared pre-processing of the parent class on the given graph and reduces its result with
   * a {@link GroupDegreeTreesToAggregateDegrees} function configured with {@link AggregateType#MAX}.
   *
   * @param graph the temporal graph to analyze
   * @return a dataset of {@code <Long, Long, Float>} triples (interval start, interval end, aggregated
   *         degree value)
   */
  @Override
  public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
    GroupDegreeTreesToAggregateDegrees maximumAggregator =
      new GroupDegreeTreesToAggregateDegrees(AggregateType.MAX);
    return preProcess(graph).reduceGroup(maximumAggregator);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregateType;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;

/**
 * Temporal graph operator that computes how the minimum degree of a graph evolves over the whole
 * lifetime of the graph.
 * <p>
 * The operator yields a {@link DataSet} of {@link Tuple3} elements in the form
 * {@code <Long, Long, Float>}: the first and second element describe a time interval, the third element
 * is the aggregated degree value for that interval.
 */
public class MinDegreeEvolution extends BaseAggregateDegreeEvolution {

  /**
   * Builds the minimum degree evolution operator with the default configuration, i.e.
   * {@link TimeDimension#VALID_TIME} as time dimension and {@link VertexDegree#BOTH} as degree type.
   */
  public MinDegreeEvolution() {
    // The implicitly invoked no-argument parent constructor applies the defaults.
  }

  /**
   * Builds the minimum degree evolution operator for an explicitly chosen degree type and time
   * dimension.
   *
   * @param degreeType the degree type (IN, OUT or BOTH)
   * @param dimension the time dimension to consider (VALID_TIME or TRANSACTION_TIME)
   */
  public MinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
    super(degreeType, dimension);
  }

  /**
   * Runs the shared pre-processing of the parent class on the given graph and reduces its result with
   * a {@link GroupDegreeTreesToAggregateDegrees} function configured with {@link AggregateType#MIN}.
   *
   * @param graph the temporal graph to analyze
   * @return a dataset of {@code <Long, Long, Float>} triples (interval start, interval end, aggregated
   *         degree value)
   */
  @Override
  public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
    GroupDegreeTreesToAggregateDegrees minimumAggregator =
      new GroupDegreeTreesToAggregateDegrees(AggregateType.MIN);
    return preProcess(graph).reduceGroup(minimumAggregator);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

/**
 * Enumeration of the available aggregate types: minimum, maximum and average.
 */
public enum AggregateType {
  /** Aggregation to the minimum value. */
  MIN,
  /** Aggregation to the maximum value. */
  MAX,
  /** Aggregation to the average value. */
  AVG
}
Loading