-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dataframe #451
base: master
Are you sure you want to change the base?
Dataframe #451
Changes from 10 commits
aec6486
0df72a0
4b373c6
ace606f
0ce3fd2
26c6210
05b3e01
ae98cc8
1875588
1f36c78
77a8dc4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,9 +14,12 @@ | |
*/ | ||
package com.linkedin.photon.ml.algorithm | ||
|
||
import com.linkedin.photon.ml.data.{Dataset, FixedEffectDataset, RandomEffectDataset} | ||
import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory} | ||
import org.apache.spark.sql.{DataFrame, SparkSession} | ||
|
||
import com.linkedin.photon.ml.Types.{FeatureShardId, REType} | ||
import com.linkedin.photon.ml.data.InputColumnsNames | ||
import com.linkedin.photon.ml.function.ObjectiveFunction | ||
import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory} | ||
import com.linkedin.photon.ml.model.Coefficients | ||
import com.linkedin.photon.ml.normalization.NormalizationContext | ||
import com.linkedin.photon.ml.optimization.DistributedOptimizationProblem | ||
|
@@ -34,35 +37,40 @@ import com.linkedin.photon.ml.util.PhotonBroadcast | |
object CoordinateFactory { | ||
|
||
/** | ||
* Creates a [[Coordinate]] of the appropriate type, given the input [[Dataset]], | ||
* Creates a [[Coordinate]] of the appropriate type, given the input data set, | ||
* [[CoordinateOptimizationConfiguration]], and [[ObjectiveFunction]]. | ||
* | ||
* @tparam D Some type of [[Dataset]] | ||
* @param dataset The input data to use for training | ||
* @param featureShardId The ID of the feature shard for the training data
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing parameter description. |
||
* @param inputColumnsNames The names of the input data columns (e.g. response, offset)
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing parameter description. |
||
* @param coordinateOptConfig The optimization settings for training | ||
* @param lossFunctionFactoryConstructor A constructor for the loss function factory function | ||
* @param glmConstructor A constructor for the type of [[GeneralizedLinearModel]] being trained | ||
* @param downSamplerFactory A factory function for the [[DownSampler]] (if down-sampling is enabled) | ||
* @param normalizationContext The [[NormalizationContext]] | ||
* @param varianceComputationType Should the trained coefficient variances be computed in addition to the means? | ||
* @param interceptIndexOpt The index of the intercept, if one is present | ||
* @return A [[Coordinate]] for the [[Dataset]] of type [[D]] | ||
* @param rETypeOpt The random effect type, if a random effect coordinate is being built (None for fixed effect)
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing parameter description. |
||
* @return A [[Coordinate]] instance | ||
*/ | ||
def build[D <: Dataset[D]]( | ||
dataset: D, | ||
def build( | ||
dataset: DataFrame, | ||
featureShardId: FeatureShardId, | ||
inputColumnsNames: InputColumnsNames, | ||
coordinateOptConfig: CoordinateOptimizationConfiguration, | ||
lossFunctionFactoryConstructor: ObjectiveFunctionFactoryFactory, | ||
glmConstructor: Coefficients => GeneralizedLinearModel, | ||
downSamplerFactory: DownSamplerFactory, | ||
normalizationContext: NormalizationContext, | ||
varianceComputationType: VarianceComputationType, | ||
interceptIndexOpt: Option[Int]): Coordinate[D] = { | ||
interceptIndexOpt: Option[Int], | ||
rETypeOpt: Option[REType]): Coordinate = { | ||
|
||
val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig) | ||
|
||
(dataset, coordinateOptConfig, lossFunctionFactory) match { | ||
(rETypeOpt, coordinateOptConfig, lossFunctionFactory) match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just do a match on ( |
||
case ( | ||
fEDataset: FixedEffectDataset, | ||
None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why does this become "None"? Is this for the fixed effect case? |
||
fEOptConfig: FixedEffectOptimizationConfiguration, | ||
distributedLossFunctionFactory: DistributedObjectiveFunctionFactory) => | ||
|
||
|
@@ -71,36 +79,43 @@ object CoordinateFactory { | |
} else { | ||
None | ||
} | ||
val normalizationPhotonBroadcast = PhotonBroadcast(fEDataset.sparkContext.broadcast(normalizationContext)) | ||
val normalizationPhotonBroadcast = PhotonBroadcast( | ||
SparkSession.builder.getOrCreate.sparkContext | ||
.broadcast(normalizationContext)) | ||
|
||
new FixedEffectCoordinate( | ||
fEDataset, | ||
dataset, | ||
DistributedOptimizationProblem( | ||
fEOptConfig, | ||
distributedLossFunctionFactory(interceptIndexOpt), | ||
downSamplerOpt, | ||
glmConstructor, | ||
normalizationPhotonBroadcast, | ||
varianceComputationType)).asInstanceOf[Coordinate[D]] | ||
varianceComputationType), | ||
featureShardId, | ||
inputColumnsNames).asInstanceOf[Coordinate] | ||
|
||
case ( | ||
rEDataset: RandomEffectDataset, | ||
Some(rEType), | ||
rEOptConfig: RandomEffectOptimizationConfiguration, | ||
singleNodeLossFunctionFactory: SingleNodeObjectiveFunctionFactory) => | ||
|
||
RandomEffectCoordinate( | ||
rEDataset, | ||
dataset, | ||
rEType, | ||
featureShardId, | ||
inputColumnsNames, | ||
rEOptConfig, | ||
singleNodeLossFunctionFactory, | ||
glmConstructor, | ||
normalizationContext, | ||
varianceComputationType, | ||
interceptIndexOpt).asInstanceOf[Coordinate[D]] | ||
interceptIndexOpt).asInstanceOf[Coordinate] | ||
|
||
case _ => | ||
throw new UnsupportedOperationException( | ||
s"""Cannot build coordinate for the following input class combination: | ||
| ${dataset.getClass.getName} | ||
| ${rETypeOpt.getOrElse("fixed-effect")} | ||
| ${coordinateOptConfig.getClass.getName} | ||
| ${lossFunctionFactory.getClass.getName}""".stripMargin) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,52 +14,41 @@ | |
*/ | ||
package com.linkedin.photon.ml.algorithm | ||
|
||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.ml.linalg.{Vector => SparkVector} | ||
import org.apache.spark.sql.DataFrame | ||
import org.apache.spark.sql.functions.col | ||
import org.apache.spark.storage.StorageLevel | ||
|
||
import com.linkedin.photon.ml.Types.{FeatureShardId, UniqueSampleId} | ||
import com.linkedin.photon.ml.constants.DataConst | ||
import com.linkedin.photon.ml.data._ | ||
import com.linkedin.photon.ml.data.scoring.CoordinateDataScores | ||
import com.linkedin.photon.ml.function.DistributedObjectiveFunction | ||
import com.linkedin.photon.ml.model.{DatumScoringModel, FixedEffectModel} | ||
import com.linkedin.photon.ml.optimization.{DistributedOptimizationProblem, FixedEffectOptimizationTracker, OptimizationTracker} | ||
import com.linkedin.photon.ml.util.{ApiUtils, VectorUtils} | ||
|
||
/** | ||
* The optimization problem coordinate for a fixed effect model. | ||
* | ||
* @tparam Objective The type of objective function used to solve the fixed effect optimization problem | ||
* @param dataset The training dataset | ||
* @param dataset The raw training data | ||
* @param optimizationProblem The fixed effect optimization problem | ||
* @param inputColumnsNames The names of the input data columns (e.g. response, offset) | ||
*/ | ||
protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunction]( | ||
override protected val dataset: FixedEffectDataset, | ||
optimizationProblem: DistributedOptimizationProblem[Objective]) | ||
extends Coordinate[FixedEffectDataset](dataset) { | ||
var dataset: DataFrame, | ||
optimizationProblem: DistributedOptimizationProblem[Objective], | ||
featureShardId: FeatureShardId, | ||
inputColumnsNames: InputColumnsNames) | ||
extends Coordinate { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't what I was picturing when writing the design document - I was thinking of something more like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What you described is exactly what is implemented in the comments, but I put the corresponding code logic in different methods (instead of the one you suggested). See CoordinateDescent line 192-208:
|
||
|
||
/** | ||
* Update the coordinate with a new dataset. | ||
* | ||
* @param dataset The updated dataset | ||
* @return A new coordinate with the updated dataset | ||
*/ | ||
override protected[algorithm] def updateCoordinateWithDataset( | ||
dataset: FixedEffectDataset): FixedEffectCoordinate[Objective] = | ||
new FixedEffectCoordinate[Objective](dataset, optimizationProblem) | ||
|
||
/** | ||
* Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset. | ||
* | ||
* @return A (updated model, optimization state tracking information) tuple | ||
*/ | ||
override protected[algorithm] def trainModel(): (DatumScoringModel, OptimizationTracker) = { | ||
|
||
val updatedFixedEffectModel = FixedEffectCoordinate.trainModel( | ||
dataset.labeledPoints, | ||
optimizationProblem, | ||
dataset.featureShardId, | ||
None) | ||
val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker) | ||
|
||
(updatedFixedEffectModel, optimizationTracker) | ||
override protected[algorithm] def updateOffset(model: DatumScoringModel) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comments are missing for |
||
model match { | ||
case fixedEffectModel: FixedEffectModel => | ||
dataset = FixedEffectCoordinate.updateOffset(dataset, fixedEffectModel, featureShardId, inputColumnsNames) | ||
case _ => | ||
throw new UnsupportedOperationException(s"Unsupported model type: ${model.modelType}") | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -72,83 +61,100 @@ protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunct | |
override protected[algorithm] def trainModel(model: DatumScoringModel): (DatumScoringModel, OptimizationTracker) = | ||
model match { | ||
case fixedEffectModel: FixedEffectModel => | ||
val updatedFixedEffectModel = FixedEffectCoordinate.trainModel( | ||
dataset.labeledPoints, | ||
FixedEffectCoordinate.trainModel( | ||
dataset, | ||
optimizationProblem, | ||
dataset.featureShardId, | ||
featureShardId, | ||
inputColumnsNames, | ||
Some(fixedEffectModel)) | ||
val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker) | ||
|
||
(updatedFixedEffectModel, optimizationTracker) | ||
|
||
case _ => | ||
throw new UnsupportedOperationException( | ||
s"Training model of type ${model.getClass} in ${this.getClass} is not supported") | ||
} | ||
|
||
|
||
/** | ||
* Compute scores for the coordinate dataset using the given model. | ||
* Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset. | ||
* | ||
* @param model The input model | ||
* @return The dataset scores | ||
* @return A (updated model, optimization state tracking information) tuple | ||
*/ | ||
override protected[algorithm] def score(model: DatumScoringModel): CoordinateDataScores = model match { | ||
override protected[algorithm] def trainModel(): (DatumScoringModel, OptimizationTracker) = | ||
FixedEffectCoordinate.trainModel(dataset, optimizationProblem, featureShardId, inputColumnsNames, None) | ||
|
||
case fixedEffectModel: FixedEffectModel => | ||
FixedEffectCoordinate.score(dataset, fixedEffectModel) | ||
|
||
case _ => | ||
throw new UnsupportedOperationException( | ||
s"Scoring with model of type ${model.getClass} in ${this.getClass} is not supported") | ||
} | ||
} | ||
|
||
object FixedEffectCoordinate { | ||
|
||
def SCORE_FIELD = "fixed_score" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to call it |
||
|
||
/** | ||
* Train a new [[FixedEffectModel]] (i.e. run model optimization). | ||
* | ||
* @param input The training dataset | ||
* @param dataset The training dataset | ||
* @param optimizationProblem The optimization problem | ||
* @param featureShardId The ID of the feature shard for the training data | ||
* @param initialFixedEffectModelOpt An optional existing [[FixedEffectModel]] to use as a starting point for | ||
* optimization | ||
* @return A new [[FixedEffectModel]] | ||
*/ | ||
private def trainModel[Function <: DistributedObjectiveFunction]( | ||
input: RDD[(UniqueSampleId, LabeledPoint)], | ||
optimizationProblem: DistributedOptimizationProblem[Function], | ||
featureShardId: FeatureShardId, | ||
initialFixedEffectModelOpt: Option[FixedEffectModel]): FixedEffectModel = { | ||
|
||
val newModel = initialFixedEffectModelOpt | ||
.map { initialFixedEffectModel => | ||
optimizationProblem.runWithSampling(input, initialFixedEffectModel.model) | ||
dataset: DataFrame, | ||
optimizationProblem: DistributedOptimizationProblem[Function], | ||
featureShardId: FeatureShardId, | ||
inputColumnsNames: InputColumnsNames, | ||
initialFixedEffectModelOpt: Option[FixedEffectModel]): (FixedEffectModel, OptimizationTracker) = { | ||
|
||
val rdd = dataset | ||
.rdd | ||
.map { row => | ||
val uid = row.getAs[UniqueSampleId](DataConst.ID) | ||
val features = row.getAs[SparkVector](featureShardId) | ||
val label = row.getAs[Double](inputColumnsNames(InputColumnsNames.RESPONSE)) | ||
|
||
(uid, LabeledPoint(label, VectorUtils.mlToBreeze(features))) | ||
} | ||
.getOrElse(optimizationProblem.runWithSampling(input)) | ||
val updatedModelBroadcast = input.sparkContext.broadcast(newModel) | ||
rdd.persist(StorageLevel.MEMORY_ONLY) | ||
|
||
new FixedEffectModel(updatedModelBroadcast, featureShardId) | ||
} | ||
val (glm, stateTracker) = initialFixedEffectModelOpt | ||
.map ( initialFixedEffectModel => | ||
optimizationProblem.runWithSampling(rdd, initialFixedEffectModel.model) | ||
) | ||
.getOrElse(optimizationProblem.runWithSampling(rdd)) | ||
|
||
/** | ||
* Score a [[FixedEffectDataset]] using a given [[FixedEffectModel]]. | ||
* | ||
* @note The score is the dot product of the model coefficients with the feature values (i.e., it does not go | ||
* through a non-linear link function). | ||
* @param fixedEffectDataset The dataset to score | ||
* @param fixedEffectModel The model used to score the dataset | ||
* @return The computed scores | ||
*/ | ||
protected[algorithm] def score( | ||
fixedEffectDataset: FixedEffectDataset, | ||
fixedEffectModel: FixedEffectModel): CoordinateDataScores = { | ||
rdd.unpersist() | ||
|
||
val modelBroadcast = fixedEffectModel.modelBroadcast | ||
val scores = fixedEffectDataset.labeledPoints.mapValues { case LabeledPoint(_, features, _, _) => | ||
modelBroadcast.value.computeScore(features) | ||
} | ||
(FixedEffectModel(glm, featureShardId), new FixedEffectOptimizationTracker(stateTracker)) | ||
} | ||
|
||
new CoordinateDataScores(scores) | ||
def updateOffset( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing comments for |
||
dataset: DataFrame, fixedEffectModel: FixedEffectModel, featureShardId: FeatureShardId, | ||
inputColumnsNames: InputColumnsNames): DataFrame = { | ||
|
||
require( | ||
featureShardId == fixedEffectModel.featureShardId, | ||
s"Fixed effect coordinate featureShardId ${featureShardId} != model.featureShardId ${ | ||
fixedEffectModel | ||
.featureShardId | ||
}") | ||
|
||
val offset = inputColumnsNames(InputColumnsNames.OFFSET) | ||
val hasOffsetField = ApiUtils.hasColumn(dataset, offset) | ||
val hasCoordinateScoreField = ApiUtils.hasColumn(dataset, SCORE_FIELD) | ||
|
||
if (hasOffsetField && hasCoordinateScoreField) { | ||
// offset = offset - old_coordinateScore + new_coordinateScore | ||
dataset.withColumn(offset, col(offset) - col(SCORE_FIELD)) | ||
fixedEffectModel.computeScore(dataset, SCORE_FIELD) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Where is the new score saved? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the new score is saved in SCORE_FIELD. |
||
.withColumn(offset, col(offset) + col(SCORE_FIELD)) | ||
} else if (!hasOffsetField && !hasCoordinateScoreField) { | ||
fixedEffectModel.computeScore(dataset, SCORE_FIELD) | ||
.withColumn(offset, col(SCORE_FIELD)) | ||
} else if (hasOffsetField && !hasCoordinateScoreField) { | ||
fixedEffectModel.computeScore(dataset, SCORE_FIELD) | ||
.withColumn(offset, col(offset) + col(SCORE_FIELD)) | ||
} else { | ||
throw new UnsupportedOperationException("It shouldn't happen!") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you make the error message more explicit? |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep "dataset" as one word.