Dataframe #451

Open · wants to merge 11 commits into master
@@ -184,7 +184,7 @@ object ModelTraining extends Logging {
// Initialize the list with the result from the first regularization weight
optimizationProblem.updateRegularizationWeight(currentWeight)

- val glm = if (numWarmStartModels == 0) {
+ val (glm, stateTracker) = if (numWarmStartModels == 0) {

logger.info(s"No warm start model found; beginning training with a 0-coefficients model")

@@ -199,14 +199,14 @@ object ModelTraining extends Logging {
optimizationProblem.run(trainingData, warmStartModels(maxLambda))
}

- List((currentWeight, glm, optimizationProblem.getStatesTracker))
+ List((currentWeight, glm, stateTracker))

case (latestWeightsModelsAndTrackers, currentWeight) =>

optimizationProblem.updateRegularizationWeight(currentWeight)

// Train the rest of the models
- val glm = if (useWarmStart) {
+ val (glm, stateTracker) = if (useWarmStart) {
val previousModel = latestWeightsModelsAndTrackers.head._2

logger.info(s"Training model with regularization weight $currentWeight started (warm start)")
@@ -219,7 +219,7 @@ object ModelTraining extends Logging {
optimizationProblem.run(trainingData)
}

- (currentWeight, glm, optimizationProblem.getStatesTracker) +: latestWeightsModelsAndTrackers
+ (currentWeight, glm, stateTracker) +: latestWeightsModelsAndTrackers
}

broadcastNormalizationContext.unpersist()
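The hunks above switch optimizationProblem.run to return the optimizer state tracker together with the model, rather than reading it back afterwards through getStatesTracker. A minimal, self-contained sketch of why pairing them matters when one mutable problem object is reused across regularization weights; every name below is an illustrative stand-in, not a photon-ml type:

    case class Model(coefficients: List[Double])
    case class StateTracker(iterations: Int, converged: Boolean)

    class Problem {
      private var lastTracker = StateTracker(0, converged = false)

      // Returning the tracker with the model ties it to this exact run.
      def run(data: List[Double]): (Model, StateTracker) = {
        val tracker = StateTracker(iterations = data.length, converged = true)
        lastTracker = tracker
        (Model(data), tracker)
      }

      // Reading the tracker later can observe state from a newer run.
      def getStatesTracker: StateTracker = lastTracker
    }

    // With the tuple, each regularization weight keeps its own tracker:
    // val (glm, stateTracker) = problem.run(trainingData)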
@@ -21,7 +21,7 @@ import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

- import com.linkedin.photon.ml.data.{GameDatum, LabeledPoint, LocalDataset}
+ import com.linkedin.photon.ml.data.{GameDatum, LabeledPoint}
import com.linkedin.photon.ml.function._
import com.linkedin.photon.ml.function.glm.{HessianVectorAggregator, ValueAndGradientAggregator}
import com.linkedin.photon.ml.model.Coefficients
@@ -57,7 +57,6 @@ object SparkSessionConfiguration {
classOf[LabeledPoint],
classOf[LBFGS],
classOf[LinearRegressionModel],
- classOf[LocalDataset],
classOf[LogisticRegressionModel],
classOf[Matrix[Double]],
classOf[NormalizationContext],
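As context for this hunk: the surrounding list registers classes with Kryo, and the change simply drops LocalDataset from it now that the class is no longer used. A generic sketch of the same registration mechanism, with placeholder classes instead of photon-ml's own:

    import org.apache.spark.SparkConf

    // Placeholder type standing in for the registered model classes.
    case class ExampleModel(weights: Array[Double])

    val conf = new SparkConf()
      .setAppName("kryo-registration-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Registration lets Kryo serialize a compact class ID instead of the full class name.
      .registerKryoClasses(Array(classOf[ExampleModel], classOf[Array[Double]]))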
@@ -14,9 +14,12 @@
*/
package com.linkedin.photon.ml.algorithm

- import com.linkedin.photon.ml.data.{Dataset, FixedEffectDataset, RandomEffectDataset}
- import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory}
+ import org.apache.spark.sql.{DataFrame, SparkSession}
+
+ import com.linkedin.photon.ml.Types.{FeatureShardId, REType}
+ import com.linkedin.photon.ml.data.InputColumnsNames
+ import com.linkedin.photon.ml.function.ObjectiveFunction
+ import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory}
import com.linkedin.photon.ml.model.Coefficients
import com.linkedin.photon.ml.normalization.NormalizationContext
import com.linkedin.photon.ml.optimization.DistributedOptimizationProblem
@@ -34,35 +37,40 @@ import com.linkedin.photon.ml.util.PhotonBroadcast
object CoordinateFactory {

/**
- * Creates a [[Coordinate]] of the appropriate type, given the input [[Dataset]],
+ * Creates a [[Coordinate]] of the appropriate type, given the input data set,
> Collaborator: Let's keep "dataset" as one word.

* [[CoordinateOptimizationConfiguration]], and [[ObjectiveFunction]].
*
- * @tparam D Some type of [[Dataset]]
* @param dataset The input data to use for training
* @param featureShardId
> Collaborator: Missing parameter description.

* @param inputColumnsNames
> Collaborator: Missing parameter description.

* @param coordinateOptConfig The optimization settings for training
* @param lossFunctionFactoryConstructor A constructor for the loss function factory function
* @param glmConstructor A constructor for the type of [[GeneralizedLinearModel]] being trained
* @param downSamplerFactory A factory function for the [[DownSampler]] (if down-sampling is enabled)
* @param normalizationContext The [[NormalizationContext]]
* @param varianceComputationType Should the trained coefficient variances be computed in addition to the means?
* @param interceptIndexOpt The index of the intercept, if one is present
- * @return A [[Coordinate]] for the [[Dataset]] of type [[D]]
+ * @param rETypeOpt
> Collaborator: Missing parameter description.

+ * @return A [[Coordinate]] instance
*/
- def build[D <: Dataset[D]](
-   dataset: D,
+ def build(
+   dataset: DataFrame,
+   featureShardId: FeatureShardId,
+   inputColumnsNames: InputColumnsNames,
coordinateOptConfig: CoordinateOptimizationConfiguration,
lossFunctionFactoryConstructor: ObjectiveFunctionFactoryFactory,
glmConstructor: Coefficients => GeneralizedLinearModel,
downSamplerFactory: DownSamplerFactory,
normalizationContext: NormalizationContext,
varianceComputationType: VarianceComputationType,
-   interceptIndexOpt: Option[Int]): Coordinate[D] = {
+   interceptIndexOpt: Option[Int],
+   rETypeOpt: Option[REType]): Coordinate = {

val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig)

- (dataset, coordinateOptConfig, lossFunctionFactory) match {
+ (rETypeOpt, coordinateOptConfig, lossFunctionFactory) match {
> Collaborator: Can we just do a match on (coordinateOptConfig, lossFunctionFactory)? This rETypeOpt seems to be redundant.

case (
- fEDataset: FixedEffectDataset,
+ None,
> Reviewer: Why does this become "None"? Is this for the fixed-effect case?

fEOptConfig: FixedEffectOptimizationConfiguration,
distributedLossFunctionFactory: DistributedObjectiveFunctionFactory) =>

Expand All @@ -71,36 +79,43 @@ object CoordinateFactory {
} else {
None
}
- val normalizationPhotonBroadcast = PhotonBroadcast(fEDataset.sparkContext.broadcast(normalizationContext))
+ val normalizationPhotonBroadcast = PhotonBroadcast(
+   SparkSession.builder.getOrCreate.sparkContext
+     .broadcast(normalizationContext))

new FixedEffectCoordinate(
- fEDataset,
+ dataset,
DistributedOptimizationProblem(
fEOptConfig,
distributedLossFunctionFactory(interceptIndexOpt),
downSamplerOpt,
glmConstructor,
normalizationPhotonBroadcast,
-   varianceComputationType)).asInstanceOf[Coordinate[D]]
+   varianceComputationType),
+ featureShardId,
+ inputColumnsNames).asInstanceOf[Coordinate]

case (
- rEDataset: RandomEffectDataset,
+ Some(rEType),
rEOptConfig: RandomEffectOptimizationConfiguration,
singleNodeLossFunctionFactory: SingleNodeObjectiveFunctionFactory) =>

RandomEffectCoordinate(
- rEDataset,
+ dataset,
+ rEType,
+ featureShardId,
+ inputColumnsNames,
rEOptConfig,
singleNodeLossFunctionFactory,
glmConstructor,
normalizationContext,
varianceComputationType,
- interceptIndexOpt).asInstanceOf[Coordinate[D]]
+ interceptIndexOpt).asInstanceOf[Coordinate]

case _ =>
throw new UnsupportedOperationException(
s"""Cannot build coordinate for the following input class combination:
- | ${dataset.getClass.getName}
+ | ${rETypeOpt.getOrElse("fixed-effect")}
| ${coordinateOptConfig.getClass.getName}
| ${lossFunctionFactory.getClass.getName}""".stripMargin)
}
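To make the new dispatch easier to follow: rETypeOpt selects the coordinate type, with None meaning fixed effect and Some(rEType) meaning a random effect keyed by that entity type. A stripped-down, hypothetical model of that selection (none of these types are the real photon-ml classes):

    sealed trait Coord
    final case class FixedEffect(featureShard: String) extends Coord
    final case class RandomEffect(reType: String, featureShard: String) extends Coord

    // None => fixed effect; Some(reType) => random effect for that entity type.
    def buildCoord(reTypeOpt: Option[String], featureShard: String): Coord =
      reTypeOpt match {
        case None         => FixedEffect(featureShard)
        case Some(reType) => RandomEffect(reType, featureShard)
      }

This also shows why the reviewer's question above is fair: the Option encodes information that the config subtype already implies.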
@@ -14,52 +14,41 @@
*/
package com.linkedin.photon.ml.algorithm

- import org.apache.spark.rdd.RDD
+ import org.apache.spark.ml.linalg.{Vector => SparkVector}
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.functions.col
+ import org.apache.spark.storage.StorageLevel

import com.linkedin.photon.ml.Types.{FeatureShardId, UniqueSampleId}
import com.linkedin.photon.ml.constants.DataConst
import com.linkedin.photon.ml.data._
import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
import com.linkedin.photon.ml.function.DistributedObjectiveFunction
import com.linkedin.photon.ml.model.{DatumScoringModel, FixedEffectModel}
import com.linkedin.photon.ml.optimization.{DistributedOptimizationProblem, FixedEffectOptimizationTracker, OptimizationTracker}
import com.linkedin.photon.ml.util.{ApiUtils, VectorUtils}

/**
* The optimization problem coordinate for a fixed effect model.
*
* @tparam Objective The type of objective function used to solve the fixed effect optimization problem
- * @param dataset The training dataset
+ * @param dataset The raw training data
* @param optimizationProblem The fixed effect optimization problem
* @param inputColumnsNames
*/
protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunction](
-   override protected val dataset: FixedEffectDataset,
-   optimizationProblem: DistributedOptimizationProblem[Objective])
- extends Coordinate[FixedEffectDataset](dataset) {
+   var dataset: DataFrame,
+   optimizationProblem: DistributedOptimizationProblem[Objective],
+   featureShardId: FeatureShardId,
+   inputColumnsNames: InputColumnsNames)
+ extends Coordinate {
> Contributor: This isn't what I was picturing when writing the design document - I was thinking of something more like generateDataset in the proof-of-concept tests:
>
>   • CoordinateDescent calls train in FixedEffectCoordinate with the training DataFrame
>   • train calls generateDataset
>   • generateDataset drops unnecessary columns and trains a FixedEffectModel
>   • CoordinateDescent calls score in FixedEffectCoordinate with the training DataFrame and FixedEffectModel
>   • score returns a new DataFrame with a scores column
>   • CoordinateDescent calls train in RandomEffectCoordinate with the scored DataFrame
>   • train calls generateDataset
>   • generateDataset merges the offset column with the scores column, then drops unnecessary columns and trains a RandomEffectModel
>   • CoordinateDescent calls score in RandomEffectCoordinate with the scored DataFrame and RandomEffectModel
>   • score returns a new DataFrame with another scores column
>   • etc.

> Contributor (author): What you described is exactly what is implemented, but I put the corresponding code logic in different methods (instead of the one you suggested). See CoordinateDescent lines 192-208:
>
>   logger.debug(s"Updating coordinate of class ${coordinate.getClass}")
>
>   // Compute scores using the previous coordinate model and update offsets
>   prevModelOpt.map(model => coordinate.updateOffset(model))
>
>   // Train a new model
>   val (model, tracker) = initialModelOpt.map(
>     initialModel => Timed(s"Train new model using existing model as starting point") {
>       coordinate.trainModel(initialModel)
>     }).getOrElse(
>     Timed(s"Train new model") {
>       coordinate.trainModel()
>     })
>
>   // Log summary
>   logOptimizationSummary(logger, coordinateId, model, tracker)
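A compact sketch of the train/score hand-off described in this exchange, with every type a stand-in for the real photon-ml classes:

    case class Model(name: String)
    case class Tracker(note: String)

    trait Coordinate {
      def updateOffset(prev: Model): Unit // fold the previous coordinate's scores into offsets
      def trainModel(): (Model, Tracker)  // fit this coordinate against the adjusted data
    }

    // Each coordinate first absorbs the previous model's scores, then trains.
    def descend(coordinates: Seq[Coordinate]): Seq[Model] =
      coordinates.foldLeft(List.empty[Model]) { (models, coordinate) =>
        models.headOption.foreach(coordinate.updateOffset)
        val (model, _) = coordinate.trainModel()
        model :: models
      }.reverse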


- /**
-  * Update the coordinate with a new dataset.
-  *
-  * @param dataset The updated dataset
-  * @return A new coordinate with the updated dataset
-  */
- override protected[algorithm] def updateCoordinateWithDataset(
-     dataset: FixedEffectDataset): FixedEffectCoordinate[Objective] =
-   new FixedEffectCoordinate[Objective](dataset, optimizationProblem)
-
- /**
-  * Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset.
-  *
-  * @return A (updated model, optimization state tracking information) tuple
-  */
- override protected[algorithm] def trainModel(): (DatumScoringModel, OptimizationTracker) = {
-
-   val updatedFixedEffectModel = FixedEffectCoordinate.trainModel(
-     dataset.labeledPoints,
-     optimizationProblem,
-     dataset.featureShardId,
-     None)
-   val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)
-
-   (updatedFixedEffectModel, optimizationTracker)
override protected[algorithm] def updateOffset(model: DatumScoringModel) = {
> Collaborator: Comments are missing for updateOffset.

model match {
case fixedEffectModel: FixedEffectModel =>
dataset = FixedEffectCoordinate.updateOffset(dataset, fixedEffectModel, featureShardId, inputColumnsNames)
case _ =>
throw new UnsupportedOperationException(s"Unsupported model type: ${model.modelType}")
}
}

/**
Expand All @@ -72,83 +61,100 @@ protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunct
override protected[algorithm] def trainModel(model: DatumScoringModel): (DatumScoringModel, OptimizationTracker) =
model match {
case fixedEffectModel: FixedEffectModel =>
-   val updatedFixedEffectModel = FixedEffectCoordinate.trainModel(
-     dataset.labeledPoints,
+   FixedEffectCoordinate.trainModel(
+     dataset,
      optimizationProblem,
-     dataset.featureShardId,
+     featureShardId,
+     inputColumnsNames,
      Some(fixedEffectModel))
-   val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)
-
-   (updatedFixedEffectModel, optimizationTracker)

case _ =>
throw new UnsupportedOperationException(
s"Training model of type ${model.getClass} in ${this.getClass} is not supported")
}


/**
-  * Compute scores for the coordinate dataset using the given model.
+  * Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset.
   *
-  * @param model The input model
-  * @return The dataset scores
+  * @return A (updated model, optimization state tracking information) tuple
   */
- override protected[algorithm] def score(model: DatumScoringModel): CoordinateDataScores = model match {
-
-   case fixedEffectModel: FixedEffectModel =>
-     FixedEffectCoordinate.score(dataset, fixedEffectModel)
-
-   case _ =>
-     throw new UnsupportedOperationException(
-       s"Scoring with model of type ${model.getClass} in ${this.getClass} is not supported")
- }
+ override protected[algorithm] def trainModel(): (DatumScoringModel, OptimizationTracker) =
+   FixedEffectCoordinate.trainModel(dataset, optimizationProblem, featureShardId, inputColumnsNames, None)
}
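The two trainModel overloads above differ only in whether a warm-start model is threaded through to the optimizer. That pattern reduces to an Option fold; a tiny, purely illustrative example (names and the update rule are invented):

    case class Model(coefficients: Vector[Double])

    // Warm start: begin from an existing model's coefficients; cold start: begin from zeros.
    def fit(data: Seq[Double], warmStart: Option[Model]): Model = {
      val start = warmStart.map(_.coefficients).getOrElse(Vector.fill(data.length)(0.0))
      Model(start.zip(data).map { case (w, x) => w + 0.1 * x }) // stand-in for a real optimizer step
    }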

object FixedEffectCoordinate {

def SCORE_FIELD = "fixed_score"
> Collaborator: Better to call it fixed_effect_score.


/**
* Train a new [[FixedEffectModel]] (i.e. run model optimization).
*
-  * @param input The training dataset
+  * @param dataset The training dataset
* @param optimizationProblem The optimization problem
* @param featureShardId The ID of the feature shard for the training data
* @param initialFixedEffectModelOpt An optional existing [[FixedEffectModel]] to use as a starting point for
* optimization
* @return A new [[FixedEffectModel]]
*/
private def trainModel[Function <: DistributedObjectiveFunction](
-     input: RDD[(UniqueSampleId, LabeledPoint)],
+     dataset: DataFrame,
      optimizationProblem: DistributedOptimizationProblem[Function],
      featureShardId: FeatureShardId,
-     initialFixedEffectModelOpt: Option[FixedEffectModel]): FixedEffectModel = {
+     inputColumnsNames: InputColumnsNames,
+     initialFixedEffectModelOpt: Option[FixedEffectModel]): (FixedEffectModel, OptimizationTracker) = {

-   val newModel = initialFixedEffectModelOpt
-     .map { initialFixedEffectModel =>
-       optimizationProblem.runWithSampling(input, initialFixedEffectModel.model)
-     }
-     .getOrElse(optimizationProblem.runWithSampling(input))
-   val updatedModelBroadcast = input.sparkContext.broadcast(newModel)
-
-   new FixedEffectModel(updatedModelBroadcast, featureShardId)
- }
+   val rdd = dataset
+     .rdd
+     .map { row =>
+       val uid = row.getAs[UniqueSampleId](DataConst.ID)
+       val features = row.getAs[SparkVector](featureShardId)
+       val label = row.getAs[Double](inputColumnsNames(InputColumnsNames.RESPONSE))
+
+       (uid, LabeledPoint(label, VectorUtils.mlToBreeze(features)))
+     }
+   rdd.persist(StorageLevel.MEMORY_ONLY)
+
+   val (glm, stateTracker) = initialFixedEffectModelOpt
+     .map ( initialFixedEffectModel =>
+       optimizationProblem.runWithSampling(rdd, initialFixedEffectModel.model)
+     )
+     .getOrElse(optimizationProblem.runWithSampling(rdd))

+   rdd.unpersist()
+
+   (FixedEffectModel(glm, featureShardId), new FixedEffectOptimizationTracker(stateTracker))
+ }

- /**
-  * Score a [[FixedEffectDataset]] using a given [[FixedEffectModel]].
-  *
-  * @note The score is the dot product of the model coefficients with the feature values (i.e., it does not go
-  *       through a non-linear link function).
-  * @param fixedEffectDataset The dataset to score
-  * @param fixedEffectModel The model used to score the dataset
-  * @return The computed scores
-  */
- protected[algorithm] def score(
-     fixedEffectDataset: FixedEffectDataset,
-     fixedEffectModel: FixedEffectModel): CoordinateDataScores = {
-
-   val modelBroadcast = fixedEffectModel.modelBroadcast
-   val scores = fixedEffectDataset.labeledPoints.mapValues { case LabeledPoint(_, features, _, _) =>
-     modelBroadcast.value.computeScore(features)
-   }
-
-   new CoordinateDataScores(scores)
- }
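Since the new trainModel pivots on converting the DataFrame into the keyed RDD the optimizer consumes, here is a self-contained toy version of that conversion; the column names and the SimplePoint type are simplified stand-ins, not photon-ml's:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    case class SimplePoint(label: Double, features: Array[Double])

    object DataFrameToRddSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[1]").appName("df-to-rdd").getOrCreate()
        import spark.implicits._

        val df = Seq((1L, 1.0, Seq(0.1, 0.2)), (2L, 0.0, Seq(0.3, 0.4)))
          .toDF("uid", "response", "features")

        // Pull each row into the keyed, typed form an iterative optimizer consumes.
        val rdd = df.rdd.map { row =>
          val uid = row.getAs[Long]("uid")
          val label = row.getAs[Double]("response")
          val features = row.getAs[Seq[Double]]("features").toArray
          (uid, SimplePoint(label, features))
        }
        rdd.persist(StorageLevel.MEMORY_ONLY) // optimizers traverse the data many times

        println(s"points: ${rdd.count()}")
        rdd.unpersist()
        spark.stop()
      }
    }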
def updateOffset(
      dataset: DataFrame, fixedEffectModel: FixedEffectModel, featureShardId: FeatureShardId,
      inputColumnsNames: InputColumnsNames): DataFrame = {

> Collaborator: Missing comments for updateOffset.

    require(
      featureShardId == fixedEffectModel.featureShardId,
      s"Fixed effect coordinate featureShardId $featureShardId != model.featureShardId ${fixedEffectModel.featureShardId}")

val offset = inputColumnsNames(InputColumnsNames.OFFSET)
val hasOffsetField = ApiUtils.hasColumn(dataset, offset)
val hasCoordinateScoreField = ApiUtils.hasColumn(dataset, SCORE_FIELD)

    if (hasOffsetField && hasCoordinateScoreField) {
      // offset = offset - old_coordinateScore + new_coordinateScore
      val adjusted = dataset.withColumn(offset, col(offset) - col(SCORE_FIELD))
      fixedEffectModel.computeScore(adjusted, SCORE_FIELD)
        .withColumn(offset, col(offset) + col(SCORE_FIELD))
    } else if (!hasOffsetField && !hasCoordinateScoreField) {
      fixedEffectModel.computeScore(dataset, SCORE_FIELD)
        .withColumn(offset, col(SCORE_FIELD))
    } else if (hasOffsetField && !hasCoordinateScoreField) {
      fixedEffectModel.computeScore(dataset, SCORE_FIELD)
        .withColumn(offset, col(offset) + col(SCORE_FIELD))
    } else {
      throw new UnsupportedOperationException("It shouldn't happen!")
    }

> Reviewer: Where is the new score saved?

> Collaborator: I think the new score is saved in SCORE_FIELD.

> Collaborator: Can you make the error message more explicit?
}
}
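Finally, the offset bookkeeping in updateOffset (subtract the stale score, re-score, add the fresh score) can be watched end-to-end on a toy DataFrame; the column names and the re-scoring step are stand-ins, not the photon-ml API:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    object OffsetUpdateSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[1]").appName("offset-sketch").getOrCreate()
        import spark.implicits._

        // Each row carries the running offset and the score this coordinate produced last pass.
        val df = Seq((1L, 0.7, 0.2), (2L, 1.1, 0.4)).toDF("uid", "offset", "score")

        // offset = offset - old_score + new_score
        val withoutOld = df.withColumn("offset", col("offset") - col("score"))
        val rescored = withoutOld.withColumn("score", col("score") * 0.5) // stand-in for computeScore
        val updated = rescored.withColumn("offset", col("offset") + col("score"))

        updated.show()
        spark.stop()
      }
    }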