<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<title>Saving and Loading Graphs in Graphx</title>
<link rel="stylesheet" href="/theme/css/main.css" />
<link rel="stylesheet" href="/css/" type="text/css" />
<!--[if IE]>
<script src="https://html5shiv.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
</head>
<body id="index" class="home">
<header id="banner" class="body">
<h1><a href="/">In principio erat Verbum </a></h1>
<nav><ul>
<li class="active"><a href="/category/expository.html">Expository</a></li>
<li><a href="/category/math-212.html">Math 212</a></li>
</ul></nav>
</header><!-- /#banner -->
<section id="content" class="body">
<article>
<header>
<h1 class="entry-title">
<a href="/io_graphx.html" rel="bookmark"
title="Permalink to Saving and Loading Graphs in Graphx">Saving and Loading Graphs in Graphx</a></h1>
</header>
<div class="entry-content">
<footer class="post-info">
<abbr class="published" title="2015-10-20T21:29:00-04:00">
Published: Tue 20 October 2015
</abbr>
<br />
<abbr class="modified" title="2015-10-20T00:00:00-04:00">
Updated: Tue 20 October 2015
</abbr>
<address class="vcard author">
By <a class="url fn" href="/author/tingran-gao.html">Tingran Gao</a>
</address>
<p>In <a href="/category/expository.html">Expository</a>.</p>
<p>tags: <a href="/tag/scala.html">Scala</a> <a href="/tag/machine-learning.html">Machine Learning</a> <a href="/tag/graphx.html">GraphX</a> </p>
</footer><!-- /.post-info --> <hr />
<h2 id="save-graphs-on-disk">Save Graphs on Disk</h2>
<p>GraphX defines <code>Graph</code> as an abstract class:</p>
<div class="highlight"><pre> abstract class Graph[VD, ED] extends Serializable
</pre></div>
<p>The class extends <code>Serializable</code> but not <code>RDD</code>. A graph exposes its data through a vertex RDD and an edge RDD rather than being an RDD itself, so a graph object cannot be saved with <code>saveAsTextFile</code> or similar RDD serialization routines. Instead, we can store the <code>VertexRDD</code> and <code>EdgeRDD</code> separately and, when the graph is needed again, read them back into memory and rebuild the graph from them. While this demo only documents an experiment that dumps the RDDs to local disk, it is also possible to store <code>VertexRDD</code> and <code>EdgeRDD</code> in databases such as HBase or Cassandra, as suggested by <a href="http://stackoverflow.com/questions/28337446/graphx-best-way-to-store-and-compute-over-3-billion-vertices?rq=1">some StackOverflow posts</a>.</p>
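<p>The save-and-rebuild round trip can be packaged as a pair of helpers. This is a minimal sketch, not code from the original project: the object <code>GraphIO</code>, the names <code>saveGraph</code> and <code>loadGraph</code>, and the <code>dir + "/vertices"</code> / <code>dir + "/edges"</code> layout are all hypothetical. It relies only on the facts that <code>graph.vertices</code> is an <code>RDD[(VertexId, VD)]</code> and <code>graph.edges</code> is an <code>RDD[Edge[ED]]</code>, so both can be written with <code>saveAsObjectFile</code> and read back with <code>sc.objectFile</code>.</p>
<div class="highlight"><pre>import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// Hypothetical helpers (not part of GraphX): persist a graph as two object files
// and rebuild it later with Graph(vertices, edges).
object GraphIO {
  // Writes the vertex and edge RDDs under dir.
  // Note: saveAsObjectFile fails if the target directory already exists.
  def saveGraph[VD, ED](graph: Graph[VD, ED], dir: String): Unit = {
    graph.vertices.saveAsObjectFile(dir + "/vertices")
    graph.edges.saveAsObjectFile(dir + "/edges")
  }

  // Reads both RDDs back and rebuilds the graph.
  // The ClassTag bounds are required by sc.objectFile and Graph.apply.
  def loadGraph[VD: ClassTag, ED: ClassTag](sc: SparkContext, dir: String): Graph[VD, ED] = {
    val vertices = sc.objectFile[(VertexId, VD)](dir + "/vertices")
    val edges = sc.objectFile[Edge[ED]](dir + "/edges")
    Graph(vertices, edges)
  }
}
</pre></div>
<p>The element types are not stored with the files, so the caller has to supply them when loading, e.g. <code>GraphIO.loadGraph[VertexProperty, String](sc, dir)</code>.</p>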
<p>Before testing graph loading, let's save the graph we constructed in the <a href="/scala_graphx.html">previous post</a> to local disk. This only requires inserting the following two lines into the original code, after <code>domainRDD</code>, <code>infoRDD</code>, and <code>linkRDD</code> have been built:</p>
<div class="highlight"><pre>domainRDD.union(infoRDD).saveAsObjectFile("/home/trgao10/Work/Scala/webGraphInfer/repo/vertices");
linkRDD.saveAsObjectFile("/home/trgao10/Work/Scala/webGraphInfer/repo/edges")
</pre></div>
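<p>Note that we saved the RDDs as object files rather than text files. This is necessary because the elements are case-class instances (our custom vertex classes and GraphX's <code>Edge</code>): <code>saveAsTextFile</code> would only write out each element's string representation, which cannot be turned back into the original objects without writing a parser, whereas <code>saveAsObjectFile</code> stores the serialized objects themselves. For RDDs of plain strings or numbers, text files would suffice.</p>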
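<p>To see the difference concretely: <code>saveAsTextFile</code> writes each element's <code>toString</code>, which for a case class looks like <code>Info(a,b)</code>, while <code>saveAsObjectFile</code> writes a SequenceFile of Java-serialized objects. The plain-Scala sketch below, using a copy of the post's <code>Info</code> class and a hypothetical <code>SerializationDemo</code> object, performs the same kind of Java-serialization round trip directly, without Spark. It also shows why the reading program needs the class definitions: <code>readObject</code> can only rebuild an object whose class it can load.</p>
<div class="highlight"><pre>import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Copies of the post's vertex classes (case classes are Serializable by default).
class VertexProperty extends Serializable
case class Info(info: String, category: String) extends VertexProperty

object SerializationDemo {
  // Serializes an object to bytes with plain Java serialization.
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Reads the object back; this only works if its class (e.g. Info) can be loaded.
  def fromBytes(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original: VertexProperty = Info("some info", "some category")
    println(original.toString) // what a text file would contain: Info(some info,some category)
    val restored = fromBytes(toBytes(original))
    println(restored == original) // true: case-class equality compares the fields
  }
}
</pre></div>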
<p>Let's also randomly sample 100 <code>dtbd_id</code>s for testing look-ups in the rebuilt graph later. This is done with the following Scala code snippet:</p>
<div class="highlight"><pre>import java.io._
// .....
val sampleFile = "/home/trgao10/Work/Scala/webGraphInfer/repo/samples.txt"
val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sampleFile)))
// sample 100 ids without replacement (first argument false) and collect them to the driver
val sampleData = domainRDD.map(_._2.asInstanceOf[Domain].dtbd_id).takeSample(false, 100)
for (x <- sampleData) {
  writer.write(x + "\n") // one id per line
}
writer.close()
</pre></div>
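<p>The snippet above relies on an explicit <code>close()</code> at the end. A slightly more defensive variant, sketched below as an illustration rather than the original code, closes the file even if a write fails; the object <code>SampleIO</code> and its helper <code>writeIds</code> are hypothetical. Separately, <code>takeSample</code> accepts an optional third <code>seed</code> argument (for example <code>takeSample(false, 100, 42L)</code>), which makes the sample reproducible across runs.</p>
<div class="highlight"><pre>import java.io.{File, PrintWriter}

object SampleIO {
  // Hypothetical helper: writes one id per line and always closes the file.
  def writeIds(ids: Seq[String], path: String): Unit = {
    val writer = new PrintWriter(new File(path), "UTF-8")
    try ids.foreach(id => writer.println(id))
    finally writer.close()
  }
}
</pre></div>
<p>In the snippet above, this would replace the writer and the loop with <code>SampleIO.writeIds(sampleData.toSeq, sampleFile)</code>.</p>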
<hr />
<h2 id="rebuild-graph-from-stored-vertices-and-edges-on-disk">Rebuild Graph from Stored Vertices and Edges on Disk</h2>
<p>We shall write a separate Scala program to test reading the stored vertices and edges. The catch is that the reading program still needs to define the same case classes; otherwise the objects in the loaded vertex and edge RDDs cannot be deserialized correctly.</p>
<div class="highlight"><pre>class VertexProperty extends Serializable {}
case class Domain(dtbd_id: String, class_ecomm: String, date_found: String, isp_name: String,
                  enforce_status: String, registrar: String, traffic_rank: String) extends VertexProperty;
case class Info(info: String, category: String) extends VertexProperty;
</pre></div>
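<p>Because both <code>Domain</code> and <code>Info</code> extend <code>VertexProperty</code>, the loaded vertex attributes have the static type <code>VertexProperty</code>, and code that needs the concrete fields has to recover the subtype. Pattern matching is one way to do that safely, unlike an <code>asInstanceOf[Domain]</code> cast, which throws a <code>ClassCastException</code> when applied to an <code>Info</code> vertex. The sketch below repeats the class definitions above; the object <code>VertexKinds</code> and its helper <code>dtbdIdOf</code> are hypothetical.</p>
<div class="highlight"><pre>class VertexProperty extends Serializable
case class Domain(dtbd_id: String, class_ecomm: String, date_found: String, isp_name: String,
                  enforce_status: String, registrar: String, traffic_rank: String) extends VertexProperty
case class Info(info: String, category: String) extends VertexProperty

object VertexKinds {
  // Hypothetical helper: the dtbd_id of a Domain vertex, or None for any other vertex type.
  def dtbdIdOf(prop: VertexProperty): Option[String] = prop match {
    case d: Domain => Some(d.dtbd_id)
    case _         => None
  }
}
</pre></div>
<p>On the loaded RDD this could be used as <code>verticesRDD.flatMap { case (_, p) => VertexKinds.dtbdIdOf(p) }</code> to collect the <code>dtbd_id</code>s of all domain vertices.</p>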
<p>Then read the vertex and edge RDDs back with <code>sc.objectFile</code>:</p>
<div class="highlight"><pre>val verticesRDD = sc.objectFile[(VertexId, VertexProperty)]("/home/trgao10/Work/Scala/webGraphInfer/repo/vertices");
val edgesRDD = sc.objectFile[Edge[String]]("/home/trgao10/Work/Scala/webGraphInfer/repo/edges");
</pre></div>
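<p>One thing worth checking after loading is whether every edge endpoint actually appears in the vertex file. <code>Graph(vertices, edges)</code> does not reject edges whose source or destination id is missing from <code>vertices</code>; it creates those vertices with the default attribute, which is <code>null</code> unless a <code>defaultVertexAttr</code> is passed. A minimal sketch of such a check, with the hypothetical object <code>LoadChecks</code> and helper <code>danglingEndpoints</code>:</p>
<div class="highlight"><pre>import org.apache.spark.graphx.{Edge, VertexId}
import org.apache.spark.rdd.RDD

object LoadChecks {
  // Hypothetical helper: ids that appear as an edge endpoint but have no entry in the vertex RDD.
  // Graph(vertices, edges) would silently create these vertices with the default (null) attribute.
  def danglingEndpoints[VD, ED](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]]): RDD[VertexId] = {
    val endpoints = edges.flatMap(e => Seq(e.srcId, e.dstId)).distinct()
    endpoints.subtract(vertices.map(_._1))
  }
}
</pre></div>
<p>If the saved files are consistent, <code>LoadChecks.danglingEndpoints(verticesRDD, edgesRDD).count()</code> should be 0.</p>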
<p>Building the graph and extracting its connected components is then identical to the steps shown in the previous post.</p>
<div class="highlight"><pre>val mmBG: Graph[VertexProperty, String] = Graph(verticesRDD, edgesRDD);
val ccMMBG = mmBG.connectedComponents();
println("Total number of edges in the graph: " + mmBG.edges.count);
println("Total number of vertices in the graph: " + mmBG.vertices.count);
val ccNumVertices =
  (ccMMBG.vertices.map(pair => (pair._2, 1))
    .reduceByKey(_ + _) // count the vertices in each connected component (labeled by the smallest vertex id in that component)
    .map(pair => pair._2)) // keep only the per-component vertex counts
println("Number of Connected Components: " + ccNumVertices.count);
</pre></div>
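<p>As the comment in the code notes, <code>connectedComponents</code> labels every vertex with the smallest vertex id in its component, so counting distinct labels gives the number of components and counting vertices per label gives the component sizes. The plain-Scala sketch below reproduces that labeling with a small union-find on an in-memory edge list, which can help check the counting logic on toy data without a cluster. It is an illustration only: GraphX itself computes the labels by iterative Pregel-style label propagation, and the object <code>ComponentSketch</code> and its helpers are hypothetical.</p>
<div class="highlight"><pre>object ComponentSketch {
  // Labels each vertex with the smallest vertex id in its connected component.
  // Assumes every edge endpoint is listed in vertices.
  def componentLabels(vertices: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val parent = scala.collection.mutable.Map(vertices.map(v => v -> v): _*)
    // Finds the root of v's tree, compressing the path along the way.
    def find(v: Long): Long = {
      val p = parent(v)
      if (p == v) v else { val root = find(p); parent(v) = root; root }
    }
    for ((a, b) <- edges) {
      val ra = find(a)
      val rb = find(b)
      // Keep the smaller id as the root, so each root is always its component's minimum.
      if (ra != rb) { if (ra < rb) parent(rb) = ra else parent(ra) = rb }
    }
    vertices.map(v => v -> find(v)).toMap
  }

  // Component label -> number of vertices, mirroring map(pair => (pair._2, 1)).reduceByKey(_ + _).
  def componentSizes(labels: Map[Long, Long]): Map[Long, Int] =
    labels.values.groupBy(identity).map { case (label, members) => label -> members.size }
}
</pre></div>
<p>On the RDD side, <code>ccMMBG.vertices.map(_._2).distinct().count()</code> gives the same number as <code>ccNumVertices.count</code>, since <code>ccNumVertices</code> holds one count per component.</p>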
<p>Here is the complete code <code>Main.scala</code>:</p>
<div class="highlight"><pre>package sparkGraph
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.immutable.ListMap
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Level
import org.apache.log4j.Logger
import java.io._

object sparkGraph {
  def main(args: Array[String]): Unit = {
    // silence Spark's and Akka's logging
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
    val conf = new SparkConf().setAppName("SparkGraph").setMaster("local[*]");
    val sc = new SparkContext(conf);

    // the vertex classes must be defined so that the stored objects can be deserialized
    class VertexProperty extends Serializable {}
    case class Domain(dtbd_id: String, class_ecomm: String, date_found: String, isp_name: String,
                      enforce_status: String, registrar: String, traffic_rank: String) extends VertexProperty;
    case class Info(info: String, category: String) extends VertexProperty;

    // load the vertex and edge RDDs written with saveAsObjectFile
    val verticesRDD = sc.objectFile[(VertexId, VertexProperty)]("/home/trgao10/Work/Scala/webGraphInfer/repo/vertices");
    val edgesRDD = sc.objectFile[Edge[String]]("/home/trgao10/Work/Scala/webGraphInfer/repo/edges");

    // rebuild the graph and compute its connected components
    val mmBG: Graph[VertexProperty, String] = Graph(verticesRDD, edgesRDD);
    val ccMMBG = mmBG.connectedComponents();
    println("Total number of edges in the graph: " + mmBG.edges.count);
    println("Total number of vertices in the graph: " + mmBG.vertices.count);
    val ccNumVertices =
      (ccMMBG.vertices.map(pair => (pair._2, 1))
        .reduceByKey(_ + _) // count the vertices in each connected component (labeled by the smallest vertex id in that component)
        .map(pair => pair._2)) // keep only the per-component vertex counts
    println("Number of Connected Components: " + ccNumVertices.count);

    sc.stop();
  }
}
</pre></div>
<hr />
<h2 id="query-the-rebuilt-graph">Query the Rebuilt Graph</h2>
<p>We shall test querying each <code>dtbd_id</code> stored in <code>samples.txt</code>. First we need to read that text file back in. Reading the file and parsing its lines into a list is easily achieved with</p>
<div class="highlight"><pre>import scala.io.Source
val lines = Source.fromFile("/home/trgao10/Work/Scala/webGraphInfer/repo/samples.txt").getLines.toList
</pre></div>
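<p><code>Source.fromFile</code> (from <code>scala.io</code>) keeps the file open until the source is closed, and a trailing blank line would show up as an empty id. A slightly more careful variant closes the source and drops blank lines; the object <code>SampleReader</code> and its helper <code>readIds</code> are hypothetical, and UTF-8 encoding is an assumption.</p>
<div class="highlight"><pre>import scala.io.Source

object SampleReader {
  // Hypothetical helper: reads one id per line, trims whitespace, skips blank lines, and closes the file.
  def readIds(path: String): List[String] = {
    val source = Source.fromFile(path, "UTF-8")
    try source.getLines().map(_.trim).filter(_.nonEmpty).toList
    finally source.close()
  }
}
</pre></div>
<p>The <code>toList</code> inside the <code>try</code> matters: <code>getLines()</code> is lazy, so the lines must be read before the source is closed.</p>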
<p>For each <code>dtbd_id</code> in this list, let us find all vertices sharing that <code>dtbd_id</code> and count the number of these vertices. Timing the code through </p>
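<p>A look-up of this kind can be sketched as a filter over the vertices followed by a count, wrapped in a simple wall-clock timer. To stay self-contained, the plain-Scala sketch below runs on an in-memory <code>Seq[(Long, VertexProperty)]</code>; the object <code>LookupSketch</code> and its helpers <code>timed</code> and <code>countByDtbdId</code> are hypothetical. On the rebuilt graph the same predicate would go inside <code>mmBG.vertices.filter</code>, and each <code>count()</code> would launch a separate Spark job.</p>
<div class="highlight"><pre>class VertexProperty extends Serializable
case class Domain(dtbd_id: String, class_ecomm: String, date_found: String, isp_name: String,
                  enforce_status: String, registrar: String, traffic_rank: String) extends VertexProperty
case class Info(info: String, category: String) extends VertexProperty

object LookupSketch {
  // Hypothetical timer: returns the block's result and the elapsed wall-clock time in milliseconds.
  def timed[A](block: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = block
    (result, (System.nanoTime() - start) / 1e6)
  }

  // Counts the Domain vertices whose dtbd_id equals the given id; other vertex types never match.
  def countByDtbdId(vertices: Seq[(Long, VertexProperty)], id: String): Long =
    vertices.count {
      case (_, d: Domain) => d.dtbd_id == id
      case _              => false
    }.toLong
}
</pre></div>
<p>For example, <code>lines.map(id => LookupSketch.timed(LookupSketch.countByDtbdId(vertices, id)))</code> would produce one (count, milliseconds) pair per sampled id.</p>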
</div><!-- /.entry-content -->
</article>
</section>
<section id="extras" class="body">
<div class="blogroll">
<h2>useful links</h2>
<!--<h2>blogroll</h2>-->
<ul>
<li><a href="http://www.math.duke.edu/">Department of Mathematics</a></li>
<li><a href="http://www.math.duke.edu/courses/math_everywhere/">Math Everywhere @ Duke</a></li>
<li><a href="http://www.wolframalpha.com/">Wolfram Alpha</a></li>
</ul>
</div><!-- /.blogroll -->
<div class="social">
<h2>social</h2>
<ul>
<li><a href="https://github.com/trgao10">GitHub</a></li>
<li><a href="https://www.linkedin.com/pub/tingran-gao/89/8a8/a8a">LinkedIn</a></li>
</ul>
</div><!-- /.social -->
</section><!-- /#extras -->
<footer id="contentinfo" class="body">
<!--<address id="about" class="vcard body">
Proudly powered by <a href="http://getpelican.com/">Pelican</a>, which takes great advantage of <a href="http://python.org">Python</a>.
</address>--><!-- /#about -->
<p>Powered by <a href="http://getpelican.com/">Pelican</a><br>last modified:
<script>document.write(document.lastModified);</script>
</p>
<!--<p>The theme is by <a href="http://coding.smashingmagazine.com/2009/08/04/designing-a-html-5-layout-from-scratch/">Smashing Magazine</a>, thanks!</p>-->
</footer><!-- /#contentinfo -->
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-67249529-1']);
_gaq.push(['_trackPageview']);
(function() {
var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
ga.src = 'https://ssl.google-analytics.com/ga.js';
var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
})();
</script>
</body>
<script src="http://d3js.org/d3.v3.min.js"></script>
<script src="/js/"></script>
</html>