package org.opencb.hpg.bigdata.analysis;

/**
 * Base class for analyses backed by Spark MLlib.
 * <p>
 * Holds the dataset and study identifiers shared by every concrete analysis;
 * subclasses implement {@link #run()} with the actual computation.
 * <p>
 * Created by jtarraga on 30/05/17.
 */
public abstract class Analysis {

    // Identifiers of the data the analysis operates on; populated by subclasses.
    protected String datasetName;
    protected String studyName;

    /**
     * Executes the analysis. Concrete subclasses provide the implementation.
     */
    public abstract void run();

    public String getDatasetName() {
        return datasetName;
    }

    public void setDatasetName(String datasetName) {
        this.datasetName = datasetName;
    }

    public String getStudyName() {
        return studyName;
    }

    public void setStudyName(String studyName) {
        this.studyName = studyName;
    }
}
+ */ +public class LinearRegressionAnalysis extends Analysis { + + private String depVarName; + private String indepVarName; + + private int numIterations = 10; // number of iterations + private double regularization = 0.3; // regularization parameter + private double elasticNet = 0.8; // elastic net mixing parameter + + @Override + public void run() { + LinearRegression lr = new LinearRegression() + .setMaxIter(numIterations) + .setRegParam(regularization) + .setElasticNetParam(elasticNet); + + // prepare dataset + int numFeatures = 10; + double target = Double.NaN; + double[] features = new double[numFeatures]; + LabeledPoint lp = new LabeledPoint(target, Vectors.dense(features)); + + List list = new ArrayList(); + list.add(lp); + JavaSparkContext jsc = new JavaSparkContext(); + SQLContext sqlContext = new SQLContext(jsc); + JavaRDD data = jsc.parallelize(list); + data.cache(); + + // fit the model + Dataset training = sqlContext.createDataFrame(data.rdd(), LabeledPoint.class); + LinearRegressionModel lrModel = lr.fit(training); + + // print the coefficients and intercept for linear regression + System.out.println("Coefficients: " + + lrModel.coefficients() + " Intercept: " + lrModel.intercept()); + + // summarize the model over the training set and print out some metrics + LinearRegressionTrainingSummary trainingSummary = lrModel.summary(); + System.out.println("numIterations: " + trainingSummary.totalIterations()); + System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory())); + trainingSummary.residuals().show(); + System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError()); + System.out.println("r2: " + trainingSummary.r2()); + } + + public LinearRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName) { + this(datasetName, studyName, depVarName, indepVarName, 10, 0.3, 0.8); + } + + public LinearRegressionAnalysis(String datasetName, String studyName, String depVarName, String 
indepVarName, + int numIterations, double regularization, double elasticNet) { + this.datasetName = datasetName; + this.studyName = studyName; + this.depVarName = depVarName; + this.indepVarName = indepVarName; + this.numIterations = numIterations; + this.regularization = regularization; + this.elasticNet = elasticNet; + } + + public String getDepVarName() { + return depVarName; + } + + public void setDepVarName(String depVarName) { + this.depVarName = depVarName; + } + + public String getIndepVarName() { + return indepVarName; + } + + public void setIndepVarName(String indepVarName) { + this.indepVarName = indepVarName; + } + + public int getNumIterations() { + return numIterations; + } + + public void setNumIterations(int numIterations) { + this.numIterations = numIterations; + } + + public double getRegularization() { + return regularization; + } + + public void setRegularization(double regularization) { + this.regularization = regularization; + } + + public double getElasticNet() { + return elasticNet; + } + + public void setElasticNet(double elasticNet) { + this.elasticNet = elasticNet; + } +} diff --git a/hpg-bigdata-analysis/src/main/java/org/opencb/hpg/bigdata/analysis/variant/LogisticRegressionAnalysis.java b/hpg-bigdata-analysis/src/main/java/org/opencb/hpg/bigdata/analysis/variant/LogisticRegressionAnalysis.java new file mode 100644 index 00000000..05ea14da --- /dev/null +++ b/hpg-bigdata-analysis/src/main/java/org/opencb/hpg/bigdata/analysis/variant/LogisticRegressionAnalysis.java @@ -0,0 +1,128 @@ +package org.opencb.hpg.bigdata.analysis.variant; + +import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary; +import org.apache.spark.ml.classification.LogisticRegression; +import org.apache.spark.ml.classification.LogisticRegressionModel; +import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary; +import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import 
org.apache.spark.sql.functions; +import org.opencb.hpg.bigdata.analysis.Analysis; + +/** + * Created by jtarraga on 30/05/17. + */ +public class LogisticRegressionAnalysis extends Analysis { + + private String depVarName; + private String indepVarName; + + private int numIterations = 10; // number of iterations + private double regularization = 0.3; // regularization parameter + private double elasticNet = 0.8; // elastic net mixing parameter + + @Override + public void run() { + LogisticRegression lr = new LogisticRegression() + .setMaxIter(numIterations) + .setRegParam(regularization) + .setElasticNetParam(elasticNet); + + // prepare dataset + Dataset training = null; + + // fit the model + LogisticRegressionModel lrModel = lr.fit(training); + + // print the coefficients and intercept for linear regression + System.out.println("Coefficients: " + + lrModel.coefficients() + " Intercept: " + lrModel.intercept()); + + // summarize the model over the training set and print out some metrics + LogisticRegressionTrainingSummary trainingSummary = lrModel.summary(); + System.out.println("numIterations: " + trainingSummary.totalIterations()); + System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory())); + + // obtain the loss per iteration + double[] objectiveHistory = trainingSummary.objectiveHistory(); + for (double lossPerIteration : objectiveHistory) { + System.out.println(lossPerIteration); + } + + // obtain the metrics useful to judge performance on test data + // we cast the summary to a BinaryLogisticRegressionSummary since the problem is a binary + // classification problem. 
+ BinaryLogisticRegressionSummary binarySummary = + (BinaryLogisticRegressionSummary) trainingSummary; + + // obtain the receiver-operating characteristic as a dataframe and areaUnderROC + Dataset roc = binarySummary.roc(); + roc.show(); + roc.select("FPR").show(); + System.out.println(binarySummary.areaUnderROC()); + + // get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with + // this selected threshold + Dataset fMeasure = binarySummary.fMeasureByThreshold(); + double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0); + double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure)) + .select("threshold").head().getDouble(0); + lrModel.setThreshold(bestThreshold); + } + + public LogisticRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName) { + this(datasetName, studyName, depVarName, indepVarName, 10, 0.3, 0.8); + } + + public LogisticRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName, + int numIterations, double regularization, double elasticNet) { + this.datasetName = datasetName; + this.studyName = studyName; + this.depVarName = depVarName; + this.indepVarName = indepVarName; + this.numIterations = numIterations; + this.regularization = regularization; + this.elasticNet = elasticNet; + } + + public String getDepVarName() { + return depVarName; + } + + public void setDepVarName(String depVarName) { + this.depVarName = depVarName; + } + + public String getIndepVarName() { + return indepVarName; + } + + public void setIndepVarName(String indepVarName) { + this.indepVarName = indepVarName; + } + + public int getNumIterations() { + return numIterations; + } + + public void setNumIterations(int numIterations) { + this.numIterations = numIterations; + } + + public double getRegularization() { + return regularization; + } + + public void setRegularization(double regularization) { + 
this.regularization = regularization; + } + + public double getElasticNet() { + return elasticNet; + } + + public void setElasticNet(double elasticNet) { + this.elasticNet = elasticNet; + } +} diff --git a/hpg-bigdata-analysis/src/main/java/org/opencb/hpg/bigdata/analysis/variant/PCAAnalysis.java b/hpg-bigdata-analysis/src/main/java/org/opencb/hpg/bigdata/analysis/variant/PCAAnalysis.java new file mode 100644 index 00000000..104e518d --- /dev/null +++ b/hpg-bigdata-analysis/src/main/java/org/opencb/hpg/bigdata/analysis/variant/PCAAnalysis.java @@ -0,0 +1,59 @@ +package org.opencb.hpg.bigdata.analysis.variant; + +import org.apache.spark.ml.feature.PCA; +import org.apache.spark.ml.feature.PCAModel; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.opencb.hpg.bigdata.analysis.Analysis; + +/** + * Created by jtarraga on 30/05/17. + */ +public class PCAAnalysis extends Analysis { + + private int kValue = 3; + private String featureName; + + @Override + public void run() { + // prepare dataset + Dataset dataset = null; + + // fit PCA + PCAModel pca = new PCA() + .setInputCol(featureName) + .setOutputCol("pca") + .setK(kValue) + .fit(dataset); + + Dataset result = pca.transform(dataset).select("pca"); + result.show(false); + } + + public PCAAnalysis(String datasetName, String studyName, String featureName) { + this(datasetName, studyName, featureName, 3); + } + + public PCAAnalysis(String datasetName, String studyName, String featureName, int kValue) { + this.datasetName = datasetName; + this.studyName = studyName; + this.featureName = featureName; + this.kValue = kValue; + } + + public int getkValue() { + return kValue; + } + + public void setkValue(int kValue) { + this.kValue = kValue; + } + + public String getFeatureName() { + return featureName; + } + + public void setFeatureName(String featureName) { + this.featureName = featureName; + } +} diff --git 
a/hpg-bigdata-core/src/test/java/org/opencb/hpg/bigdata/core/lib/VariantDatasetFacetTest.java b/hpg-bigdata-core/src/test/java/org/opencb/hpg/bigdata/core/lib/VariantDatasetFacetTest.java new file mode 100644 index 00000000..9f5e622c --- /dev/null +++ b/hpg-bigdata-core/src/test/java/org/opencb/hpg/bigdata/core/lib/VariantDatasetFacetTest.java @@ -0,0 +1,113 @@ +package org.opencb.hpg.bigdata.core.lib; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; +import org.junit.Test; +import org.opencb.biodata.models.variant.Variant; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.List; + +/** + * Created by jtarraga on 17/05/17. + */ +public class VariantDatasetFacetTest implements Serializable { + public static class Employee implements Serializable { + private String name; + private long salary; + private String type; + + public Employee () { + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getSalary() { + return salary; + } + + public void setSalary(long salary) { + this.salary = salary; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + } + + + @Test + public void test1() { + String path; + SparkConf sparkConf = SparkConfCreator.getConf("Dataset Variant", "local", 1, + false, ""); + //sparkConf.set(); + //sparkConf.registerKryoClasses(Array(classOf[Variant])); + //sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + //sparkConf.set("spark.kryoserializer.buffer.max", "512m"); + //sparkConf.set("spark.kryoserializer.buffer", "256m"); + + SparkSession spark = new SparkSession(new SparkContext(sparkConf)); + SQLContext sqlContext 
= new SQLContext(spark); +// SparkSession spark = SparkSession +// .builder() +// .appName("Java Spark SQL user-defined Datasets aggregation example") +// .getOrCreate(); + + // $example on:typed_custom_aggregation$ + Encoder employeeEncoder = Encoders.bean(Employee.class); + path = "/tmp/employees.json"; //examples/src/main/resources/employees.json"; + Dataset ds = spark.read().json(path).as(employeeEncoder); + ds.show(); + // +-------+------+ + // | name|salary| + // +-------+------+ + // |Michael| 3000| + // | Andy| 4500| + // | Justin| 3500| + // | Berta| 4000| + // +-------+------+ + + + Encoder vEncoder = Encoders.bean(Variant.class); + Variant variant; + +/* + path = "/tmp/kk/1K.vcf.annot.avro.json"; + //String avroPath = "/tmp/employees.json"; //examples/src/main/resources/employees.json"; + //Dataset dsVariant = spark.read().json(path).as(vEncoder); + Dataset dsVariant = spark.read().json(path); + dsVariant.show(4); +*/ + path = "/tmp/kk/1K.vcf.annot.avro"; + Dataset dsRow = sqlContext.read().format("com.databricks.spark.avro").load(path); + dsRow.show(4); +/* + JavaRDD employees = ds.javaRDD(); + JavaPairRDD ones = employees.mapToPair(e -> new Tuple2<>(e.getType(), 1)); + JavaPairRDD counts = ones.reduceByKey((a, b) -> a + b); + + List> output = counts.collect(); + for (Tuple2 tuple : output) { + System.out.println(tuple._1() + ": " + tuple._2()); + } +*/ + spark.stop(); + } +}