Release 3.3.3 #26

Merged 3 commits on Jun 22, 2018
9 changes: 5 additions & 4 deletions build.gradle
@@ -53,8 +53,8 @@ configure(allProjs) {
ext {
scalaVersion = '2.11'
scalaVersionRevision = '8'
- scalaTestVersion = '3.0.0'
- scalaCheckVersion = '1.13.5'
+ scalaTestVersion = '3.0.5'
+ scalaCheckVersion = '1.14.0'
junitVersion = '4.11'
avroVersion = '1.7.7'
sparkVersion = '2.2.1'
@@ -68,7 +68,7 @@ configure(allProjs) {
jodaConvertVersion = '1.8.1'
algebirdVersion = '0.12.3'
jacksonVersion = '2.7.3'
- luceneVersion = '7.1.0'
+ luceneVersion = '7.3.0'
enumeratumVersion = '1.4.12'
scoptVersion = '3.5.0'
googleLibPhoneNumberVersion = '8.8.5'
@@ -179,7 +179,8 @@ configure(allProjs) {
header = rootProject.file('LICENSE.txt')
ignoreFailures = true
include '**/*.java', '**/*.scala'
- exclude '**/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala',
+ exclude '**/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala',
+ '**/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala',
'**/com/salesforce/op/test/TestSparkContext.scala',
'**/com/salesforce/op/test/TempDirectoryTest.scala',
'**/com/salesforce/op/utils/io/DirectOutputCommitter.scala',
@@ -33,13 +33,14 @@ package com.salesforce.op.cli.gen.templates
import com.salesforce.op.features.{FeatureBuilder => FB}
import com.salesforce.op.features.types._


/**
* This is a template for generating binary feature handling in a generated project
*/
class BinaryFeatureTemplate {
private[templates] def feature =
// BEGIN
FB.Binary[SampleObject]
.extract(o => Option(o.codeGeneration_binaryField_codeGeneration).map(_.booleanValue).toBinary)
FB.Binary[SampleObject]
.extract(o => Option(o.codeGeneration_binaryField_codeGeneration).map(_.booleanValue).toBinary)
// END
}
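
For reference, a minimal sketch of how a feature like the one this template generates is declared against a concrete schema class; the Passenger class and its field are illustrative, not part of this change:

import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._

// Hypothetical schema class standing in for the generated SampleObject
case class Passenger(survived: java.lang.Boolean)

// Declared the same way the template body above is emitted
val survivedFeature = FeatureBuilder.Binary[Passenger]
  .extract(p => Option(p.survived).map(_.booleanValue).toBinary)
  .asPredictor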
2 changes: 2 additions & 0 deletions core/build.gradle
@@ -1,5 +1,6 @@
dependencies {
compile project(':readers')
+ testRuntime project(':models')
testCompile project(':testkit')

// Google libphonenumber
@@ -16,6 +17,7 @@ dependencies {
// Lucene text analysis
compile "org.apache.lucene:lucene-analyzers-common:$luceneVersion"
compile "org.apache.lucene:lucene-analyzers-kuromoji:$luceneVersion"
compile "org.apache.lucene:lucene-analyzers-opennlp:$luceneVersion"
compile "org.apache.lucene:lucene-suggest:$luceneVersion"

// Scopt
159 changes: 143 additions & 16 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,14 +31,18 @@

package com.salesforce.op

- import com.salesforce.op.features.{Feature, OPFeature}
+ import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.RawFeatureFilter
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.stages.impl.selector.ModelSelectorBase
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.reflection.ReflectionUtils
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG}
import org.apache.spark.annotation.Experimental
- import org.apache.spark.ml.Transformer
+ import org.apache.spark.ml.{Estimator, Transformer}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.{MutableList => MList}
@@ -163,7 +167,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
Try {
stage.set(stage.getParam(k), v)
} orElse {
- Try { ReflectionUtils.reflectSetterMethod(stage, k).get.apply(v) }
+ Try { ReflectionUtils.reflectSetterMethod(stage, k, Seq(v)) }
}
if (setStage.isFailure) log.error(
s"Setting parameter $k with value $v for stage $stage with params ${stage.params.toList} failed with an error",
@@ -180,7 +184,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
*/
private def setStagesDAG(features: Array[OPFeature]): OpWorkflow.this.type = {
// Unique stages layered by distance
- val uniqueStagesLayered = DAG.compute(features)
+ val uniqueStagesLayered = FitStagesUtil.computeDAG(features)

if (log.isDebugEnabled) {
val total = uniqueStagesLayered.map(_.length).sum
@@ -311,11 +315,18 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
*/
def train(persistEveryKStages: Int = OpWorkflowModel.PersistEveryKStages)
(implicit spark: SparkSession): OpWorkflowModel = {
val rawData = generateRawData()

// Update features with fitted stages
val fittedStages = fitStages(data = rawData, stagesToFit = stages, persistEveryKStages)
val newResultFeatures = resultFeatures.map(_.copyWithNewStages(fittedStages))
val (fittedStages, newResultFeatures) =
if (stages.exists(_.isInstanceOf[Estimator[_]])) {
val rawData = generateRawData()

// Update features with fitted stages
val fittedStgs = fitStages(data = rawData, stagesToFit = stages, persistEveryKStages)
val newResultFtrs = resultFeatures.map(_.copyWithNewStages(fittedStgs))
fittedStgs -> newResultFtrs
} else {
stages -> resultFeatures
}

val model =
new OpWorkflowModel(uid, getParameters())
@@ -327,6 +338,93 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
reader.map(model.setReader).getOrElse(model)
}

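A rough usage sketch of the updated train flow, assuming a reader and a result feature have been defined elsewhere (the names below are illustrative, not from this diff):

import org.apache.spark.sql.SparkSession

// Illustrative usage only; 'trainingReader' and 'prediction' are assumed to exist elsewhere
implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

val workflow = new OpWorkflow()
  .setReader(trainingReader)        // assumed DataReader for the training data
  .setResultFeatures(prediction)    // assumed result feature of the pipeline

// Raw data is now only generated and fitted when the DAG contains estimators;
// a transformer-only workflow skips straight to building the model
val model: OpWorkflowModel = workflow.train()
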
/**
* Fit the estimators to return a sequence of only transformers
* Modified version of Spark 2.x Pipeline
*
* @param data dataframe to fit on
* @param stagesToFit stages that need to be converted to transformers
* @param persistEveryKStages persist data in transforms every k stages for performance improvement
* @return fitted transformers
*/
protected def fitStages(data: DataFrame, stagesToFit: Array[OPStage], persistEveryKStages: Int)
(implicit spark: SparkSession): Array[OPStage] = {

// TODO may want to make workflow take an optional reserve fraction
val splitters = stagesToFit.collect { case s: ModelSelectorBase[_, _] => s.splitter }.flatten
val splitter = splitters.reduceOption { (a, b) =>
if (a.getReserveTestFraction > b.getReserveTestFraction) a else b
}
val (train, test) = splitter.map(_.split(data)).getOrElse((data, spark.emptyDataFrame))
val hasTest = !test.isEmpty

val dag = FitStagesUtil.computeDAG(resultFeatures)
.map(_.filter(s => stagesToFit.contains(s._1)))
.filter(_.nonEmpty)

// Search for the last estimator
val indexOfLastEstimator: Option[Int] =
dag.collect { case seq if seq.exists(_._1.isInstanceOf[Estimator[_]]) => seq.head._2 }.lastOption

// doing regular workflow fit without workflow level CV
if (!isWorkflowCV) {
FitStagesUtil.fitAndTransformDAG(
dag = dag,
train = train,
test = test,
hasTest = hasTest,
indexOfLastEstimator = indexOfLastEstimator,
persistEveryKStages = persistEveryKStages
).transformers
} else {
// doing workflow level CV/TS
// Extract Model Selector and Split the DAG into
val CutDAG(modelSelectorOpt, before, during, after) = FitStagesUtil.cutDAG(dag)

log.info("Applying initial DAG before CV/TS. Stages: {}", before.flatMap(_.map(_._1.stageName)).mkString(", "))
val FittedDAG(beforeTrain, beforeTest, beforeTransformers) = FitStagesUtil.fitAndTransformDAG(
dag = before,
train = train,
test = test,
hasTest = hasTest,
indexOfLastEstimator = indexOfLastEstimator,
persistEveryKStages = persistEveryKStages
)

// Break up catalyst (cause it chokes) by converting into rdd, persisting it and then back to dataframe
val (trainRDD, testRDD) = (beforeTrain.rdd.persist(), beforeTest.rdd.persist())
val (trainFixed, testFixed) = (
spark.createDataFrame(trainRDD, beforeTrain.schema),
spark.createDataFrame(testRDD, beforeTest.schema)
)

modelSelectorOpt match {
case None => beforeTransformers
case Some((modelSelector, distance)) =>
// estimate best model
log.info("Estimate best Model with CV/TS. Stages included in CV are: {}, {}",
during.flatMap(_.map(_._1.stageName)).mkString(", "), modelSelector.uid: Any
)
modelSelector.findBestEstimator(trainFixed, during, persistEveryKStages)
val remainingDAG: StagesDAG = (during :+ (Array(modelSelector -> distance): Layer)) ++ after

log.info("Applying DAG after CV/TS. Stages: {}", remainingDAG.flatMap(_.map(_._1.stageName)).mkString(", "))
val fitted = FitStagesUtil.fitAndTransformDAG(
dag = remainingDAG,
train = trainFixed,
test = testFixed,
hasTest = hasTest,
indexOfLastEstimator = indexOfLastEstimator,
persistEveryKStages = persistEveryKStages,
fittedTransformers = beforeTransformers
).transformers
trainRDD.unpersist()
testRDD.unpersist()
fitted
}
}
}

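For intuition about the layered DAG used throughout this method, here is a small illustrative sketch (not part of the change) that prints each layer of (stage, distance) pairs; the prediction feature is an assumption:

import com.salesforce.op.features.OPFeature
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.utils.stages.FitStagesUtil.StagesDAG

// 'prediction' stands in for a result feature built elsewhere
val dag: StagesDAG = FitStagesUtil.computeDAG(Array[OPFeature](prediction))
dag.zipWithIndex.foreach { case (layer, i) =>
  // each layer holds (stage, distance-from-result) pairs
  println(s"Layer $i: " + layer.map { case (stage, d) => s"${stage.stageName} (distance $d)" }.mkString(", "))
}
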
/**
* Replaces any estimators in this workflow with their corresponding fit models from the OpWorkflowModel
* passed in. Note that the Stages UIDs must EXACTLY correspond in order to be replaced so the same features
@@ -352,15 +450,25 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
def loadModel(path: String): OpWorkflowModel = new OpWorkflowModelReader(this).load(path)

/**
- * Returns a dataframe containing all the columns generated up to the feature input
+ * Returns a dataframe containing all the columns generated up to and including the feature input
*
* @param feature input feature to compute up to
* @param persistEveryKStages persist data in transforms every k stages for performance improvement
- * @return Dataframe containing columns corresponding to all of the features generated before the feature given
+ * @return Dataframe containing columns corresponding to all of the features generated up to the feature given
*/
def computeDataUpTo(feature: OPFeature, persistEveryKStages: Int = OpWorkflowModel.PersistEveryKStages)
(implicit spark: SparkSession): DataFrame = {
computeDataUpTo(stopStage = findOriginStageId(feature), fitted = false, persistEveryKStages)
if (findOriginStageId(feature).isEmpty) {
log.warn("Could not find origin stage for feature in workflow!! Defaulting to generate raw features.")
generateRawData()
} else {
val rawData = generateRawData()
val stagesToFit = FitStagesUtil.computeDAG(Array(feature)).flatMap(_.map(_._1))
val fittedStages = fitStages(rawData, stagesToFit, persistEveryKStages)
val updatedFeature = feature.copyWithNewStages(fittedStages)
val dag = FitStagesUtil.computeDAG(Array(updatedFeature))
applyTransformationsDAG(rawData, dag, persistEveryKStages)
}
}

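A brief usage sketch of the reworked method, assuming a workflow with its reader already set and an intermediate feature of interest ('tokenizedText' is illustrative):

import org.apache.spark.sql.DataFrame

// Illustrative: materialize all columns needed to produce the given feature, including it
val upTo: DataFrame = workflow.computeDataUpTo(tokenizedText)
upTo.show(10)
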
/**
@@ -383,16 +491,35 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
* @tparam T Type of the data read in
*/
@Experimental
def withRawFeatureFilter[T](trainingReader: Option[Reader[T]], scoringReader: Option[Reader[T]],
bins: Int = 100, minFillRate: Double = 0.001, maxFillDifference: Double = 0.90,
maxFillRatioDiff: Double = 20.0, maxJSDivergence: Double = 0.90, protectedFeatures: Array[OPFeature] = Array.empty
def withRawFeatureFilter[T](
trainingReader: Option[Reader[T]],
scoringReader: Option[Reader[T]],
bins: Int = 100,
minFillRate: Double = 0.001,
maxFillDifference: Double = 0.90,
maxFillRatioDiff: Double = 20.0,
maxJSDivergence: Double = 0.90,
maxCorrelation: Double = 0.95,
correlationType: CorrelationType = CorrelationType.Pearson,
protectedFeatures: Array[OPFeature] = Array.empty
): this.type = {
val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]])
require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" +
"as the reader for the workflow")
val protectedRawFeatures = protectedFeatures.flatMap(_.rawFeatures).map(_.name).toSet
rawFeatureFilter = Option( new RawFeatureFilter(training.get, scoringReader, bins, minFillRate,
maxFillDifference, maxFillRatioDiff, maxJSDivergence, protectedRawFeatures) )
rawFeatureFilter = Option {
new RawFeatureFilter(
trainingReader = training.get,
scoreReader = scoringReader,
bins = bins,
minFill = minFillRate,
maxFillDifference = maxFillDifference,
maxFillRatioDiff = maxFillRatioDiff,
maxJSDivergence = maxJSDivergence,
maxCorrelation = maxCorrelation,
correlationType = correlationType,
protectedFeatures = protectedRawFeatures)
}
this
}

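A sketch of a call site using the newly exposed correlation parameters; the readers and the prediction feature are assumptions, and the values shown simply restate the defaults:

import com.salesforce.op.stages.impl.preparators.CorrelationType

// Illustrative only; trainReader, scoreReader and prediction are assumed to exist
val filtered = new OpWorkflow()
  .setResultFeatures(prediction)
  .withRawFeatureFilter(
    trainingReader = Option(trainReader),
    scoringReader = Option(scoreReader),
    maxCorrelation = 0.95,                     // correlation threshold used by the raw feature filter
    correlationType = CorrelationType.Pearson  // default correlation measure
  )
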