
Commit

optimize SparkLearning1 and SparkLearning2
xubo245 committed Feb 4, 2018
1 parent 706bc48 commit 1e12c30
Showing 259 changed files with 17,830 additions and 831 deletions.
96 changes: 96 additions & 0 deletions .gitignore
@@ -0,0 +1,96 @@
*#*#
*.#*
*.iml
*.ipr
*.iws
*.pyc
*.pyo
*.swp
*~
.DS_Store
.cache
.classpath
.ensime
.ensime_cache/
.ensime_lucene
.generated-mima*
.idea/
.idea_modules/
.project
.pydevproject
.scala_dependencies
.settings
/lib/
R-unit-tests.log
R/unit-tests.out
R/cran-check.out
R/pkg/vignettes/sparkr-vignettes.html
R/pkg/tests/fulltests/Rplots.pdf
build/*.jar
build/apache-maven*
build/scala*
build/zinc*
cache
checkpoint
conf/*.cmd
conf/*.conf
conf/*.properties
conf/*.sh
conf/*.xml
conf/java-opts
conf/slaves
dependency-reduced-pom.xml
derby.log
dev/create-release/*final
dev/create-release/*txt
dev/pr-deps/
dist/
docs/_site
docs/api
sql/docs
sql/site
lib_managed/
lint-r-report.log
log/
logs/
out/
project/boot/
project/build/target/
project/plugins/lib_managed/
project/plugins/project/build.properties
project/plugins/src_managed/
project/plugins/target/
python/lib/pyspark.zip
python/deps
python/test_coverage/coverage_data
python/test_coverage/htmlcov
python/pyspark/python
reports/
scalastyle-on-compile.generated.xml
scalastyle-output.xml
scalastyle.txt
spark-*-bin-*.tgz
spark-tests.log
src_managed/
streaming-tests.log
target/
unit-tests.log
work/
adam.log

# For Hive
TempStatsStore/
metastore/
metastore_db/
sql/hive-thriftserver/test_warehouses
warehouse/
spark-warehouse/

# For R session data
.RData
.RHistory
.Rhistory
*.Rproj
*.Rproj.*

.Rproj.user
657 changes: 89 additions & 568 deletions .idea/workspace.xml

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions README.md
@@ -23,56 +23,56 @@ The SparkLearning project comes with data, so the download is slow; if you only want to download part
# 3. Detailed blog directory:
## (1). Spark basics:
[SparkBaseLearning docs](./docs/Spark/SparkBaseLearning)
[SparkBaseLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/examples)
[SparkBaseLearning code](SparkLearning1/src/main/scala/org/apache/spark/examples)

## (2). Spark code:
[SparkCodeLearning docs](./docs/Spark/SparkCodeLearning)
[SparkCodeLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/rdd)
[SparkCodeLearning code](SparkLearning1/src/main/scala/org/apache/spark/rdd)

## (3). Spark component: MLlib
[MLlibLearning docs](./docs/Spark/MLlibLearning)
[MLlibLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/mllib)
[MLlibLearning code](SparkLearning1/src/main/scala/org/apache/spark/mllib)

## (4). Spark component: Spark SQL
[SparkSQLLearning docs](./docs/Spark/SparkSQLLearning)
[SparkSQLLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/sql)
[SparkSQLLearning code](SparkLearning1/src/main/scala/org/apache/spark/sql)

## (5). Spark component: SparkR
[SparkRLearning docs](./docs/Spark/SparkRLearning)
[SparkRLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/R)
[SparkRLearning code](SparkLearning1/src/main/scala/org/apache/spark/R)

## (6). Spark component: Spark Streaming
[SparkStreamingLearning docs](./docs/Spark/SparkStreamingLearning)
[SparkStreamingLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/Streaming)
[SparkStreamingLearning code](SparkLearning1/src/main/scala/org/apache/spark/Streaming)

## (7). Spark component: GraphX
[GraphXLearning docs](./docs/Spark/GraphXLearning)
[GraphXLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/graphx)
[GraphXLearning code](SparkLearning1/src/main/scala/org/apache/spark/graphx)

## (8). Spark-Avro
[SparkAvroLearning docs](./docs/Spark/SparkAvroLearning)
[SparkAvroLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/avro)
[SparkAvroLearning code](SparkLearning1/src/main/scala/org/apache/spark/avro)

## (9). Spark ecosystem: Alluxio (Tachyon)
[AlluxioLearning docs](./docs/Spark/AlluxioLearning)
[AlluxioLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/tachyon)
[AlluxioLearning code](SparkLearning1/src/main/scala/org/apache/spark/tachyon)

## (10). Spark ecosystem: spark-csv:
[SparkCsvLearning docs](./docs/Spark/SparkCsvLearning)
[SparkCsvLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/sparkCSV)
[SparkCsvLearning code](SparkLearning1/src/main/scala/org/apache/spark/sparkCSV)

## (11). Spark Q&A
[SparkQuestion docs](./docs/Spark/SparkQuestion)

## (12). MLLearning:
[MLLearning docs](./docs/Spark/MLLearning)
[MLLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/ml)
[MLLearning code](SparkLearning1/src/main/scala/org/apache/spark/ml)

MLlibLearning project: [https://github.com/xubo245/MLlibLearning](https://github.com/xubo245/MLlibLearning)

## (13). Spark source code study
[SparkSourceLearning docs](./docs/SparkSourceLearning)
[SparkSourceLearning code](SparkLearning1.5/src/main/scala/org/apache/spark/sourceCode)
[SparkSourceLearning code](SparkLearning1/src/main/scala/org/apache/spark/sourceCode)


## Help
251 changes: 0 additions & 251 deletions SparkLearning.iml

This file was deleted.

188 changes: 188 additions & 0 deletions SparkLearning1/pom.xml
@@ -0,0 +1,188 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>SparkLearning</artifactId>
<groupId>org.apache.spark</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>SparkLearning1</artifactId>

<properties>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<adam.version>0.18.2</adam.version>
<spark1.version>1.5.2</spark1.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.binary.version}</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.binary.version}</artifactId>
<version>0.11.2</version>
<exclusions>
<!-- This is included as a compile-scoped dependency by jtransforms, which is
a dependency of breeze. -->
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<version>2.2.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.bdgenomics.adam/adam-core_2.10 -->
<dependency>
<groupId>org.bdgenomics.adam</groupId>
<artifactId>adam-core_2.10</artifactId>
<version>${adam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.bdgenomics.adam/adam-cli_2.10 -->
<dependency>
<groupId>org.bdgenomics.adam</groupId>
<artifactId>adam-cli_2.10</artifactId>
<version>${adam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.bdgenomics.adam/adam-apis_2.10 -->
<dependency>
<groupId>org.bdgenomics.adam</groupId>
<artifactId>adam-apis_2.10</artifactId>
<version>${adam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.databricks/spark-avro_2.10 -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.10</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>

<!-- http://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${spark1.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${spark1.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark1.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark1.version}</version>
<type>test-jar</type>
</dependency>


<!--<dependency>-->
<!--<groupId>org.bdgenomics.utils</groupId>-->
<!--<artifactId>utils-misc_2.10</artifactId>-->
<!--<version>0.2.7</version>-->
<!--<type>test-jar</type>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!--
This plugin forces the generation of jar containing sql test classes,
so that the tests classes of external modules can use them. The two execution profiles
are necessary - first one for 'mvn package', second one for 'mvn test-compile'. Ideally,
'mvn compile' should not compile test classes and therefore should not need this.
However, a closed due to "Cannot Reproduce" Maven bug (https://issues.apache.org/jira/browse/MNG-3559)
causes the compilation to fail if catalyst test-jar is not generated. Hence, the
second execution profile for 'mvn test-compile'.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-test-jar</id>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<argLine>-ea -Xmx4g -Xss4m -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/gen-java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
@@ -0,0 +1,82 @@

//package org.apache.spark.ml;
//
//import java.util.Arrays;
//
//import org.apache.spark.SparkConf;
//import org.apache.spark.api.java.JavaRDD;
//import org.apache.spark.api.java.JavaSparkContext;
//import org.apache.spark.ml.classification.LogisticRegression;
//import org.apache.spark.ml.classification.LogisticRegressionModel;
//import org.apache.spark.ml.param.ParamMap;
//import org.apache.spark.mllib.linalg.Vectors;
//import org.apache.spark.mllib.regression.LabeledPoint;
//import org.apache.spark.sql.DataFrame;
//import org.apache.spark.sql.Row;
//import org.apache.spark.sql.SQLContext;
//
//public class ExampleETP {
// public static void main(String[] args) {
//
// SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]");
// JavaSparkContext sc = new JavaSparkContext(conf);
// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
//
// // Prepare training data.
// // We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
// // into DataFrames, where it uses the bean metadata to infer the schema.
// DataFrame training = sqlContext
// .createDataFrame(sc.parallelize(Arrays.asList(new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
// new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
// new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
// new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))), LabeledPoint.class);
//
// // Create a LogisticRegression instance. This instance is an Estimator.
// LogisticRegression lr = new LogisticRegression();
// // Print out the parameters, documentation, and any default values.
// System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
//
// // We may set parameters using setter methods.
// lr.setMaxIter(10).setRegParam(0.01);
//
// // Learn a LogisticRegression model. This uses the parameters stored in lr.
// LogisticRegressionModel model1 = lr.fit(training);
// // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// // we can view the parameters it used during fit().
// // This prints the parameter (name: value) pairs, where names are unique IDs for this
// // LogisticRegression instance.
// System.out
// .println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
//
// // We may alternatively specify parameters using a ParamMap.
// ParamMap paramMap = new ParamMap().put(lr.maxIter().w(20)) // Specify 1 Param.
// .put(lr.maxIter(), 30) // This overwrites the original maxIter.
// .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
//
// // One can also combine ParamMaps.
// ParamMap paramMap2 = new ParamMap().put(lr.probabilityCol().w("myProbability")); // Change output column name
// ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
//
// // Now learn a new model using the paramMapCombined parameters.
// // paramMapCombined overrides all parameters set earlier via lr.set* methods.
// LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
// System.out
// .println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
//
// // Prepare test documents.
// DataFrame test = sqlContext
// .createDataFrame(sc.parallelize(Arrays.asList(new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
// new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
// new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))), LabeledPoint.class);
//
// // Make predictions on test documents using the Transformer.transform() method.
// // LogisticRegression.transform will only use the 'features' column.
// // Note that model2.transform() outputs a 'myProbability' column instead of the usual
// // 'probability' column since we renamed the lr.probabilityCol parameter previously.
// DataFrame results = model2.transform(test);
// for (Row r : results.select("features", "label", "myProbability", "prediction").collect()) {
// System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
// + ", prediction=" + r.get(3));
// }
// }
//}
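For comparison, a minimal Scala sketch of the same Estimator / ParamMap workflow that the commented-out Java example above walks through, written against the Spark 1.5 spark.ml API; the object name EstimatorParamMapSketch is illustrative, and the sample data mirrors the comments above.

package org.apache.spark.ml.Example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

object EstimatorParamMapSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EstimatorParamMapSketch").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // LabeledPoint is a case class, so a Seq of them converts directly to a DataFrame
    // with "label" and "features" columns.
    val training = sqlContext.createDataFrame(Seq(
      LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
      LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
      LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

    // Estimator: configure with setters, then fit() to obtain a Transformer (the model).
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    val model1 = lr.fit(training)
    println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

    // Alternatively, override parameters at fit time with a ParamMap.
    val paramMap = ParamMap(lr.maxIter -> 30, lr.regParam -> 0.1, lr.probabilityCol -> "myProbability")
    val model2 = lr.fit(training, paramMap)

    val test = sqlContext.createDataFrame(Seq(
      LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
      LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))

    // model2 writes "myProbability" instead of the default "probability" column.
    model2.transform(test).select("features", "label", "myProbability", "prediction").show()
    sc.stop()
  }
}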
@@ -0,0 +1,101 @@

//package org.apache.spark.ml.Example;
//
//import org.apache.spark.SparkConf;
//import org.apache.spark.api.java.JavaRDD;
//import org.apache.spark.api.java.JavaSparkContext;
//import org.apache.spark.rdd.RDD;
//import org.apache.spark.sql.SQLContext;
//
//import java.io.Serializable;
//import java.util.Arrays;
//import java.util.List;
//
//import org.apache.spark.ml.Pipeline;
//import org.apache.spark.ml.PipelineModel;
//import org.apache.spark.ml.PipelineStage;
//import org.apache.spark.ml.classification.LogisticRegression;
//import org.apache.spark.ml.feature.HashingTF;
//import org.apache.spark.ml.feature.Tokenizer;
//import org.apache.spark.sql.DataFrame;
//import org.apache.spark.sql.Row;
//public class PipelineLearning {
// public static void main(String[] args) {
// SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]");
// JavaSparkContext sc = new JavaSparkContext(conf);
// SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
//
//
//
//// Prepare training documents, which are labeled.
// DataFrame training = sqlContext.createDataFrame(sc.parallelize(Arrays.asList(
// new LabeledDocument(0L, "a b c d e spark", 1.0),
// new LabeledDocument(1L, "b d", 0.0),
// new LabeledDocument(2L, "spark f g h", 1.0),
// new LabeledDocument(3L, "hadoop mapreduce", 0.0)
// )), LabeledDocument.class);
//
//// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
// Tokenizer tokenizer = new Tokenizer()
// .setInputCol("text")
// .setOutputCol("words");
// HashingTF hashingTF = new HashingTF()
// .setNumFeatures(1000)
// .setInputCol(tokenizer.getOutputCol())
// .setOutputCol("features");
// LogisticRegression lr = new LogisticRegression()
// .setMaxIter(10)
// .setRegParam(0.01);
// Pipeline pipeline = new Pipeline()
// .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
//
//// Fit the pipeline to training documents.
// PipelineModel model = pipeline.fit(training);
//
//// Prepare test documents, which are unlabeled.
// DataFrame test = sqlContext.createDataFrame(sc.parallelize(Arrays.asList(
// new Document(4L, "spark i j k"),
// new Document(5L, "l m n"),
// new Document(6L, "mapreduce spark"),
// new Document(7L, "apache hadoop")
// )), Document.class);
//
//// Make predictions on test documents.
// DataFrame predictions = model.transform(test);
// for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
// System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
// + ", prediction=" + r.get(3));
// }
// }
//}
//
//
//// Labeled and unlabeled instance types.
//// Spark SQL can infer schema from Java Beans.
//class Document implements Serializable {
// private long id;
// private String text;
//
// public Document(long id, String text) {
// this.id = id;
// this.text = text;
// }
//
// public long getId() { return this.id; }
// public void setId(long id) { this.id = id; }
//
// public String getText() { return this.text; }
// public void setText(String text) { this.text = text; }
//}
//
//class LabeledDocument extends Document implements Serializable {
// private double label;
//
// public LabeledDocument(long id, String text, double label) {
// super(id, text);
// this.label = label;
// }
//
// public double getLabel() { return this.label; }
// public void setLabel(double label) { this.label = label; }
//}
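Likewise, a minimal Scala sketch of the Tokenizer -> HashingTF -> LogisticRegression pipeline that the commented-out Java example above describes (Spark 1.5 spark.ml API); the object name PipelineSketch and the column names are illustrative.

package org.apache.spark.ml.Example

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SQLContext

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PipelineSketch").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Labeled training documents as (id, text, label).
    val training = sqlContext.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0))).toDF("id", "text", "label")

    // Three stages: split text into words, hash words into feature vectors, fit logistic regression.
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features")
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    val pipeline = new Pipeline().setStages(Array[PipelineStage](tokenizer, hashingTF, lr))

    val model = pipeline.fit(training)

    // Unlabeled test documents.
    val test = sqlContext.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop"))).toDF("id", "text")

    model.transform(test).select("id", "text", "probability", "prediction").show()
    sc.stop()
  }
}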
24 changes: 24 additions & 0 deletions SparkLearning1/src/main/java/org/apache/spark/ml/Example/test.java
@@ -0,0 +1,24 @@
package org.apache.spark.ml.Example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;
import java.util.List;

public class test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

System.out.println(distData.count());

sc.stop();
}
}
@@ -0,0 +1,40 @@
/**
* @author xubo
* needs the ADAM jar:
* https://github.com/bigdatagenomics/adam
*/
package org.apache.bdg.adam.learning
import org.apache.spark._
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
import java.text.SimpleDateFormat
import java.util._;

object kmerSaveAsFile {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test Adam kmer").setMaster("local")
// val conf=new SparkConf().setAppName("test Adam kmer").setMaster("local")
// val conf=new SparkConf().setAppName("test Adam kmer")
val sc = new SparkContext(conf)
val ac = new ADAMContext(sc)
// Load alignments from disk
//val reads = ac.loadAlignments("/data/NA21144.chrom11.ILLUMINA.adam",
// val reads = ac.loadAlignments("/xubo/adam/output/small.adam",
val reads = ac.loadAlignments("file/data/examples/input/small.adam",
projection = Some(Projection(AlignmentRecordField.sequence, AlignmentRecordField.readMapped, AlignmentRecordField.mapq)))
// Generate, count and sort 21-mers
val kmers = reads.flatMap(_.getSequence.sliding(21).map(k => (k, 1L))).reduceByKey(_ + _).map(_.swap).sortByKey(ascending = false)
kmers.take(10).foreach(println)

//
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
val soutput = "file/data/examples/output/kmer/" + iString + "/smallkmers21.adam";

println("kmers.count(reduceByKey):" + kmers.count)
kmers.saveAsTextFile(soutput)
val sum0 = for ((a, b) <- kmers) yield a
println("kmers.count(no reduce):" + sum0.sum)
sc.stop()
// Print the top 10 most common 21-mers
}
}
@@ -0,0 +1,13 @@
package org.apache.spark.R.learning

object readFile {

def main(args: Array[String]): Unit = {

// val sc = sparkR.init()
// val sqlContext = sparkRSQL.init(sc)
println("fail");

}

}
@@ -0,0 +1,40 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/streaming-programming-guide.html
* @time 20160425
* @spark-1.5.2
*/
package org.apache.spark.Streaming.learning

import org.apache.spark.streaming.dstream._

object AQuickExample {

def main(args: Array[String]): Unit = {

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
//words.g
import org.apache.spark.streaming.StreamingContext._
// not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination()
}

}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
* Counts words in new text files created in the given directory
* Usage: HdfsWordCount <directory>
* <directory> is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
* $ bin/run-example \
* org.apache.spark.examples.streaming.HdfsWordCount localdir
*
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: HdfsWordCount <directory>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note: the storage level skips replication only because this example runs locally.
// In a distributed scenario, replication is necessary for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

/**
* Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
* network every second.
*
* Usage: SqlNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
*/

object SqlNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

// Create the context with a 2 second batch size
val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note: the storage level skips replication only because this example runs locally.
// In a distributed scenario, replication is necessary for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD((rdd: RDD[String], time: Time) => {
// Get the singleton instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._

// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()

// Register as table
wordsDataFrame.registerTempTable("words")

// Do word count on table using SQL and print it
val wordCountsDataFrame =
sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
})

ssc.start()
ssc.awaitTermination()
}
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

@transient private var instance: SQLContext = _

def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
// scalastyle:on println
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.Streaming.learning

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import scala.Option.option2Iterable

/**
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
* second starting with initial value of word count.
* Usage: StatefulNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
* data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example
* org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
System.exit(1)
}

StreamingExamples.setStreamingLogLevels()

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)
}

val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}

val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using updateStateByKey
// This will give a DStream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
// scalastyle:on println
@@ -0,0 +1,14 @@
package org.apache.spark.Streaming.learning

object StreamingContextTest {

def main(args: Array[String]): Unit = {
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("StreamingContextTest").setMaster("local")
val ssc = new StreamingContext(conf, Seconds(1))

}

}
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.Streaming.learning

import org.apache.spark.Logging

import org.apache.log4j.{Level, Logger}

/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {

/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
@@ -0,0 +1,30 @@
package org.apache.spark.Streaming.learning

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowsWordCount {
def main(args: Array[String]) {

val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)

// create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(".")

// get the data
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(","))

// window operation
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
//val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))

wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
@@ -0,0 +1,14 @@
package org.apache.spark.Streaming.learning

object fileStream {

def main(args: Array[String]): Unit = {
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("StreamingContextTest").setMaster("local")
val ssc = new StreamingContext(conf, Seconds(1))
// ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
}

}
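A minimal sketch of how the commented-out generic fileStream call above is typically instantiated for newline-delimited text files; the object name fileStreamSketch and the input directory path are placeholder assumptions.

package org.apache.spark.Streaming.learning

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object fileStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("fileStreamSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Concrete key, value and input-format types for plain text files;
    // the directory is monitored for newly created files.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("file/data/streaming/input")
      .map(_._2.toString)

    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}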
@@ -0,0 +1,42 @@
/**
* @author xubo
* @time 20160502
* ref https://github.com/databricks/spark-avro
*/
package org.apache.spark.avro.learning

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import java.util.Date
/**
* Avro Compression different level
*/
object AvroCompression {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AvroCompression").setMaster("local")
val sc = new SparkContext(conf)
// import needed for the .avro method to be added
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

// The Avro records get converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read
.format("com.databricks.spark.avro")
.load("file/data/avro/input/episodes.avro")
df.show

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())

df.filter("doctor > 5").write
.format("com.databricks.spark.avro")
.save("file/data/avro/output/episodes/AvroCompression" + iString)
df.filter("doctor > 5").show

}
}
@@ -0,0 +1,27 @@
package org.apache.spark.avro.learning

import org.apache.avro.Schema
import java.io.File
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import java.text.SimpleDateFormat
import java.util.Date

object AvroFileGenerator {

def main(args: Array[String]): Unit = {
val schemaPath = "file/data/avro/input/benchmarkSchema.avsc"
val outputDir = "file/data/avro/output/avroForBenchmark/"
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS")
.format(new Date());
val schema = new Schema.Parser().parse(new File(schemaPath))
val outputFile = new File(outputDir + "part" + iString + ".avro")
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
dataFileWriter.create(schema, outputFile)
// close the writer so the Avro container file (header only, no records appended) is flushed to disk
dataFileWriter.close()
println(schema);
println("end");
}

}
@@ -0,0 +1,64 @@
package org.apache.spark.avro.learning

import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.avro.Schema
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord

object AvroNoCode {

def main(args: Array[String]): Unit = {
val schema = new Schema.Parser().parse(new File(
"file/data/avro/input/user.avsc"));
val user1 = new GenericData.Record(schema);
println(user1);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);

val user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");

println("create user:");
println(user1);
println(user2);

// Serialize user1 and user2 to disk
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS")
.format(new Date());
val file = new File("file/data/avro/output/avro/users" + iString + ".avro");
val datumWriter = new GenericDatumWriter[GenericRecord] (
schema);
val dataFileWriter = new DataFileWriter[GenericRecord] (
datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.close();

// Deserialize users from disk
val datumReader = new GenericDatumReader[GenericRecord] (
schema);
val dataFileReader = new DataFileReader[GenericRecord] (
file, datumReader);
var user: GenericRecord = null
while (dataFileReader.hasNext()) {
// Reuse user object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
user = dataFileReader.next(user);
println(user);
}

println("end");
}

}
@@ -0,0 +1,45 @@
/**
* @author xubo
* @time 20160502
* ref https://github.com/databricks/spark-avro
*/
package org.apache.spark.avro.learning

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import java.util.Date
import com.databricks.spark.avro._

/**
* specify the record name and namespace
*/
object AvroReadSpecifyName {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AvroReadSpecifyName").setMaster("local")
val sc = new SparkContext(conf)
// import needed for the .avro method to be added

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sqlContext.read.avro("file/data/avro/input/episodes.avro")
df.show

val name = "AvroTest"
val namespace = "com.databricks.spark.avro"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
df.write.options(parameters).avro("file/data/avro/output/episodes/AvroReadSpecifyName" + iString)

val dfread = sqlContext.read
.format("com.databricks.spark.avro")
.load("file/data/avro/output/episodes/AvroReadSpecifyName" + iString)
dfread.show

val dfread2 = sqlContext.read.avro("file/data/avro/output/episodes/AvroReadSpecifyName" + iString)
dfread2.show
}
}
@@ -0,0 +1,44 @@
/**
* @author xubo
* @time 20160502
* ref https://github.com/databricks/spark-avro
*/
package org.apache.spark.avro.learning

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import java.util.Date
import com.databricks.spark.avro._

/**
* partitioned Avro records
*/
object AvroWritePartitioned {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AvroWritePartitioned").setMaster("local")
val sc = new SparkContext(conf)
// import needed for the .avro method to be added

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = Seq((2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
.toDF("year", "month", "title", "rating")
df.show
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
df.write.partitionBy("year", "month").avro("file/data/avro/output/episodes/WriteAvro" + iString)

val dfread = sqlContext.read
.format("com.databricks.spark.avro")
.load("file/data/avro/output/episodes/WriteAvro" + iString)
dfread.show

val dfread2 = sqlContext.read.avro("file/data/avro/output/episodes/WriteAvro" + iString)
dfread2.show
}
}
@@ -0,0 +1,40 @@
/**
* @author xubo
* @time 20160502
* ref https://github.com/databricks/spark-avro
*/
package org.apache.spark.avro.learning

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import java.util.Date
/**
* read avro by DatabricksSparkAvro
*/
object byDatabricksSparkAvro {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("byDatabricksSparkAvro").setMaster("local")
val sc = new SparkContext(conf)
// import needed for the .avro method to be added
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// The Avro records get converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read
.format("com.databricks.spark.avro")
.load("file/data/avro/input/episodes.avro")
df.show

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())

df.filter("doctor > 5").write
.format("com.databricks.spark.avro")
.save("file/data/avro/output/episodes/byDatabricksSparkAvro" + iString)
df.filter("doctor > 5").show

}
}
@@ -0,0 +1,35 @@
/**
* @author xubo
* @time 20160502
* ref https://github.com/databricks/spark-avro
*
*/
package org.apache.spark.avro.learning

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import java.util.Date
/**
* read avro by SparkAvro
*/
object bySparkSQL {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("bySparkSQL").setMaster("local")
val sc = new SparkContext(conf)
// import needed for the .avro method to be added
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// The Avro records get converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read.avro("file/data/avro/input/episodes.avro")
df.show
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())

df.filter("doctor > 5").write.avro("file/data/avro/output/episodes/bySparkSQL" + iString)
df.filter("doctor > 5").show
}
}
@@ -0,0 +1,7 @@
package org.apache.spark.avro.learning

object testPrint1 {
def main(args: Array[String]): Unit = {
println("hello");
}
}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
//package org.apache.spark.examples

//package org.apache.spark.examplesByXubo
package org.apache.spark.examples
import org.apache.spark._
import org.apache.spark.SparkContext._
import java.io.PrintWriter
import java.io.File
import java.util._
import java.text.SimpleDateFormat
//import org.apache.spark.SparkConf

object LocalProduceRandom {
def main(args: Array[String]) {
val n = 100000
val m = 100
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
val soutput = "file/data/examples/output/" + iString;
val w1 = new PrintWriter(new File("file/data/examples/output/output" + iString + ".txt"))
var uu = new Random()
for (i <- 1 to n) { w1.println(uu.nextInt(m)) }
println("success")
w1.close()

// var count = 0
// for (i <- 1 to n) {
//// val x = random * 2 - 1
//// val y = random * 2 - 1
//// if (x*x + y*y < 1) count += 1
// }
// println("Pi is roughly " + 4 * count / 100000.0)
}
}
// scalastyle:on println
@@ -0,0 +1,43 @@
package org.apache.spark.examples
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
//package Chapter

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi ").setMaster("local[4]")
val spark = new SparkContext(conf)
val slices = if (args.length > 0) args(0).toInt else 2
println("slices:\n"+slices)
println("args.length:\n"+args.length)
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
// scalastyle:on println
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples
//import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkRDD1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Pi ").setMaster("local[4]")
val spark = new SparkContext(conf)
val para = spark.parallelize(1 to 1000000, 3)
para.filter {
_ % 10000 == 0
}.foreach {
println
}
// val slices = if (args.length > 0) args(0).toInt else 2
// println("slices:\n"+slices)
// println("args.length:\n"+args.length)
// val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
// val count = spark.parallelize(1 until n, slices).map { i =>
// val x = random * 2 - 1
// val y = random * 2 - 1
// if (x*x + y*y < 1) 1 else 0
// }.reduce(_ + _)
// println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}

// scalastyle:on println
@@ -0,0 +1,18 @@
package org.apache.spark.examples
import java.util._
import java.text.SimpleDateFormat

object TestDate {
def main(args: Array[String]) {
//
// val sdf = new SimpleDateFormat("yyyy-MM-dd H:mm:ss")
//
val iString=new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date() );
val s1="file/data/examples/output/"+iString;
println(s1)
val s0= "file/data/examples/input/*"
println(s0)
}
}

// note: a class (rather than an object with a main method) cannot be run from an exported jar
@@ -0,0 +1,38 @@
package org.apache.spark.examples

//class SparkWordCount {
//
//}
//package spark.examples

//import org.apache.spark.SparkConf
//import org.apache.spark.SparkContext
//
//import org.apache.spark.SparkContext._

import org.apache.spark._
import org.apache.spark.SparkContext._
import java.util._
import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object WordCountByTime {
def main(args: Array[String]) {

val conf = new SparkConf()
conf.setAppName("SparkWordCount").setMaster("local[4]")

val sc = new SparkContext(conf)
val s0 = "file/data/examples/input/wordCount/*"

val rdd = sc.textFile(s0)

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
val s1 = "file/data/examples/output/wordCount" + iString;

rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile(s1)
println("end");
sc.stop
}
}
@@ -0,0 +1,29 @@
package org.apache.spark.examples
import org.apache.spark._
import org.apache.spark.SparkContext._
import java.util._
import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object WordCountByTimeNoSort {
def main(args: Array[String]) {
val s0="local"
val sinput= "file/data/examples/input/wordCount/*"
val iString=new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date() )
val soutput="file/data/examples/output/wordCount" + iString;

if (args.length > 0) {
println("usage: WordCountByTimeNoSort takes no arguments; the master, input and output paths are hard-coded")
return
}
val sc = new SparkContext(s0, "WordCount",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
val textFile = sc.textFile(sinput)
val result = textFile.flatMap(line => line.split("\\s+"))
.map(word => (word, 1)).reduceByKey(_ + _)
// result.count();
result.saveAsTextFile(soutput)
println("end");
sc.stop
}
}
@@ -0,0 +1,38 @@


//class SparkWordCount {
//
//}
//package spark.examples

//import org.apache.spark.SparkConf
//import org.apache.spark.SparkContext
//
//import org.apache.spark.SparkContext._

package org.apache.spark.examples

import org.apache.spark._
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object SparkWordCount {
def main(args: Array[String]) {

if (args.length < 1) {
System.err.println("Usage: <file>")
System.exit(1)
}

val conf = new SparkConf()
conf.setAppName("SparkWordCount")

val sc = new SparkContext(conf)

val rdd = sc.textFile(args(0))

rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile(args(1))

sc.stop
}
}
@@ -0,0 +1,29 @@
/**
* @author xubo
* You can change the number of output files by changing partitions
*/
package org.apache.spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object WordCountPartitions {
def main(args: Array[String]) {
// val conf = new SparkConf().setAppName("WordCountPartitions").setMaster("local")
val conf = new SparkConf().setAppName("WordCountPartitions").setMaster("local[4]")
val sc = new SparkContext(conf)
// val text1 = sc.textFile("file/wordCount").flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
var text1 = sc.textFile("file/data/examples/input/wordCount/*").flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _, 1)
// text1.map((k,v)=>(v,k))
// text1 = text1.sortBy(_._2, ascending = false) //down
text1 = text1.sortBy(_._2, ascending = true, 2) //up
val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
text1.saveAsTextFile("file/data/examples/output/wordCount" + iString);
text1.foreach(println)
println("WordCountPartitions Success");
sc.stop
}
}
@@ -0,0 +1,22 @@

package org.apache.spark.examples

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCountSpark {
def main(args:Array[String]) {
if (args.length != 3 ){
println("usage is org.test.WordCount <master> <input> <output>")
return
}
val sc = new SparkContext(args(0), "WordCount",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
val textFile = sc.textFile(args(1))
val result = textFile.flatMap(line => line.split("\\s+"))
.map(word => (word, 1)).reduceByKey(_ + _)
// result.count();
result.saveAsTextFile(args(2))
sc.stop
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.spark.examples.saveAsFile
import org.apache.spark._
import java.text.SimpleDateFormat
import java.util._;

object SaveAsObjectFileByWordcountSort {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SaveAsObjectFileByWordcountSort").setMaster("local")
// val conf=new SparkConf().setAppName("SaveAsFileByWordcount").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile("file/data/examples/input/wordCount/*")

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
val soutput = "file/data/examples/output/wordcount/SaveAsObjectFileByWordcountSort" + iString + "/";

textFile.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap).sortByKey(ascending = false).saveAsObjectFile(soutput)
println("success");
sc.stop()

}
}
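saveAsObjectFile writes Java-serialized records rather than plain text, so the natural way to inspect the result is sc.objectFile. A minimal read-back sketch; the path is a placeholder for the timestamped directory the job actually created.

import org.apache.spark.{SparkConf, SparkContext}

object ReadObjectFileByWordcountSort {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ReadObjectFileByWordcountSort").setMaster("local"))
    // (count, word) pairs, matching the type written above
    val pairs = sc.objectFile[(Int, String)]("file/data/examples/output/wordcount/SaveAsObjectFileByWordcountSort<timestamp>/")
    pairs.take(10).foreach(println)
    sc.stop()
  }
}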
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.spark.examples.saveAsFile
import org.apache.spark._
import java.text.SimpleDateFormat
import java.util._;

object SaveAsTextFileByWordcount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SaveAsTextFileByWordcount").setMaster("local")
// val conf=new SparkConf().setAppName("SaveAsFileByWordcount").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile("file/data/examples/input/wordCount/*")

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())

val soutput = "file/data/examples/output/wordcount/SaveAsTextFileByWordcount" + iString + "/";

textFile.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).saveAsTextFile(soutput)
println("success");
sc.stop()

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.spark.examples.saveAsFile
import org.apache.spark._
import java.text.SimpleDateFormat
import java.util._;

object SaveAsTextFileByWordcountSort {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SaveAsTextFileByWordcountSort").setMaster("local")
// val conf=new SparkConf().setAppName("SaveAsFileByWordcount").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile("file/data/examples/input/wordCount/*", 1)

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())

val soutput = "file/data/examples/output/wordcount/SaveAsTextFileByWordcountSort" + iString + "/";
textFile.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap).sortByKey(ascending = false).saveAsTextFile(soutput)
println("success");
sc.stop()

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* @author xubo
* need adam jar :
* https://github.com/bigdatagenomics/adam
*/
package org.apache.spark.examples.saveAsFile
import org.apache.spark._
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.projections.{AlignmentRecordField, Projection}
import java.text.SimpleDateFormat
import java.util._;

object kmerSaveAsFile {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test Adam kmer").setMaster("local")
// val conf = new SparkConf().setAppName("test Adam kmer")
val sc = new SparkContext(conf)
val ac = new ADAMContext(sc)
// Load alignments from disk
//val reads = ac.loadAlignments("/data/NA21144.chrom11.ILLUMINA.adam",
// val reads = ac.loadAlignments("/xubo/adam/output/small.adam",
val reads = ac.loadAlignments("file/data/examples/input/small.adam",
projection = Some(Projection(AlignmentRecordField.sequence,AlignmentRecordField.readMapped,AlignmentRecordField.mapq)))
// Generate, count and sort 21-mers
val kmers = reads.flatMap(_.getSequence.sliding(21).map(k => (k, 1L))).reduceByKey(_ + _).map(_.swap).sortByKey(ascending = false)
// Print the top 10 most common 21-mers
kmers.take(10).foreach(println)

val iString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
val soutput = "file/data/examples/output/kmer/" + iString + "/smallkmers21.adam"

println("kmers.count(reduceByKey):" + kmers.count)
kmers.saveAsTextFile(soutput)
val sum0 = for ((count, kmer) <- kmers) yield count
println("kmers.count(no reduce):" + sum0.sum)
sc.stop()
}
}
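The same sliding-window k-mer count also works on plain sequence text without the ADAM dependency. The sketch below is illustrative only; the input path and the FASTA-style header filtering are assumptions, not files from this repository.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object KmerCountPlainText {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("KmerCountPlainText").setMaster("local"))
    val k = 21
    val kmers = sc.textFile("file/data/examples/input/sequences.txt") // placeholder path
      .filter(line => line.nonEmpty && !line.startsWith(">"))         // skip FASTA headers
      .flatMap(_.sliding(k).map((_, 1L)))                             // all k-length substrings
      .reduceByKey(_ + _)
      .map(_.swap)
      .sortByKey(ascending = false)
    kmers.take(10).foreach(println)
    sc.stop()
  }
}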
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.spark.examples
import org.apache.spark.SparkContext._
import org.apache.spark._
object test2 {
def main(arg: Array[String]) {
val sc = new SparkContext("local", "testpara", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
val collection = sc.parallelize(1 to 10000000)
println("end");
sc.stop
}
}
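parallelize only defines the RDD; no job actually runs until an action is invoked. A one-line sketch, assuming the `collection` built above:

println("collection.count: " + collection.count) // count is an action, so this triggers the job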
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx.EdgeDirection

object CollectingNeighbors {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CollectingNeighbors").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Import random graph generation library
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 6).mapVertices((id, _) => id.toDouble)

println("Graph:");
println("sc.defaultParallelism:" + sc.defaultParallelism);
println("vertices:");
graph.vertices.collect.foreach(println(_))
println("edges:");
graph.edges.collect.foreach(println(_))
println("count:" + graph.edges.count);
println("\ninDegrees");
graph.inDegrees.foreach(println)

println("\nneighborsIds:");
val neighbors0 = graph.collectNeighborIds(EdgeDirection.Out)
neighbors0.foreach(println)
neighbors0.collect.foreach { a =>
{
println(a._1 + ":")
a._2.foreach(b => print(b + " "))
println();
}
}

println("\nneighbors:");
val neighbors1 = graph.collectNeighbors(EdgeDirection.Out)
neighbors1.foreach(println)
neighbors1.collect.foreach { a =>
{
println(a._1 + ":")
a._2.foreach(b => print(b + " "))
println();
}
}
sc.stop()
}
}
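Printing the (VertexId, Array[VertexId]) pairs directly shows array hash codes, which is why the collect loops above re-format the output. A shorter sketch of the same formatting, assuming neighbors0 from above:

neighbors0.collect.foreach { case (id, nbrs) => println(id + ": " + nbrs.mkString(" ")) }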
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx.GraphLoader

object ConnectedComponents {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ConnectedComponents").setMaster("local[4]")
val sc = new SparkContext(conf)

// Load the edges as a graph
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "file/data/graphx/input/followers.txt")

// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("file/data/graphx/input/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result

println("\ngraph edges");
println("edges:");
graph.edges.collect.foreach(println)
// graph.edges.collect.foreach(println)
println("vertices:");
graph.vertices.collect.foreach(println)
println("triplets:");
graph.triplets.collect.foreach(println)
println("\nusers");
users.collect.foreach(println)
println("\ncc:");
cc.collect.foreach(println)
println("\nccByUsername");
println(ccByUsername.collect().mkString("\n"))
sc.stop()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphGeneratorsAndAggregateMessages {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphOperatorsStructuralMask").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Import random graph generation library
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices((id, _) => id.toDouble)
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst(1, triplet.srcAttr)
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers

val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues((id, value) => value match { case (count, totalAge) => totalAge / count })
// Display the results
println("Graph:");
println("sc.defaultParallelism:" + sc.defaultParallelism);
println("vertices:");
graph.vertices.collect.foreach(println(_))
println("edges:");
graph.edges.collect.foreach(println(_))
println("count:" + graph.edges.count);
println("\nolderFollowers:");
olderFollowers.collect.foreach(println)
println("\navgAgeOfOlderFollowers:");
avgAgeOfOlderFollowers.collect.foreach(println(_))
// graph.inDegrees.foreach(println)
sc.stop()
}

}
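The map/reduce split in aggregateMessages is easier to see on a smaller task: sending a constant 1 per edge and summing reproduces the in-degree, which can be checked against graph.inDegrees. A sketch assuming the graph and imports above:

val inDeg: VertexRDD[Int] = graph.aggregateMessages[Int](
  triplet => triplet.sendToDst(1), // Map: one message per incoming edge
  (a, b) => a + b                  // Reduce: sum the messages
)
inDeg.collect.foreach(println)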
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphGeneratorsAndIndegreeOutdegree {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphOperatorsStructuralMask").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Import random graph generation library
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 5).mapVertices((id, _) => id.toDouble)

println("Graph:");
println("sc.defaultParallelism:" + sc.defaultParallelism);
println("vertices:");
graph.vertices.collect.foreach(println(_))
println("edges:");
graph.edges.collect.foreach(println(_))
println("count:" + graph.edges.count);
println("\ninDegrees");
graph.inDegrees.foreach(println)
println("\noutDegrees");
graph.outDegrees.foreach(println)

// println(graph.degrees.reduce(max));

println("\nreverse");
println("\nreverse vertices");
graph.reverse.vertices.collect.foreach(println)
println("\nreverse edges");
graph.reverse.edges.collect.foreach(println)
println("\nreverse inDegrees");
graph.reverse.inDegrees.foreach(println)
println("\nreverse inDegrees");
graph.reverse.outDegrees.foreach(println)
sc.stop()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphGeneratorsAndMaxMin {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphOperatorsStructuralMask").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Import random graph generation library
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 5).mapVertices((id, _) => id.toDouble)

println("Graph:");
println("sc.defaultParallelism:" + sc.defaultParallelism);
println("vertices:");
graph.vertices.collect.foreach(println(_))
println("edges:");
graph.edges.collect.foreach(println(_))
println("count:" + graph.edges.count);
println("\ndegrees");
graph.degrees.foreach(println)
println("\ninDegrees");
graph.inDegrees.foreach(println)
println("\noutDegrees");
graph.outDegrees.foreach(println)

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}

// Define a reduce operation to compute the lowest degree vertex
def min(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 < b._2) a else b
}

// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)

println("\nmax:");
println("maxDegree:" + (graph.degrees.reduce(max)));
println("maxInDegree:" + graph.inDegrees.reduce(max));
println("maxoutDegree:" + graph.outDegrees.reduce(max));
println("\nmin:");
println("minDegree:" + (graph.degrees.reduce(min)));
println("minInDegree:" + graph.inDegrees.reduce(min));
println("minoutDegree:" + graph.outDegrees.reduce(min));
println("end");
sc.stop()
}
}
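The hand-written max and min reducers can also be replaced by RDD.max and RDD.min with an Ordering on the degree value. A sketch assuming the graph above:

val byDegree = Ordering.by[(VertexId, Int), Int](_._2)
println("maxDegree:" + graph.degrees.max()(byDegree))
println("minDegree:" + graph.degrees.min()(byDegree))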
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphGeneratorsAndTopK {

val K = 3
var arr = new Array[(Int, Int)](K)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphGeneratorsAndTopK").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Import random graph generation library
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices((id, _) => id.toDouble)

println("Graph:");
println("sc.defaultParallelism:" + sc.defaultParallelism);
println("vertices:");
graph.vertices.collect.foreach(println(_))
println("edges:");
graph.edges.collect.foreach(println(_))
println("count:" + graph.edges.count);
println("\ninDegrees");
graph.inDegrees.foreach(println)

for (i <- 0 until K) {
arr(i) = (0, 0)
}

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}

// Define a reduce operation to compute the lowest degree vertex
def min(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 < b._2) a else b
}
def minInt(a: (Int, Int), b: (Int, Int)): (Int, Int) = {
if (a._2 < b._2) a else b
}

// arr.reduce(minInt)

println("\ntopK:K=" + K);
def topK(a: (VertexId, Int)): Unit = {
if (a._2 >= arr.reduce(minInt)._2) {
arr = arr.sortBy(_._2).reverse
var tmp = (a._1.toInt, a._2)
var flag = true
for (i <- 0 until arr.length) {
if (a._2 >= arr(i)._2) { //newest max,remove = and last max
if (flag == true) {
for (j <- i + 1 until arr.length reverse) {
arr(j) = arr(j - 1)
}
arr(i) = tmp
}
flag = false
}
}
}
}

graph.inDegrees.foreach(topK(_))
arr.foreach(println)
println("end");
sc.stop()
}
}
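The mutable-array bookkeeping above can be replaced by RDD.top, which returns the K largest elements under a given Ordering. A sketch assuming graph and K from above:

val topKInDegrees = graph.inDegrees.top(K)(Ordering.by[(VertexId, Int), Int](_._2))
topKInDegrees.foreach(println) // K vertices with the highest in-degree, descending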
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphGeneratorsAndTopKBackup {

val K = 3
var arr = new Array[(Int, Int)](K)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphGeneratorsAndTopKBackup").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Import random graph generation library
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 10).mapVertices((id, _) => id.toDouble)

println("Graph:");
println("sc.defaultParallelism:" + sc.defaultParallelism);
println("vertices:");
graph.vertices.collect.foreach(println(_))
println("edges:");
graph.edges.collect.foreach(println(_))
println("count:" + graph.edges.count);
// println("\ndegrees");
// graph.degrees.foreach(println)
println("\ninDegrees");
graph.inDegrees.foreach(println)
// println("\noutDegrees");
// graph.outDegrees.foreach(println)

// arr.foreach(println)
for (i <- 0 until K) {
arr(i) = (0, 0)
}
// arr.foreach(println)
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}

// Define a reduce operation to compute the lowest degree vertex
def min(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 < b._2) a else b
}
def minInt(a: (Int, Int), b: (Int, Int)): (Int, Int) = {
if (a._2 < b._2) a else b
}

// arr.reduce(minInt)

println("\ntopK");
def topK(a: (VertexId, Int)): Unit = {
if (a._2 >= arr.reduce(minInt)._2) {
println(a._1 + " " + a._2 + "****");
println("min:" + arr.reduce(minInt)._2);
arr = arr.sortBy(_._2).reverse
arr.foreach(println)
println("sort end");
var tmp = (a._1.toInt, a._2)
var flag = true
for (i <- 0 until arr.length) {

if (a._2 > arr(i)._2) {
if (flag == true) {
for (j <- i + 1 until arr.length reverse) {
print(j + " ")
arr(j) = arr(j - 1)
}
println();
arr(i) = tmp
}
flag = false
}
}

arr.foreach(println)
}
}

graph.inDegrees.foreach(topK(_))
println("end");
arr.foreach(println)
// // Compute the max degrees
// val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
// val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
// val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
//
// println("\nmax:");
// println("maxDegree:" + (graph.degrees.reduce(max)));
// println("maxInDegree:" + graph.inDegrees.reduce(max));
// println("maxoutDegree:" + graph.outDegrees.reduce(max));
// println("\nmin:");
// println("minDegree:" + (graph.degrees.reduce(min)));
// println("minInDegree:" + graph.inDegrees.reduce(min));
// println("minoutDegree:" + graph.outDegrees.reduce(min));
// println("end");
sc.stop()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

object GraphOperatorsStructuralMask {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphOperatorsStructuralMask").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
println("vertices:");
graph.subgraph(each => each.srcId != 100L).vertices.collect.foreach(println)
println("\ntriplets:");
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1).collect.foreach(println(_))
graph.edges.collect.foreach(println)

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
println("\nccGraph:");
println("vertices:");
ccGraph.vertices.collect.foreach(println)
println("edegs:");
ccGraph.edges.collect.foreach(println)
println("\nvalidGraph:");
validGraph.vertices.collect.foreach(println)
println("\nvalidCCGraph:");
validCCGraph.vertices.collect.foreach(println)
sc.stop()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

object GraphOperatorsStructuralSubgraph {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GraphOperatorsStructuralSubgraph").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
println("vertices:");
graph.subgraph(each => each.srcId != 100L).vertices.collect.foreach(println)
println("\ntriplets:");
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1).collect.foreach(println(_))

// Remove missing vertices as well as the edges connected to them
println("\nRemove missing vertices as well as the edges connected to them:");
// val validGraph = graph.subgraph(epred, vpred)

val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
println("new vertices:");
validGraph.vertices.collect.foreach(println(_))

println("\nnew triplets:");
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1).collect.foreach(println(_))
sc.stop()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object PageRankAboutBerkeleyWiki {
def main(args: Array[String]) {
// Suppress verbose logging
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

// Set up the execution environment
val conf = new SparkConf().setAppName("PageRankAboutBerkeleyWiki").setMaster("local")
val sc = new SparkContext(conf)

// Read the input data files
val articles: RDD[String] = sc.textFile("file/data/graphx/input/graphx-wiki-vertices.txt")
val links: RDD[String] = sc.textFile("file/data/graphx/input/graphx-wiki-edges.txt")

// Load the vertices and edges
val vertices = articles.map { line =>
val fields = line.split('\t')
(fields(0).toLong, fields(1))
}

val edges = links.map { line =>
val fields = line.split('\t')
Edge(fields(0).toLong, fields(1).toLong, 0)
}

// Cache the graph
//val graph = Graph(vertices, edges, "").persist(StorageLevel.MEMORY_ONLY_SER)
val graph = Graph(vertices, edges, "").persist()
//graph.unpersistVertices(false)

// Test: inspect a few triplets
println("**********************************************************")
println("Show 5 triplets")
println("**********************************************************")
graph.triplets.take(5).foreach(println(_))

// pageRank() calls cache() internally, so the earlier persist() can only use MEMORY_ONLY
println("**********************************************************")
println("Computing PageRank to find the most valuable pages")
println("**********************************************************")
val prGraph = graph.pageRank(0.001).cache()

val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
(v, title, rank) => (rank.getOrElse(0.0), title)
}

titleAndPrGraph.vertices.top(10) {
Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t => println(t._2._2 + ": " + t._2._1))

sc.stop()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx.GraphLoader

object PageRankFromExamples {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("PageRank").setMaster("local[4]")
val sc = new SparkContext(conf)

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "file/data/graphx/input/followers.txt")

// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("file/data/graphx/input/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}

val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
println("graph:");
graph.edges.collect.foreach(println)
println("users:");
users.collect.foreach(println)
println("ranks:");
ranks.collect.foreach(println)

// Print the result
println("\nranksByUsername");
println(ranksByUsername.collect().mkString("\n"))
sc.stop()
}
}
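Besides the tolerance-based pageRank used above, GraphX also provides staticPageRank, which runs a fixed number of iterations. A sketch assuming the same graph and users RDD as above:

val staticRanks = graph.staticPageRank(10).vertices // 10 iterations instead of a convergence tolerance
val staticByUsername = users.join(staticRanks).map {
  case (id, (username, rank)) => (username, rank)
}
println(staticByUsername.collect().mkString("\n"))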
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* @author xubo
* ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* time 20160503
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.util.GraphGenerators

object Pregeloperator {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Pregeloperator").setMaster("local[4]")
// Assume the SparkContext has already been constructed
val sc = new SparkContext(conf)
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 10).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 2 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.

println("graph:");
println("vertices:");
graph.vertices.collect.foreach(println)
println("edges:");
graph.edges.collect.foreach(println)
println();

val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
println("initialGraph:");
println("vertices:");
initialGraph.vertices.collect.foreach(println)
println("edges:");
initialGraph.edges.collect.foreach(println)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println();
println("sssp:");
println("vertices:");
println(sssp.vertices.collect.mkString("\n"))
println("edges:");
sssp.edges.collect.foreach(println)


sc.stop()
}
}
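pregel also accepts an optional maximum superstep count and an active-edge direction. The sketch below bounds the same SSSP computation to 10 iterations; it assumes initialGraph and reuses the three functions from above.

import org.apache.spark.graphx.EdgeDirection

val boundedSssp = initialGraph.pregel(Double.PositiveInfinity, maxIterations = 10, activeDirection = EdgeDirection.Out)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => { // Send Message, only along out-edges of updated vertices
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(boundedSssp.vertices.collect.mkString("\n"))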
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* @author xubo
* @time 20160503
* ref:
* http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
* http://blog.csdn.net/zcf1002797280/article/details/50007913
*/

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.lib.ShortestPaths

object ShortPaths {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ShortPaths").setMaster("local[4]")
val sc = new SparkContext(conf)

// Expected results, used for comparison later
val shortestPaths = Set(
(1, Map(1 -> 0, 4 -> 2)), (2, Map(1 -> 1, 4 -> 2)), (3, Map(1 -> 2, 4 -> 1)),
(4, Map(1 -> 2, 4 -> 0)), (5, Map(1 -> 1, 4 -> 1)), (6, Map(1 -> 3, 4 -> 1)))

// Build the edge sequence of the undirected graph (each edge in both directions)
val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap {
case e => Seq(e, e.swap)
}

// Build the undirected graph
val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
val graph = Graph.fromEdgeTuples(edges, 1)

// The landmark vertices to compute shortest paths to
val landmarks = Seq(1, 2, 3, 4, 5, 6).map(_.toLong)

// Compute the shortest paths
val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
case (v, spMap) => (v, spMap.mapValues(i => i))
}

val shortestPath1 = ShortestPaths.run(graph, landmarks)
// Compare with the expected results
println("\ngraph edges");
println("edges:");
graph.edges.collect.foreach(println)
// graph.edges.collect.foreach(println)
println("vertices:");
graph.vertices.collect.foreach(println)
// println("triplets:");
// graph.triplets.collect.foreach(println)
println();

println("\n shortestPath1");
println("edges:");
shortestPath1.edges.collect.foreach(println)
println("vertices:");
shortestPath1.vertices.collect.foreach(println)
// println("vertices:")

// assert(results.toSet == shortestPaths)
println("results.toSet:" + results.toSet);
println("end");

sc.stop()
}
}
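Each vertex in the ShortestPaths result carries a Map from landmark id to hop count. A small lookup sketch, assuming shortestPath1 from above:

shortestPath1.vertices.filter(_._1 == 1L).collect.foreach {
  case (v, spMap) => println("vertex " + v + " -> " + spMap) // hop counts to each landmark
}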