
Dataframe #451

Open · wants to merge 11 commits into master
Conversation

@lguo (Contributor) commented Feb 12, 2020

The following changes from the design doc are covered:

  1. Training datasets will be created directly before training a coordinate:
       - FixedEffectDataset is merged into FixedEffectCoordinate;
       - RandomEffectDataset is merged into RandomEffectCoordinate.
  2. Scores are changed to use DataFrame.
  3. Residuals will be computed using a UDF on the training DataFrame. For random effects, the per-entity models will first be joined to the DataFrame by REID. A single UDF will do all scoring for fixed and random effects at once, and will also sum the residuals and offsets. Directly before aggregation, the DataFrame will be converted to an RDD, and aggregation will proceed unmodified (a sketch follows below).
  4. Model scoring will work like coordinate scoring.
  5. Random effect vector projection will be disabled.
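
A minimal Spark sketch of the scoring described in item 3; the column names (reid, features, coefficients, offset, score), the score UDF, and both input DataFrames are assumptions for illustration, not the PR's actual code:

  import org.apache.spark.ml.linalg.Vector
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{col, udf}

  // Hypothetical UDF: dot product of a feature vector with per-entity coefficients.
  val score = udf { (features: Vector, coefficients: Vector) =>
    var s = 0.0
    features.foreachActive((i, v) => s += v * coefficients(i))
    s
  }

  // Join per-entity models by REID, score every row in one pass, and fold the
  // new scores into the offsets before any conversion back to an RDD.
  def scoreRandomEffect(training: DataFrame, perEntityModels: DataFrame): DataFrame =
    training
      .join(perEntityModels, "reid")
      .withColumn("score", score(col("features"), col("coefficients")))
      .withColumn("offset", col("offset") + col("score"))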
@ashelkovnykov (Contributor) left a comment

Initial comments on WIP

@@ -653,25 +492,31 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
     val interceptIndices = getOrDefault(coordinateInterceptIndices)

     // Create the optimization coordinates for each component model
-    val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate[_] }] =
+    val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate }] =
Contributor:

There's nothing wrong here, but it might be easier to keep the Dataset objects like we did for the tests to wrap the DataFrame of training data (once it is generated) and the feature shard ID.

@ashelkovnykov (Contributor)

Forgot to comment: since all of these commits are related to one task and don't seem to have any logical separation, would you kindly squash them into one commit?

@ashelkovnykov (Contributor) left a comment

I skipped reviewing much of the scoring changes as they looked like they were still early WIP and subject to many changes.

    optimizationProblem: DistributedOptimizationProblem[Objective],
    featureShardId: FeatureShardId,
    inputColumnsNames: InputColumnsNames)
  extends Coordinate {
Contributor:

This isn't what I was picturing when writing the design document - I was thinking of something more like generateDataset in the proof-of-concept tests (a condensed sketch in code follows this list):

  • CoordinateDescent calls train in FixedEffectCoordinate with training DataFrame
  • train calls generateDataset
  • generateDataset drops unnecessary columns and trains a FixedEffectModel
  • CoordinateDescent calls score in FixedEffectCoordinate with training DataFrame and FixedEffectModel
  • score returns a new DataFrame with a scores column
  • CoordinateDescent calls train in RandomEffectCoordinate with the scored DataFrame
  • train calls generateDataset
  • generateDataset merges the offset column with the scores column, then drops unnecessary columns and trains RandomEffectModel
  • CoordinateDescent calls score in RandomEffectCoordinate with the scored DataFrame and RandomEffectModel
  • score returns a new DataFrame with another scores column
  • etc.
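
A condensed sketch of that flow; the Coordinate trait and method signatures below are simplified assumptions for illustration, not the PR's actual API:

  import org.apache.spark.sql.DataFrame

  // Simplified stand-ins for the real photon-ml types.
  trait Model
  trait Coordinate {
    def train(data: DataFrame): Model                    // calls generateDataset internally
    def score(data: DataFrame, model: Model): DataFrame  // returns data plus a new scores column
  }

  // One pass of coordinate descent over a fixed-effect and a random-effect coordinate.
  def onePass(data: DataFrame, fixed: Coordinate, random: Coordinate): DataFrame = {
    val fixedModel = fixed.train(data)
    val scoredByFixed = fixed.score(data, fixedModel)  // adds the fixed-effect scores
    val randomModel = random.train(scoredByFixed)      // generateDataset merges offset + scores
    random.score(scoredByFixed, randomModel)           // adds the random-effect scores
  }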

@lguo (Author)

What you described is exactly what is implemented in the commits, but I put the corresponding logic in different methods (instead of the single one you suggested).

See CoordinateDescent lines 192-208:

  logger.debug(s"Updating coordinate of class ${coordinate.getClass}")

  // Compute scores using the previous coordinate model and update offsets
  prevModelOpt.map(model => coordinate.updateOffset(model))

  // Train a new model
  val (model, tracker) = initialModelOpt.map(
    initialModel => Timed(s"Train new model using existing model as starting point") {
      coordinate.trainModel(initialModel)
    }).getOrElse(
    Timed(s"Train new model") {
      coordinate.trainModel()
    })

  // Log summary
  logOptimizationSummary(logger, coordinateId, model, tracker)

val modelBroadcast: Broadcast[GeneralizedLinearModel],
val featureShardId: String)
Contributor:

These two lines should be indented once more

@lguo (Author) commented Feb 24, 2020

> I skipped reviewing much of the scoring changes as they looked like they were still early WIP and subject to many changes.

FixedEffectCoordinate.updateOffset (and RandomEffectCoordinate.updateOffset) are used to compute scores, instead of merging scores back into the original dataset.

case (
fEDataset: FixedEffectDataset,
None,
Comment:

Why does this become None? Is this for the fixed-effect case?

if (hasOffsetField && hasCoordinateScoreField) {
  // offset = offset - old_coordinateScore + new_coordinateScore
  dataset.withColumn(offset, col(offset) - col(SCORE_FIELD))
  fixedEffectModel.computeScore(dataset, SCORE_FIELD)
Comment:

Where is the new score saved?

Collaborator:

I think the new score is saved in SCORE_FIELD.
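
For context, withColumn returns a new DataFrame rather than mutating dataset, so the subtraction result has to be chained into the rescoring call. A minimal sketch, assuming computeScore overwrites scoreField with the new score (the helper below is hypothetical, not the PR's code):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  // Remove the old coordinate score from the offset, then rescore. The result
  // of withColumn must be carried forward explicitly, or the update is lost.
  def refreshScore(
      dataset: DataFrame,
      offset: String,
      scoreField: String,
      computeScore: DataFrame => DataFrame): DataFrame = {
    val withoutOldScore = dataset.withColumn(offset, col(offset) - col(scoreField))
    computeScore(withoutOldScore) // writes the new score into scoreField
  }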

if (modelsRDD.first()._2.coefficients.variancesOption.isDefined) {
  stringBuilder.append(s"\nVariance: ${modelsRDD.values.map(_.coefficients.variancesL2NormOption.get).stats()}")
}
// stringBuilder.append(s"\nLength: ${modelsRDD.values.map(_.coefficients.means.length).stats()}")
Comment:

Why not delete them if they are not used?

Comment:

Does this mean we don't have stats in the log file any more?

case (_, model: RandomEffectModel) => model.unpersistRDD()
case _ =>
}
// gameModel.toMap.foreach {
Comment:

Delete it if it's not used.

@@ -34,35 +37,40 @@ import com.linkedin.photon.ml.util.PhotonBroadcast
 object CoordinateFactory {

   /**
-   * Creates a [[Coordinate]] of the appropriate type, given the input [[Dataset]],
+   * Creates a [[Coordinate]] of the appropriate type, given the input data set,
Collaborator:

Let's keep "dataset" as one word.

* @param dataset The input data to use for training
* @param featureShardId
Collaborator:

Missing parameter description.

* @param dataset The input data to use for training
* @param featureShardId
* @param inputColumnsNames
Collaborator:

Missing parameter description.

* @param coordinateOptConfig The optimization settings for training
* @param lossFunctionFactoryConstructor A constructor for the loss function factory function
* @param glmConstructor A constructor for the type of [[GeneralizedLinearModel]] being trained
* @param downSamplerFactory A factory function for the [[DownSampler]] (if down-sampling is enabled)
* @param normalizationContext The [[NormalizationContext]]
* @param varianceComputationType Should the trained coefficient variances be computed in addition to the means?
* @param interceptIndexOpt The index of the intercept, if one is present
* @return A [[Coordinate]] for the [[Dataset]] of type [[D]]
* @param rETypeOpt
Collaborator:

Missing parameter description.


val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig)

-    (dataset, coordinateOptConfig, lossFunctionFactory) match {
+    (rETypeOpt, coordinateOptConfig, lossFunctionFactory) match {
Collaborator:

Can we just do a match on (coordinateOptConfig, lossFunctionFactory)? This rETypeOpt seems to be redundant.

val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)

(updatedFixedEffectModel, optimizationTracker)
override protected[algorithm] def updateOffset(model: DatumScoringModel) = {
Collaborator:

Comments are missing for updateOffset.
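
A sketch of the kind of doc comment being asked for; the wording is an assumption about updateOffset's behaviour inferred from this thread, and DatumScoringModel is stubbed to keep the snippet self-contained:

  // Stub standing in for photon-ml's model type, for illustration only.
  trait DatumScoringModel

  /**
   * Compute scores for this coordinate's training data using the given model,
   * and fold the new scores into the offset column so that the next coordinate
   * trains on the residuals.
   *
   * @param model The model with which to score the training data
   */
  def updateOffset(model: DatumScoringModel): Unit = ???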


new CoordinateDataScores(scores)
def updateOffset(
Collaborator:

Missing comments for updateOffset.

  fixedEffectModel.computeScore(dataset, SCORE_FIELD)
    .withColumn(offset, col(offset) + col(SCORE_FIELD))
} else {
  throw new UnsupportedOperationException("It shouldn't happen!")
Collaborator:

Can you make the error message more explicit?
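
One possible, more descriptive wording, as a sketch (the column-name parameters are assumptions):

  import org.apache.spark.sql.DataFrame

  // Sketch of a more explicit failure message for the missing-column case.
  def missingColumnsError(dataset: DataFrame, offset: String, scoreField: String): Nothing =
    throw new UnsupportedOperationException(
      s"Cannot update offsets: expected columns '$offset' and '$scoreField', " +
        s"but the dataset only contains [${dataset.columns.mkString(", ")}]")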

}

object FixedEffectCoordinate {

  def SCORE_FIELD = "fixed_score"
Collaborator:

Better to call it fixed_effect_score.

}
}

def toDataFrame(input: RDD[(REType, GeneralizedLinearModel)]): DataFrame = {
Collaborator:

Missing comments.
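
For reference, a hedged sketch of such a conversion; the schema, the column names, and the simplification of GeneralizedLinearModel down to its coefficient array (with REType as String) are all assumptions:

  import org.apache.spark.ml.linalg.Vectors
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}

  // Sketch: flatten each per-entity model into an (REID, coefficients) row.
  def toDataFrame(input: RDD[(String, Array[Double])])(implicit spark: SparkSession): DataFrame = {
    import spark.implicits._
    input
      .map { case (reid, means) => (reid, Vectors.dense(means)) }
      .toDF("reid", "model")
  }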

@@ -17,7 +17,6 @@ package com.linkedin.photon.ml.optimization
import breeze.linalg.{Vector, cholesky, diag}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

Collaborator:

This line is needed.

import org.apache.spark.rdd.RDD

Collaborator:

This line is needed.


score
})

Collaborator:

This line is not necessary.


var score = 0D

coefficients match {
Collaborator:

If the features are dense, then the coefficients are usually dense. If the features are sparse (as for random effects), then the coefficients are sparse. So it seems that

features.foreachActive { case (index, value) => score += value * denseCoef(index) }

is good enough. Will there be cases where the coefficients are sparse but the features are dense?
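
A minimal breeze sketch of the suggested active-entry dot product (the helper is hypothetical, not the PR's code):

  import breeze.linalg.{DenseVector, Vector}

  // Iterate only the stored (active) entries of the feature vector; this covers
  // both dense and sparse features, as long as the coefficients are dense.
  def dot(features: Vector[Double], denseCoef: DenseVector[Double]): Double = {
    var score = 0.0
    features.activeIterator.foreach { case (index, value) =>
      score += value * denseCoef(index)
    }
    score
  }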

.reduceByKey(_ + _)
.values
.stats()
.groupBy(idTag).agg(count("*").alias("cnt"))
Collaborator:

Indent two spaces back.

@@ -0,0 +1,25 @@
/*
* Copyright 2017 LinkedIn Corp. All rights reserved.
Collaborator:

Copyright 2020

@@ -15,10 +15,9 @@
package com.linkedin.photon.ml.sampling

import java.util.Random

import com.linkedin.photon.ml.Types.UniqueSampleId
Collaborator:

Please reorder this import.
