Commit b49d81c: Release 3.3.3 (#26)
tovbinm authored Jun 22, 2018
1 parent b00afe3
Showing 269 changed files with 10,042 additions and 4,165 deletions.
9 changes: 5 additions & 4 deletions build.gradle
@@ -53,8 +53,8 @@ configure(allProjs) {
   ext {
     scalaVersion = '2.11'
     scalaVersionRevision = '8'
-    scalaTestVersion = '3.0.0'
-    scalaCheckVersion = '1.13.5'
+    scalaTestVersion = '3.0.5'
+    scalaCheckVersion = '1.14.0'
     junitVersion = '4.11'
     avroVersion = '1.7.7'
     sparkVersion = '2.2.1'
@@ -68,7 +68,7 @@ configure(allProjs) {
     jodaConvertVersion = '1.8.1'
     algebirdVersion = '0.12.3'
     jacksonVersion = '2.7.3'
-    luceneVersion = '7.1.0'
+    luceneVersion = '7.3.0'
     enumeratumVersion = '1.4.12'
     scoptVersion = '3.5.0'
     googleLibPhoneNumberVersion = '8.8.5'
@@ -179,7 +179,8 @@ configure(allProjs) {
     header = rootProject.file('LICENSE.txt')
     ignoreFailures = true
     include '**/*.java', '**/*.scala'
-    exclude '**/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala',
+    exclude '**/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala',
+        '**/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala',
         '**/com/salesforce/op/test/TestSparkContext.scala',
         '**/com/salesforce/op/test/TempDirectoryTest.scala',
         '**/com/salesforce/op/utils/io/DirectOutputCommitter.scala',
@@ -33,13 +33,14 @@ package com.salesforce.op.cli.gen.templates
 import com.salesforce.op.features.{FeatureBuilder => FB}
 import com.salesforce.op.features.types._
 
+
 /**
  * This is a template for generating binary feature handling in a generated project
  */
 class BinaryFeatureTemplate {
   private[templates] def feature =
   // BEGIN
-  FB.Binary[SampleObject]
-    .extract(o => Option(o.codeGeneration_binaryField_codeGeneration).map(_.booleanValue).toBinary)
+    FB.Binary[SampleObject]
+      .extract(o => Option(o.codeGeneration_binaryField_codeGeneration).map(_.booleanValue).toBinary)
   // END
 }
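
For context, the lines between the BEGIN and END markers are what the CLI splices into a generated project, with SampleObject and the codeGeneration_binaryField_codeGeneration placeholder swapped for a real schema class and field. A minimal sketch of the resulting pattern, assuming a hypothetical Avro-style Passenger class with a nullable getSurvived(): java.lang.Boolean accessor (both names are illustrative, not part of this commit):

    // Hypothetical generated output: build a Binary feature from a nullable Boolean field
    val survived = FB.Binary[Passenger]
      .extract(p => Option(p.getSurvived).map(_.booleanValue).toBinary)
      .asPredictor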
2 changes: 2 additions & 0 deletions core/build.gradle
@@ -1,5 +1,6 @@
 dependencies {
   compile project(':readers')
+  testRuntime project(':models')
   testCompile project(':testkit')
 
   // Google libphonenumber
@@ -16,6 +17,7 @@ dependencies {
   // Lucene text analysis
   compile "org.apache.lucene:lucene-analyzers-common:$luceneVersion"
   compile "org.apache.lucene:lucene-analyzers-kuromoji:$luceneVersion"
+  compile "org.apache.lucene:lucene-analyzers-opennlp:$luceneVersion"
   compile "org.apache.lucene:lucene-suggest:$luceneVersion"
 
   // Scopt
159 changes: 143 additions & 16 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,14 +31,18 @@
 
 package com.salesforce.op
 
-import com.salesforce.op.features.{Feature, OPFeature}
+import com.salesforce.op.features.OPFeature
+import com.salesforce.op.filters.RawFeatureFilter
 import com.salesforce.op.readers.Reader
 import com.salesforce.op.stages.OPStage
+import com.salesforce.op.stages.impl.preparators.CorrelationType
+import com.salesforce.op.stages.impl.selector.ModelSelectorBase
 import com.salesforce.op.utils.spark.RichDataset._
 import com.salesforce.op.utils.reflection.ReflectionUtils
+import com.salesforce.op.utils.stages.FitStagesUtil
+import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG}
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.{Estimator, Transformer}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.collection.mutable.{MutableList => MList}
@@ -163,7 +167,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
       Try {
         stage.set(stage.getParam(k), v)
       } orElse {
-        Try { ReflectionUtils.reflectSetterMethod(stage, k).get.apply(v) }
+        Try { ReflectionUtils.reflectSetterMethod(stage, k, Seq(v)) }
       }
       if (setStage.isFailure) log.error(
         s"Setting parameter $k with value $v for stage $stage with params ${stage.params.toList} failed with an error",
@@ -180,7 +184,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
    */
   private def setStagesDAG(features: Array[OPFeature]): OpWorkflow.this.type = {
     // Unique stages layered by distance
-    val uniqueStagesLayered = DAG.compute(features)
+    val uniqueStagesLayered = FitStagesUtil.computeDAG(features)
 
     if (log.isDebugEnabled) {
       val total = uniqueStagesLayered.map(_.length).sum
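
FitStagesUtil.computeDAG replaces the old DAG.compute and returns the stages grouped into layers by their distance from the result features, as the debug accounting above suggests. A rough sketch of the shape implied by its usages in this file; the type aliases here are illustrative stand-ins for the real Layer and StagesDAG in FitStagesUtil:

    // Illustrative aliases only: each layer pairs a stage with its distance,
    // and layers are ordered so a stage's inputs exist before the stage runs.
    type Layer = Array[(OPStage, Int)]
    type StagesDAG = Array[Layer]

    val dag = FitStagesUtil.computeDAG(features)  // features: Array[OPFeature]
    dag.zipWithIndex.foreach { case (layer, i) =>
      layer.foreach { case (stage, d) => println(s"layer $i, distance $d: ${stage.stageName}") }
    }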
@@ -311,11 +315,18 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
    */
   def train(persistEveryKStages: Int = OpWorkflowModel.PersistEveryKStages)
     (implicit spark: SparkSession): OpWorkflowModel = {
-    val rawData = generateRawData()
-
-    // Update features with fitted stages
-    val fittedStages = fitStages(data = rawData, stagesToFit = stages, persistEveryKStages)
-    val newResultFeatures = resultFeatures.map(_.copyWithNewStages(fittedStages))
+    val (fittedStages, newResultFeatures) =
+      if (stages.exists(_.isInstanceOf[Estimator[_]])) {
+        val rawData = generateRawData()
+
+        // Update features with fitted stages
+        val fittedStgs = fitStages(data = rawData, stagesToFit = stages, persistEveryKStages)
+        val newResultFtrs = resultFeatures.map(_.copyWithNewStages(fittedStgs))
+        fittedStgs -> newResultFtrs
+      } else {
+        stages -> resultFeatures
+      }
 
     val model =
       new OpWorkflowModel(uid, getParameters())
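
With this change, train() only reads raw data and fits when the DAG actually contains at least one Estimator; a transformer-only workflow is wrapped into a model directly. A minimal usage sketch, assuming an implicit SparkSession is in scope and that dataReader and prediction are a reader and result feature defined elsewhere:

    // Fit the full pipeline; raw data is read and fitted only if estimators are present.
    implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
    val model: OpWorkflowModel = new OpWorkflow()
      .setReader(dataReader)
      .setResultFeatures(prediction)
      .train()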
@@ -327,6 +338,93 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
     reader.map(model.setReader).getOrElse(model)
   }
 
+  /**
+   * Fit the estimators to return a sequence of only transformers.
+   * Modified version of the Spark 2.x Pipeline fit method.
+   *
+   * @param data dataframe to fit on
+   * @param stagesToFit stages that need to be converted to transformers
+   * @param persistEveryKStages persist data in transforms every k stages for performance improvement
+   * @return fitted transformers
+   */
+  protected def fitStages(data: DataFrame, stagesToFit: Array[OPStage], persistEveryKStages: Int)
+    (implicit spark: SparkSession): Array[OPStage] = {
+
+    // TODO may want to make workflow take an optional reserve fraction
+    val splitters = stagesToFit.collect { case s: ModelSelectorBase[_, _] => s.splitter }.flatten
+    val splitter = splitters.reduceOption { (a, b) =>
+      if (a.getReserveTestFraction > b.getReserveTestFraction) a else b
+    }
+    val (train, test) = splitter.map(_.split(data)).getOrElse((data, spark.emptyDataFrame))
+    val hasTest = !test.isEmpty
+
+    val dag = FitStagesUtil.computeDAG(resultFeatures)
+      .map(_.filter(s => stagesToFit.contains(s._1)))
+      .filter(_.nonEmpty)
+
+    // Search for the last estimator
+    val indexOfLastEstimator: Option[Int] =
+      dag.collect { case seq if seq.exists(_._1.isInstanceOf[Estimator[_]]) => seq.head._2 }.lastOption
+
+    // doing regular workflow fit without workflow level CV
+    if (!isWorkflowCV) {
+      FitStagesUtil.fitAndTransformDAG(
+        dag = dag,
+        train = train,
+        test = test,
+        hasTest = hasTest,
+        indexOfLastEstimator = indexOfLastEstimator,
+        persistEveryKStages = persistEveryKStages
+      ).transformers
+    } else {
+      // doing workflow level CV/TS:
+      // extract the model selector and split the DAG into before/during/after parts
+      val CutDAG(modelSelectorOpt, before, during, after) = FitStagesUtil.cutDAG(dag)
+
+      log.info("Applying initial DAG before CV/TS. Stages: {}", before.flatMap(_.map(_._1.stageName)).mkString(", "))
+      val FittedDAG(beforeTrain, beforeTest, beforeTransformers) = FitStagesUtil.fitAndTransformDAG(
+        dag = before,
+        train = train,
+        test = test,
+        hasTest = hasTest,
+        indexOfLastEstimator = indexOfLastEstimator,
+        persistEveryKStages = persistEveryKStages
+      )
+
+      // Break up catalyst (cause it chokes) by converting into rdd, persisting it and then back to dataframe
+      val (trainRDD, testRDD) = (beforeTrain.rdd.persist(), beforeTest.rdd.persist())
+      val (trainFixed, testFixed) = (
+        spark.createDataFrame(trainRDD, beforeTrain.schema),
+        spark.createDataFrame(testRDD, beforeTest.schema)
+      )
+
+      modelSelectorOpt match {
+        case None => beforeTransformers
+        case Some((modelSelector, distance)) =>
+          // estimate best model
+          log.info("Estimate best Model with CV/TS. Stages included in CV are: {}, {}",
+            during.flatMap(_.map(_._1.stageName)).mkString(", "), modelSelector.uid: Any
+          )
+          modelSelector.findBestEstimator(trainFixed, during, persistEveryKStages)
+          val remainingDAG: StagesDAG = (during :+ (Array(modelSelector -> distance): Layer)) ++ after
+
+          log.info("Applying DAG after CV/TS. Stages: {}", remainingDAG.flatMap(_.map(_._1.stageName)).mkString(", "))
+          val fitted = FitStagesUtil.fitAndTransformDAG(
+            dag = remainingDAG,
+            train = trainFixed,
+            test = testFixed,
+            hasTest = hasTest,
+            indexOfLastEstimator = indexOfLastEstimator,
+            persistEveryKStages = persistEveryKStages,
+            fittedTransformers = beforeTransformers
+          ).transformers
+          trainRDD.unpersist()
+          testRDD.unpersist()
+          fitted
+      }
+    }
+  }
+
   /**
    * Replaces any estimators in this workflow with their corresponding fit models from the OpWorkflowModel
    * passed in. Note that the Stages UIDs must EXACTLY correspond in order to be replaced so the same features
@@ -352,15 +450,25 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
   def loadModel(path: String): OpWorkflowModel = new OpWorkflowModelReader(this).load(path)
 
   /**
-   * Returns a dataframe containing all the columns generated up to the feature input
+   * Returns a dataframe containing all the columns generated up to and including the feature input
    *
    * @param feature input feature to compute up to
    * @param persistEveryKStages persist data in transforms every k stages for performance improvement
-   * @return Dataframe containing columns corresponding to all of the features generated before the feature given
+   * @return Dataframe containing columns corresponding to all of the features generated up to the feature given
    */
   def computeDataUpTo(feature: OPFeature, persistEveryKStages: Int = OpWorkflowModel.PersistEveryKStages)
     (implicit spark: SparkSession): DataFrame = {
-    computeDataUpTo(stopStage = findOriginStageId(feature), fitted = false, persistEveryKStages)
+    if (findOriginStageId(feature).isEmpty) {
+      log.warn("Could not find origin stage for feature in workflow! Defaulting to generating raw features.")
+      generateRawData()
+    } else {
+      val rawData = generateRawData()
+      val stagesToFit = FitStagesUtil.computeDAG(Array(feature)).flatMap(_.map(_._1))
+      val fittedStages = fitStages(rawData, stagesToFit, persistEveryKStages)
+      val updatedFeature = feature.copyWithNewStages(fittedStages)
+      val dag = FitStagesUtil.computeDAG(Array(updatedFeature))
+      applyTransformationsDAG(rawData, dag, persistEveryKStages)
+    }
   }
 
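A short usage sketch of the new code path, assuming an implicit SparkSession, a workflow whose reader is already set, and an intermediate featureVector produced mid-pipeline:

    // Fits only the stages featureVector depends on, then materializes the
    // dataframe up to and including that feature's column.
    val partialData: DataFrame = workflow.computeDataUpTo(featureVector)
    partialData.show()
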
/**
Expand All @@ -383,16 +491,35 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
* @tparam T Type of the data read in
*/
@Experimental
def withRawFeatureFilter[T](trainingReader: Option[Reader[T]], scoringReader: Option[Reader[T]],
bins: Int = 100, minFillRate: Double = 0.001, maxFillDifference: Double = 0.90,
maxFillRatioDiff: Double = 20.0, maxJSDivergence: Double = 0.90, protectedFeatures: Array[OPFeature] = Array.empty
def withRawFeatureFilter[T](
trainingReader: Option[Reader[T]],
scoringReader: Option[Reader[T]],
bins: Int = 100,
minFillRate: Double = 0.001,
maxFillDifference: Double = 0.90,
maxFillRatioDiff: Double = 20.0,
maxJSDivergence: Double = 0.90,
maxCorrelation: Double = 0.95,
correlationType: CorrelationType = CorrelationType.Pearson,
protectedFeatures: Array[OPFeature] = Array.empty
): this.type = {
val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]])
require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" +
"as the reader for the workflow")
val protectedRawFeatures = protectedFeatures.flatMap(_.rawFeatures).map(_.name).toSet
rawFeatureFilter = Option( new RawFeatureFilter(training.get, scoringReader, bins, minFillRate,
maxFillDifference, maxFillRatioDiff, maxJSDivergence, protectedRawFeatures) )
rawFeatureFilter = Option {
new RawFeatureFilter(
trainingReader = training.get,
scoreReader = scoringReader,
bins = bins,
minFill = minFillRate,
maxFillDifference = maxFillDifference,
maxFillRatioDiff = maxFillRatioDiff,
maxJSDivergence = maxJSDivergence,
maxCorrelation = maxCorrelation,
correlationType = correlationType,
protectedFeatures = protectedRawFeatures)
}
this
}

Expand Down
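
The broken-out parameter list surfaces the two new RawFeatureFilter arguments, maxCorrelation and correlationType, next to the existing thresholds. A hedged usage sketch, with trainReader and scoreReader standing in for real readers of the same type and the two new knobs left at the defaults shown above:

    // Enable raw feature filtering before training; raw features failing the
    // fill-rate, divergence, or correlation checks are excluded from the model.
    val filteredWorkflow = new OpWorkflow()
      .setResultFeatures(prediction)
      .withRawFeatureFilter(
        trainingReader = Option(trainReader),
        scoringReader = Option(scoreReader),
        maxCorrelation = 0.95,
        correlationType = CorrelationType.Pearson
      )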