Commit

analysis: add analysis files (using Spark MLlib), #126
jtarraga committed Jun 1, 2017
1 parent 6d5333d commit 8e1d038
Showing 5 changed files with 449 additions and 0 deletions.
Analysis.java
@@ -0,0 +1,28 @@
package org.opencb.hpg.bigdata.analysis;

/**
 * Created by jtarraga on 30/05/17.
 */
public abstract class Analysis {

    protected String datasetName;
    protected String studyName;

    public abstract void run();

    public String getDatasetName() {
        return datasetName;
    }

    public void setDatasetName(String datasetName) {
        this.datasetName = datasetName;
    }

    public String getStudyName() {
        return studyName;
    }

    public void setStudyName(String studyName) {
        this.studyName = studyName;
    }
}
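The abstract base only fixes the dataset/study metadata and the run() contract. A minimal, hypothetical driver sketch (not part of this commit; dataset, study and variable names are placeholders) showing how callers could treat the concrete analyses added below polymorphically:

// Hypothetical driver, not part of this commit. It assumes the concrete
// subclasses added later in this diff are on the classpath; note that their
// run() methods are still partly stubbed at this point.
import java.util.Arrays;
import java.util.List;

import org.opencb.hpg.bigdata.analysis.Analysis;
import org.opencb.hpg.bigdata.analysis.variant.LinearRegressionAnalysis;
import org.opencb.hpg.bigdata.analysis.variant.PCAAnalysis;

public class AnalysisDriver {
    public static void main(String[] args) {
        // "variants.parquet" and "study1" are placeholder names
        List<Analysis> analyses = Arrays.asList(
                new LinearRegressionAnalysis("variants.parquet", "study1", "phenotype", "genotype"),
                new PCAAnalysis("variants.parquet", "study1", "features"));
        for (Analysis analysis : analyses) {
            System.out.println("Running analysis on dataset: " + analysis.getDatasetName());
            analysis.run();
        }
    }
}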
LinearRegressionAnalysis.java
@@ -0,0 +1,121 @@
package org.opencb.hpg.bigdata.analysis.variant;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.LabeledPoint;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.opencb.hpg.bigdata.analysis.Analysis;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by jtarraga on 30/05/17.
 */
public class LinearRegressionAnalysis extends Analysis {

    private String depVarName;
    private String indepVarName;

    private int numIterations = 10; // number of iterations
    private double regularization = 0.3; // regularization parameter
    private double elasticNet = 0.8; // elastic net mixing parameter

    @Override
    public void run() {
        LinearRegression lr = new LinearRegression()
                .setMaxIter(numIterations)
                .setRegParam(regularization)
                .setElasticNetParam(elasticNet);

        // prepare dataset
        int numFeatures = 10;
        double target = Double.NaN;
        double[] features = new double[numFeatures];
        LabeledPoint lp = new LabeledPoint(target, Vectors.dense(features));

        List<LabeledPoint> list = new ArrayList<LabeledPoint>();
        list.add(lp);
        JavaSparkContext jsc = new JavaSparkContext();
        SQLContext sqlContext = new SQLContext(jsc);
        JavaRDD<LabeledPoint> data = jsc.parallelize(list);
        data.cache();

        // fit the model
        Dataset<Row> training = sqlContext.createDataFrame(data.rdd(), LabeledPoint.class);
        LinearRegressionModel lrModel = lr.fit(training);

        // print the coefficients and intercept for linear regression
        System.out.println("Coefficients: "
                + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

        // summarize the model over the training set and print out some metrics
        LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
        System.out.println("numIterations: " + trainingSummary.totalIterations());
        System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
        trainingSummary.residuals().show();
        System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
        System.out.println("r2: " + trainingSummary.r2());
    }

    public LinearRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName) {
        this(datasetName, studyName, depVarName, indepVarName, 10, 0.3, 0.8);
    }

    public LinearRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName,
                                    int numIterations, double regularization, double elasticNet) {
        this.datasetName = datasetName;
        this.studyName = studyName;
        this.depVarName = depVarName;
        this.indepVarName = indepVarName;
        this.numIterations = numIterations;
        this.regularization = regularization;
        this.elasticNet = elasticNet;
    }

    public String getDepVarName() {
        return depVarName;
    }

    public void setDepVarName(String depVarName) {
        this.depVarName = depVarName;
    }

    public String getIndepVarName() {
        return indepVarName;
    }

    public void setIndepVarName(String indepVarName) {
        this.indepVarName = indepVarName;
    }

    public int getNumIterations() {
        return numIterations;
    }

    public void setNumIterations(int numIterations) {
        this.numIterations = numIterations;
    }

    public double getRegularization() {
        return regularization;
    }

    public void setRegularization(double regularization) {
        this.regularization = regularization;
    }

    public double getElasticNet() {
        return elasticNet;
    }

    public void setElasticNet(double elasticNet) {
        this.elasticNet = elasticNet;
    }
}
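A minimal, hypothetical usage sketch of the full constructor (all names and values below are placeholders, not part of this commit). Note that run(), as committed, builds a dummy single-point training set and creates a no-argument JavaSparkContext, so a Spark master must already be configured, e.g. through spark-submit:

// Hypothetical caller code; names and hyper-parameter values are illustrative only.
LinearRegressionAnalysis linear = new LinearRegressionAnalysis(
        "variants.parquet",  // datasetName (placeholder)
        "study1",            // studyName (placeholder)
        "phenotype",         // dependent variable name (placeholder)
        "genotype",          // independent variable name (placeholder)
        100,                 // numIterations
        0.1,                 // regularization parameter
        0.5);                // elastic net mixing parameter
linear.run();                // fits the model and prints coefficients, RMSE, r2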
LogisticRegressionAnalysis.java
@@ -0,0 +1,128 @@
package org.opencb.hpg.bigdata.analysis.variant;

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.opencb.hpg.bigdata.analysis.Analysis;

/**
 * Created by jtarraga on 30/05/17.
 */
public class LogisticRegressionAnalysis extends Analysis {

    private String depVarName;
    private String indepVarName;

    private int numIterations = 10; // number of iterations
    private double regularization = 0.3; // regularization parameter
    private double elasticNet = 0.8; // elastic net mixing parameter

    @Override
    public void run() {
        LogisticRegression lr = new LogisticRegression()
                .setMaxIter(numIterations)
                .setRegParam(regularization)
                .setElasticNetParam(elasticNet);

        // prepare dataset
        Dataset<Row> training = null;

        // fit the model
        LogisticRegressionModel lrModel = lr.fit(training);

        // print the coefficients and intercept for logistic regression
        System.out.println("Coefficients: "
                + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

        // summarize the model over the training set and print out some metrics
        LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();
        System.out.println("numIterations: " + trainingSummary.totalIterations());
        System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));

        // obtain the loss per iteration
        double[] objectiveHistory = trainingSummary.objectiveHistory();
        for (double lossPerIteration : objectiveHistory) {
            System.out.println(lossPerIteration);
        }

        // obtain the metrics useful to judge performance on test data
        // we cast the summary to a BinaryLogisticRegressionSummary since the problem is a binary
        // classification problem.
        BinaryLogisticRegressionSummary binarySummary =
                (BinaryLogisticRegressionSummary) trainingSummary;

        // obtain the receiver-operating characteristic as a dataframe and areaUnderROC
        Dataset<Row> roc = binarySummary.roc();
        roc.show();
        roc.select("FPR").show();
        System.out.println(binarySummary.areaUnderROC());

        // get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
        // this selected threshold
        Dataset<Row> fMeasure = binarySummary.fMeasureByThreshold();
        double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
        double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
                .select("threshold").head().getDouble(0);
        lrModel.setThreshold(bestThreshold);
    }

    public LogisticRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName) {
        this(datasetName, studyName, depVarName, indepVarName, 10, 0.3, 0.8);
    }

    public LogisticRegressionAnalysis(String datasetName, String studyName, String depVarName, String indepVarName,
                                      int numIterations, double regularization, double elasticNet) {
        this.datasetName = datasetName;
        this.studyName = studyName;
        this.depVarName = depVarName;
        this.indepVarName = indepVarName;
        this.numIterations = numIterations;
        this.regularization = regularization;
        this.elasticNet = elasticNet;
    }

    public String getDepVarName() {
        return depVarName;
    }

    public void setDepVarName(String depVarName) {
        this.depVarName = depVarName;
    }

    public String getIndepVarName() {
        return indepVarName;
    }

    public void setIndepVarName(String indepVarName) {
        this.indepVarName = indepVarName;
    }

    public int getNumIterations() {
        return numIterations;
    }

    public void setNumIterations(int numIterations) {
        this.numIterations = numIterations;
    }

    public double getRegularization() {
        return regularization;
    }

    public void setRegularization(double regularization) {
        this.regularization = regularization;
    }

    public double getElasticNet() {
        return elasticNet;
    }

    public void setElasticNet(double elasticNet) {
        this.elasticNet = elasticNet;
    }
}
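A minimal, hypothetical usage sketch using the short constructor plus the setters to override the defaults (dataset, study and variable names are placeholders, not part of this commit). As committed, run() still leaves the training Dataset as a null placeholder, so the sketch only illustrates the intended call pattern:

// Hypothetical caller code; names and values are illustrative only.
LogisticRegressionAnalysis logistic =
        new LogisticRegressionAnalysis("variants.parquet", "study1", "caseControl", "genotype");
logistic.setNumIterations(50);    // default is 10
logistic.setRegularization(0.1);  // default is 0.3
logistic.setElasticNet(0.0);      // default is 0.8; 0.0 means a pure L2 penalty
logistic.run();                   // prints coefficients, ROC, and picks the best F-Measure threshold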
PCAAnalysis.java
@@ -0,0 +1,59 @@
package org.opencb.hpg.bigdata.analysis.variant;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.opencb.hpg.bigdata.analysis.Analysis;

/**
 * Created by jtarraga on 30/05/17.
 */
public class PCAAnalysis extends Analysis {

    private int kValue = 3;
    private String featureName;

    @Override
    public void run() {
        // prepare dataset
        Dataset<Row> dataset = null;

        // fit PCA
        PCAModel pca = new PCA()
                .setInputCol(featureName)
                .setOutputCol("pca")
                .setK(kValue)
                .fit(dataset);

        Dataset<Row> result = pca.transform(dataset).select("pca");
        result.show(false);
    }

    public PCAAnalysis(String datasetName, String studyName, String featureName) {
        this(datasetName, studyName, featureName, 3);
    }

    public PCAAnalysis(String datasetName, String studyName, String featureName, int kValue) {
        this.datasetName = datasetName;
        this.studyName = studyName;
        this.featureName = featureName;
        this.kValue = kValue;
    }

    public int getkValue() {
        return kValue;
    }

    public void setkValue(int kValue) {
        this.kValue = kValue;
    }

    public String getFeatureName() {
        return featureName;
    }

    public void setFeatureName(String featureName) {
        this.featureName = featureName;
    }
}
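A minimal, hypothetical usage sketch (dataset, study and column names are placeholders, not part of this commit). kValue is the number of principal components kept; as committed, run() still uses a null placeholder Dataset, so this only shows the intended call pattern:

// Hypothetical caller code; "features" is assumed to be a Vector-typed input column.
PCAAnalysis pca = new PCAAnalysis("variants.parquet", "study1", "features", 5);
pca.run();  // projects the "features" column onto the first 5 principal components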