Dataframe #451
base: master
Conversation
1. Training datasets will be created directly before training a coordinate: FixedEffectDataset is merged into FixedEffectCoordinate; RandomEffectDataset is merged into RandomEffectCoordinate; random effect vector projection will be disabled.
2. Scores are changed to use DataFrame.
3. Residuals will be computed using a UDF on the training DataFrame. For random effects, the per-entity models will first need to be joined to the DataFrame by REID. A single UDF will do all scoring for fixed and random effects at once.
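As a rough, hypothetical illustration of item 3 (the class, column, and method names here are assumptions, not the actual Photon API), joining per-entity models by REID and scoring with a single UDF might look like:

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object ScoringSketch {

  // Hypothetical scoring UDF: dot product of a row's feature vector with the
  // coefficients of whichever per-entity model was joined onto the row.
  private val dot = udf { (features: Vector, coefficients: Vector) =>
    var score = 0.0
    features.foreachActive { (i, v) => score += v * coefficients(i) }
    score
  }

  // `modelsDF` is assumed to hold one coefficient vector per entity, keyed by REID.
  def scoreRandomEffect(data: DataFrame, modelsDF: DataFrame, reidCol: String): DataFrame =
    data
      .join(modelsDF, Seq(reidCol), "left_outer") // attach each row's per-entity model
      .withColumn("random_score", dot(col("features"), col("coefficients")))
}
```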
Initial comments on WIP
photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala (outdated; resolved)
photon-api/src/main/scala/com/linkedin/photon/ml/estimators/GameEstimator.scala (outdated; resolved)
@@ -653,25 +492,31 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
   val interceptIndices = getOrDefault(coordinateInterceptIndices)

   // Create the optimization coordinates for each component model
-  val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate[_] }] =
+  val coordinates: Map[CoordinateId, C forSome { type C <: Coordinate }] =
There's nothing wrong here, but it might be easier to keep the Dataset objects, like we did for the tests, to wrap the DataFrame of training data (once it is generated) and the feature shard ID.
photon-api/src/main/scala/com/linkedin/photon/ml/Constants.scala (outdated; resolved)
photon-api/src/main/scala/com/linkedin/photon/ml/Constants.scala (outdated; resolved)
photon-api/src/main/scala/com/linkedin/photon/ml/model/RandomEffectModel.scala (outdated; resolved)
photon-lib/src/main/scala/com/linkedin/photon/ml/algorithm/Coordinate.scala (outdated; resolved)
photon-api/src/main/scala/com/linkedin/photon/ml/model/FixedEffectModel.scala (outdated; resolved)
photon-api/src/main/scala/com/linkedin/photon/ml/algorithm/RandomEffectCoordinate.scala (resolved)
photon-api/src/main/scala/com/linkedin/photon/ml/supervised/model/GeneralizedLinearModel.scala (outdated; resolved)
Forgot to comment - since all of these commits are related to one task and don't seem to have any logical separation, would you kindly squash them into one commit?
I skipped reviewing much of the scoring changes as they looked like they were still early WIP and subject to many changes.
photon-api/src/main/scala/com/linkedin/photon/ml/util/ApiUtils.scala (outdated; resolved)
photon-client/src/main/scala/com/linkedin/photon/ml/Constants.scala (outdated; resolved)
    optimizationProblem: DistributedOptimizationProblem[Objective],
    featureShardId: FeatureShardId,
    inputColumnsNames: InputColumnsNames)
  extends Coordinate {
This isn't what I was picturing when writing the design document - I was thinking of something more like generateDataset in the proof-of-concept tests:

- CoordinateDescent calls train in FixedEffectCoordinate with the training DataFrame
- train calls generateDataset
- generateDataset drops unnecessary columns and trains a FixedEffectModel
- CoordinateDescent calls score in FixedEffectCoordinate with the training DataFrame and the FixedEffectModel
- score returns a new DataFrame with a scores column
- CoordinateDescent calls train in RandomEffectCoordinate with the scored DataFrame
- train calls generateDataset
- generateDataset merges the offset column with the scores column, then drops unnecessary columns and trains a RandomEffectModel
- CoordinateDescent calls score in RandomEffectCoordinate with the scored DataFrame and the RandomEffectModel
- score returns a new DataFrame with another scores column
- etc.
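Sketching that flow in code (purely illustrative; the trait and method signatures are assumptions drawn from the proof-of-concept tests, not the actual classes):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical coordinate interface following the suggested flow.
trait CoordinateSketch[M] {
  def generateDataset(data: DataFrame): DataFrame // drops/merges columns as needed
  def train(data: DataFrame): M                   // calls generateDataset internally
  def score(data: DataFrame, model: M): DataFrame // returns data plus a scores column
}

// CoordinateDescent would then thread a single DataFrame through the coordinates:
def oneSweep[F, R](
    trainingData: DataFrame,
    fixed: CoordinateSketch[F],
    random: CoordinateSketch[R]): DataFrame = {
  val fixedModel = fixed.train(trainingData)
  val scored = fixed.score(trainingData, fixedModel) // adds a fixed-effect scores column
  val randomModel = random.train(scored)             // merges offset with scores, then trains
  random.score(scored, randomModel)                  // adds a random-effect scores column
}
```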
What you described is exactly what is implemented in the commits, but I put the corresponding code logic in different methods (instead of the one you suggested).
See CoordinateDescent lines 192-208:
logger.debug(s"Updating coordinate of class ${coordinate.getClass}")
// compute scores using the previous coordinate model and update offsets
prevModelOpt.map(model => coordinate.updateOffset(model))
// Train a new model
val (model, tracker) = initialModelOpt.map(
initialModel => Timed(s"Train new model using existing model as starting point") {
coordinate.trainModel(initialModel)
}).getOrElse(
Timed(s"Train new model") {
coordinate.trainModel()
})
// Log summary
logOptimizationSummary(logger, coordinateId, model, tracker)
    val modelBroadcast: Broadcast[GeneralizedLinearModel],
    val featureShardId: String)
These two lines should be indented once more
...rc/main/scala/com/linkedin/photon/ml/optimization/game/RandomEffectOptimizationProblem.scala (resolved)
FixedEffectCoordinate.updateOffset (and RandomEffectCoordinate.updateOffset) are used to compute scores instead of merging scores back to the original dataset.
      case (
        fEDataset: FixedEffectDataset,
        None,
Why does this become None? Is this for the fixed effect case?
    if (hasOffsetField && hasCoordinateScoreField) {
      // offset = offset - old_coordinateScore + new_coordinateScore
      dataset.withColumn(offset, col(offset) - col(SCORE_FIELD))
      fixedEffectModel.computeScore(dataset, SCORE_FIELD)
Where is the new score saved?
I think the new score is saved in SCORE_FIELD.
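If that reading is right, the update can be sketched as follows (assumed column names and a stand-in for computeScore; not the actual code): subtract the old coordinate score from the offset, recompute the score into the score column, then add it back.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch of updateOffset: offset = offset - old_coordinateScore + new_coordinateScore.
// `computeScore` stands in for fixedEffectModel.computeScore(dataset, scoreCol).
def updateOffsetSketch(
    dataset: DataFrame,
    offsetCol: String,
    scoreCol: String,
    computeScore: (DataFrame, String) => DataFrame): DataFrame = {
  val withoutOldScore = dataset.withColumn(offsetCol, col(offsetCol) - col(scoreCol))
  val rescored = computeScore(withoutOldScore, scoreCol) // overwrites scoreCol with new scores
  rescored.withColumn(offsetCol, col(offsetCol) + col(scoreCol))
}
```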
    if (modelsRDD.first()._2.coefficients.variancesOption.isDefined) {
      stringBuilder.append(s"\nVariance: ${modelsRDD.values.map(_.coefficients.variancesL2NormOption.get).stats()}")
    }
    //stringBuilder.append(s"\nLength: ${modelsRDD.values.map(_.coefficients.means.length).stats()}")
Why not delete them if they are not used?
Does this mean we don't have stats in the log file any more?
      case (_, model: RandomEffectModel) => model.unpersistRDD()
      case _ =>
    }
    // gameModel.toMap.foreach {
Delete it if not used.
@@ -34,35 +37,40 @@ import com.linkedin.photon.ml.util.PhotonBroadcast
object CoordinateFactory {

  /**
-  * Creates a [[Coordinate]] of the appropriate type, given the input [[Dataset]],
+  * Creates a [[Coordinate]] of the appropriate type, given the input data set,
Let's keep "dataset" as one word.
   * @param dataset The input data to use for training
   * @param featureShardId
Missing parameter description.
   * @param dataset The input data to use for training
   * @param featureShardId
   * @param inputColumnsNames
Missing parameter description.
   * @param coordinateOptConfig The optimization settings for training
   * @param lossFunctionFactoryConstructor A constructor for the loss function factory function
   * @param glmConstructor A constructor for the type of [[GeneralizedLinearModel]] being trained
   * @param downSamplerFactory A factory function for the [[DownSampler]] (if down-sampling is enabled)
   * @param normalizationContext The [[NormalizationContext]]
   * @param varianceComputationType Should the trained coefficient variances be computed in addition to the means?
   * @param interceptIndexOpt The index of the intercept, if one is present
   * @return A [[Coordinate]] for the [[Dataset]] of type [[D]]
   * @param rETypeOpt
Missing parameter description.
    val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig)

-   (dataset, coordinateOptConfig, lossFunctionFactory) match {
+   (rETypeOpt, coordinateOptConfig, lossFunctionFactory) match {
Can we just do a match on (coordinateOptConfig, lossFunctionFactory)? This rETypeOpt seems to be redundant.
    val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)

    (updatedFixedEffectModel, optimizationTracker)

  override protected[algorithm] def updateOffset(model: DatumScoringModel) = {
Comments are missing for updateOffset.
    new CoordinateDataScores(scores)

  def updateOffset(
Missing comments for updateOffset.
      fixedEffectModel.computeScore(dataset, SCORE_FIELD)
        .withColumn(offset, col(offset) + col(SCORE_FIELD))
    } else {
      throw new UnsupportedOperationException("It shouldn't happen!")
Can you make the error message more explicit?
  }

object FixedEffectCoordinate {

  def SCORE_FIELD = "fixed_score"
Better to call it fixed_effect_score.
    }
  }

  def toDataFrame(input: RDD[(REType, GeneralizedLinearModel)]): DataFrame = {
Missing comments.
@@ -17,7 +17,6 @@ package com.linkedin.photon.ml.optimization
 import breeze.linalg.{Vector, cholesky, diag}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-
This line is needed.
 import org.apache.spark.rdd.RDD
-
This line is needed.
        score
      })

This line is not necessary.
    var score = 0D

    coefficients match {
If the features are dense, then the coefficients are usually dense. If the features are sparse (for random effects), then the coefficients are sparse. So it seems that `features.foreachActive { case (index, value) => score += value * denseCoef(index) }` is good enough. Will there be cases where the coefficients are sparse but the features are dense?
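For illustration, here is a dot product over only the active entries, using Spark's ml.linalg vectors (which expose foreachActive); the object and variable names are illustrative, not Photon's actual code:

```scala
import org.apache.spark.ml.linalg.{DenseVector, Vector, Vectors}

object DotSketch {

  // Iterates only the stored entries of `features`, indexing into dense
  // coefficients - cheap when features are sparse and coefficients are dense.
  def dot(features: Vector, denseCoef: DenseVector): Double = {
    var score = 0D
    features.foreachActive { case (index, value) => score += value * denseCoef(index) }
    score
  }

  def main(args: Array[String]): Unit = {
    val sparse = Vectors.sparse(4, Array(0, 3), Array(1.0, 2.0))
    val coef = new DenseVector(Array(0.5, 1.0, 1.0, 0.25))
    println(dot(sparse, coef)) // 1.0 * 0.5 + 2.0 * 0.25 = 1.0
  }
}
```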
-     .reduceByKey(_ + _)
-     .values
-     .stats()
+       .groupBy(idTag).agg(count("*").alias("cnt"))
Indent two spaces back.
@@ -0,0 +1,25 @@
/*
 * Copyright 2017 LinkedIn Corp. All rights reserved.
Copyright 2020
@@ -15,10 +15,9 @@
package com.linkedin.photon.ml.sampling

import java.util.Random

import com.linkedin.photon.ml.Types.UniqueSampleId
Please reorder this import.
The following changes in the design doc are covered.