Improving network analysis
pkiraly committed Sep 23, 2020
1 parent 92098a0 commit feb92cf
Showing 4 changed files with 95 additions and 58 deletions.
9 changes: 7 additions & 2 deletions common-script
@@ -106,13 +106,18 @@ do_functional_analysis() {
do_network_analysis() {
printf "%s %s> [network-analysis]\n" $(date +"%F %T")
printf "%s %s> ./network-analysis --defaultRecordType BOOKS ${TYPE_PARAMS} --outputDir ${OUTPUT_DIR}/ ${MARC_DIR}/${MASK} 2> ${PREFIX}/network-analysis.log\n" $(date +"%F %T")
./network-analysis --defaultRecordType BOOKS ${TYPE_PARAMS} --outputDir ${OUTPUT_DIR}/ ${MARC_DIR}/${MASK} 2> ${PREFIX}/network-analysis.log
./network-analysis --defaultRecordType BOOKS \
${TYPE_PARAMS} \
--outputDir ${OUTPUT_DIR}/ \
${MARC_DIR}/${MASK} 2> ${PREFIX}/network-analysis.log

printf "%s %s> Rscript scripts/network-transform.R ${OUTPUT_DIR} &>> ${PREFIX}/network-analysis.log\n" $(date +"%F %T")
Rscript scripts/network-transform.R ${OUTPUT_DIR} &>> ${PREFIX}/network-analysis.log

printf "%s %s> ./network-analysis --outputDir ${OUTPUT_DIR} --action pairing --group-limit 2000 &>> ${PREFIX}/network-analysis.log\n" $(date +"%F %T")
./network-analysis --outputDir ${OUTPUT_DIR} --action pairing --group-limit 2000 &>> ${PREFIX}/network-analysis.log
./network-analysis --outputDir ${OUTPUT_DIR} \
--action pairing \
&>> ${PREFIX}/network-analysis.log

spark-shell -I scripts/network.scala --conf spark.driver.metadata.qa.dir="${OUTPUT_DIR}"
./network-export.sh ${OUTPUT_DIR}
13 changes: 8 additions & 5 deletions network-export.sh
@@ -4,10 +4,12 @@ OUTPUT_DIR=$1

download() {
NAME=$1
echo "downloading ${NAME}.csv ..."
cat ${OUTPUT_DIR}/${NAME}.csv.dir/part-* > ${OUTPUT_DIR}/${NAME}.csv
rm -rf ${OUTPUT_DIR}/${NAME}.csv.dir
wc -l ${OUTPUT_DIR}/${NAME}.csv
if [[ -d ${OUTPUT_DIR}/${NAME}.csv.dir ]]; then
echo "downloading ${NAME}.csv ..."
cat ${OUTPUT_DIR}/${NAME}.csv.dir/part-* > ${OUTPUT_DIR}/${NAME}.csv
rm -rf ${OUTPUT_DIR}/${NAME}.csv.dir
wc -l ${OUTPUT_DIR}/${NAME}.csv
fi
}

download network-nodes-indegrees
@@ -19,4 +21,5 @@ download network-nodes-components
download network-nodes-components-stat
download network-nodes-components-histogram
download network-nodes-degrees
download network-nodes-degree-stat
download network-nodes-degrees-stat
download network-nodes-degrees-histogram
91 changes: 55 additions & 36 deletions scripts/network.scala
@@ -1,20 +1,27 @@
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import scala.collection.JavaConverters._

val log = Logger.getLogger("network-analysis")
log.setLevel(Level.INFO)

log.info("prepare")

val dir = "file://" + sc.getConf.get("spark.driver.metadata.qa.dir") + "/"
println(dir)
log.info(dir)

// val dir = "file:///home/kiru/bin/marc/_output/gent/"

// Load my user data and parse into tuples of user id and attribute list
val nodes = (sc.textFile(dir + "network-nodes.csv")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
.map(line => line.split(","))
.map(parts => (parts.head.toLong, parts.tail)))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, dir + "network-pairs-limited.csv")
val followerGraph = GraphLoader.edgeListFile(sc, dir + "network-pairs.csv")

// Attach the attributes
val graph = followerGraph.outerJoinVertices(nodes) {
@@ -24,73 +31,77 @@ val graph = followerGraph.outerJoinVertices(nodes) {
}

// STEP 1: indegree
log.info("STEP 1: indegree")

// indegree
var df = graph.inDegrees.toDF("id","degree")
var dataDF = df.select(df.columns.map(c => df.col(c).cast("string")): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
var outputFolder = dir + "network-nodes-indegrees.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)
// var df = graph.inDegrees.toDF("id","degree")
// var dataDF = df.select(df.columns.map(c => df.col(c).cast("string")): _*)
// var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
// var outputFolder = dir + "network-nodes-indegrees.csv.dir"
// headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

// indegree stat
var dataDF = df.select("degree").summary().toDF(Seq("statistic", "value"): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
var outputFolder = dir + "network-nodes-indegree-stat.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)
// var dataDF = df.select("degree").summary().toDF(Seq("statistic", "value"): _*)
// var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
// var outputFolder = dir + "network-nodes-indegree-stat.csv.dir"
// headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

// STEP 2: page rank
log.info("STEP 2: page rank")

var pagerankGraph = graph.pageRank(0.001)
// var pagerankGraph = graph.pageRank(0.001)

// Get the attributes of the top pagerank nodes
val infoWithPageRank = graph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
// val infoWithPageRank = graph.outerJoinVertices(pagerankGraph.vertices) {
// case (uid, attrList, Some(pr)) => (pr, attrList.toList)
// case (uid, attrList, None) => (0.0, attrList.toList)
// }

var df_pagerank = infoWithPageRank.vertices.map(e => (e._1, e._2._1)).toDF("id", "score")
var dataDF = df_pagerank.select(df_pagerank.columns.map(c => df_pagerank.col(c).cast("string")): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
var outputFolder = dir + "network-nodes-pagerank.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)
// var df_pagerank = infoWithPageRank.vertices.map(e => (e._1, e._2._1)).toDF("id", "score")
// var dataDF = df_pagerank.select(df_pagerank.columns.map(c => df_pagerank.col(c).cast("string")): _*)
// var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
// var outputFolder = dir + "network-nodes-pagerank.csv.dir"
// headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

// page rank stat
var dataDF = df_pagerank.select("score").summary().toDF(Seq("statistic", "value"): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
var outputFolder = dir + "network-nodes-pagerank-stat.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)
// var dataDF = df_pagerank.select("score").summary().toDF(Seq("statistic", "value"): _*)
// var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
// var outputFolder = dir + "network-nodes-pagerank-stat.csv.dir"
// headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

// page rank histogram
var histogram = df_pagerank.select(round($"score", 0).as("score").cast("int")).groupBy("score").count().orderBy("score")
var histogramDF = histogram.select(histogram.columns.map(c => histogram.col(c).cast("string")): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(histogramDF.columns.toSeq)).asJava, histogramDF.schema)
var outputFolder = dir + "network-nodes-pagerank-histogram.csv.dir"
headerDF.union(histogramDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)
// var histogram = df_pagerank.select(round($"score", 0).as("score").cast("int")).groupBy("score").count().orderBy("score")
// var histogramDF = histogram.select(histogram.columns.map(c => histogram.col(c).cast("string")): _*)
// var headerDF = spark.createDataFrame(List(Row.fromSeq(histogramDF.columns.toSeq)).asJava, histogramDF.schema)
// var outputFolder = dir + "network-nodes-pagerank-histogram.csv.dir"
// headerDF.union(histogramDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

// STEP 3: connectedComponents
log.info("STEP 3: connectedComponents")

var cc = graph.connectedComponents()
var componentDF = cc.vertices.toDF("vid", "cid")
var componentsCount = componentDF.groupBy("cid").count().toDF(Seq("group-size", "count"): _*)
var componentsCount = componentDF.groupBy("cid").count().toDF("componentId", "size")
var dataDF = componentsCount.orderBy(desc("count")).select(componentsCount.columns.map(c => componentsCount.col(c).cast("string")): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
var outputFolder = dir + "network-nodes-components.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

// connectedComponents stat
var statDF = componentsCount.select("count").summary().toDF(Seq("statistic", "value"): _*)
var statDF = componentsCount.select("size").summary().toDF("statistic", "value")
var headerDF = spark.createDataFrame(List(Row.fromSeq(statDF.columns.toSeq)).asJava, statDF.schema)
var outputFolder = dir + "network-nodes-components-stat.csv.dir"
headerDF.union(statDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

var histogram = componentsCount.select("count").groupBy("count").count().toDF(Seq("group-size", "count"): _*).orderBy("group-size")
var histogram = componentsCount.select("size").groupBy("size").count().orderBy("size")
var histogramDF = histogram.select(histogram.columns.map(c => histogram.col(c).cast("string")): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(histogramDF.columns.toSeq)).asJava, histogramDF.schema)
var outputFolder = dir + "network-nodes-components-histogram.csv.dir"
headerDF.union(histogramDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)


// STEP 4: degree
log.info("STEP 4: degree")

var degreesRDD = graph.degrees.cache()
var df = degreesRDD.toDF("id", "degree")
@@ -99,9 +110,17 @@ var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJ
var outputFolder = dir + "network-nodes-degrees.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

var dataDF = df.select("degree").summary().toDF(Seq("statistic", "value"): _*)
var dataDF = df.select("degree").summary().toDF("statistic", "value")
var headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
var outputFolder = "file:///home/kiru/bin/marc/_output/gent/network-nodes-degree-stat.csv.dir"
var outputFolder = dir + "network-nodes-degrees-stat.csv.dir"
headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

var histogram = df.select("degree").groupBy("degree").count().orderBy("degree")
var histogramDF = histogram.select(histogram.columns.map(c => histogram.col(c).cast("string")): _*)
var headerDF = spark.createDataFrame(List(Row.fromSeq(histogramDF.columns.toSeq)).asJava, histogramDF.schema)
var outputFolder = dir + "network-nodes-degrees-histogram.csv.dir"
headerDF.union(histogramDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)

log.info("DONE")

System.exit(0)
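
Editor's note: scripts/network.scala repeats the same write pattern for every output — cast all columns to string, build a one-row DataFrame holding the column names, union it on top of the data, and write with header=false so the single hand-built header row lands at the front of the output; network-export.sh then concatenates the part-* files from each <name>.csv.dir into one <name>.csv. A hedged sketch of that pattern as a reusable spark-shell helper (the saveWithHeader name is my own, not part of the repository):

import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import scala.collection.JavaConverters._

// Sketch only: the save pattern used throughout scripts/network.scala,
// factored into a hypothetical helper.
def saveWithHeader(spark: SparkSession, df: DataFrame, outputFolder: String): Unit = {
  // Cast every column to string so the header row and the data rows share one schema.
  val dataDF = df.select(df.columns.map(c => df.col(c).cast("string")): _*)
  // A one-row DataFrame whose values are the column names.
  val headerDF = spark.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
  // header=false keeps Spark from writing its own header into every part file;
  // the hand-built header row travels as ordinary data at the front of the union,
  // and network-export.sh later concatenates the part-* files into a single CSV.
  headerDF.union(dataDF).write.option("header", "false").mode(SaveMode.Overwrite).csv(outputFolder)
}

With such a helper, each STEP block would reduce to building the DataFrame and calling saveWithHeader(spark, df, dir + "network-nodes-....csv.dir").
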
40 changes: 25 additions & 15 deletions src/main/java/de/gwdg/metadataqa/marc/cli/NetworkAnalysis.java
@@ -37,7 +37,8 @@ public class NetworkAnalysis implements MarcFileProcessor, Serializable {
private final boolean readyToProcess;
private final List<String> orphans = new ArrayList<>();
private Path path;
private BufferedWriter writer;
private BufferedWriter networkWriter;
private BufferedWriter pairWriter;
private BufferedWriter nodeWriter;

public NetworkAnalysis(String[] args) throws ParseException {
@@ -80,9 +81,9 @@ private void pairIds() {
logger.info("pairIds");
Path outputPath = Paths.get(parameters.getOutputDir(), "network-pairs.csv");
try {
writer = Files.newBufferedWriter(outputPath);
pairWriter = Files.newBufferedWriter(outputPath);
if (asBase36)
writer.write(createRow("id1", "id2"));
pairWriter.write(createRow("id1", "id2"));
} catch (IOException e) {
e.printStackTrace();
}
@@ -117,7 +118,7 @@ private void pairIds() {
List<String> pairs = makePairs(encoded, asBase36);
try {
for (String pair : pairs) {
writer.write(pair);
pairWriter.write(pair);
}
} catch (IOException e) {
e.printStackTrace();
@@ -133,16 +134,14 @@ private void pairIds() {
e.printStackTrace();
}
}


}
}
);
} catch (IOException e) {
e.printStackTrace();
}
try {
writer.close();
pairWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
@@ -169,12 +168,23 @@ private Object[] stringToInteger(String[] ids) {

private List<String> makePairs(Object[] elements, boolean asBase36) {
List<String> pairs = new ArrayList<>(elements.length);
for (int i = 0; i < elements.length - 1; i++) {
for (int j = (i + 1); j < elements.length; j++) {
if (asBase36)
int len = elements.length;
for (int i = 0; i < (len - 1); i++) {
for (int j = (i + 1); j < len; j++) {
if (asBase36) {
String a = (String) elements[i];
String b = (String) elements[j];
if (a.equals(b))
continue;
pairs.add(createRow(elements[i], elements[j]));
else
pairs.add(createRowWithSep(' ', elements[i], elements[j]));
} else {
int a = (int) elements[i];
int b = (int) elements[j];
if (a == b)
continue;
pairs.add(createRowWithSep(' ', a, b));

}
}
}
return pairs;
@@ -196,7 +206,7 @@ public void processRecord(MarcRecord marcRecord, int recordNumber) throws IOExce
Set<DataField> collector = analyzer.process(recordNumber);
if (collector.size() > 0) {
for (DataField field : collector) {
writer.write(createRow(field.toString().hashCode(), recordNumber));
networkWriter.write(createRow(field.toString().hashCode(), recordNumber));
}
}
orphans.add(marcRecord.getId(true));
@@ -207,8 +217,8 @@ public void beforeIteration() {
path = Paths.get(parameters.getOutputDir(), "network.csv");
logger.info(parameters.formatParameters());
try {
writer = Files.newBufferedWriter(path);
writer.write(createRow("concept", "id"));
networkWriter = Files.newBufferedWriter(path);
networkWriter.write(createRow("concept", "id"));
} catch (IOException e) {
e.printStackTrace();
}
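
Editor's note on the separator change in makePairs: when the ids are numeric, pairs are written with a single space and identical ids are skipped (avoiding self-loops), because GraphX's GraphLoader.edgeListFile — used by scripts/network.scala — expects one edge per line as whitespace-separated source and target vertex ids. A minimal, self-contained sketch of that format; the path and sample ids below are made up for illustration and are not produced by this repository:

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession

// Sketch only: demonstrates the edge-list format that network-pairs.csv follows.
object EdgeListFormatDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("edge-list-format-demo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Each line is "<sourceId> <targetId>", e.g.:
    //   12 34
    //   12 56
    //   34 56
    val graph = GraphLoader.edgeListFile(sc, "file:///tmp/demo-pairs.txt")

    // The same kind of aggregates the Scala script computes in STEP 4.
    println(s"edges: ${graph.edges.count()}, vertices: ${graph.vertices.count()}")

    spark.stop()
  }
}
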