Dataframe #451

Open · wants to merge 11 commits into master
@@ -12,10 +12,11 @@
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linkedin.photon.ml

import org.joda.time.DateTimeZone

import com.linkedin.photon.ml.constants.DataConst
import com.linkedin.photon.ml.util.Utils

/**
@@ -45,4 +46,6 @@ object Constants {
* Default time zone for relative date calculations
*/
val DEFAULT_TIME_ZONE = DateTimeZone.UTC

val UNIQUE_SAMPLE_ID = DataConst.ID
}
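
For illustration only, here is one common way to materialize a per-sample unique ID column such as UNIQUE_SAMPLE_ID on a DataFrame; whether Photon ML derives DataConst.ID this way is not shown in this diff:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

// Hypothetical helper: tag each row with a unique 64-bit ID so that
// per-sample scores can later be joined back onto the training data.
def withUniqueSampleId(df: DataFrame, idCol: String): DataFrame =
  df.withColumn(idCol, monotonically_increasing_id())
```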
@@ -184,7 +184,7 @@ object ModelTraining extends Logging {
// Initialize the list with the result from the first regularization weight
optimizationProblem.updateRegularizationWeight(currentWeight)

val glm = if (numWarmStartModels == 0) {
val (glm, stateTracker) = if (numWarmStartModels == 0) {

logger.info(s"No warm start model found; beginning training with a 0-coefficients model")

@@ -199,14 +199,14 @@
optimizationProblem.run(trainingData, warmStartModels(maxLambda))
}

List((currentWeight, glm, optimizationProblem.getStatesTracker))
List((currentWeight, glm, stateTracker))

case (latestWeightsModelsAndTrackers, currentWeight) =>

optimizationProblem.updateRegularizationWeight(currentWeight)

// Train the rest of the models
val glm = if (useWarmStart) {
val (glm, stateTracker) = if (useWarmStart) {
val previousModel = latestWeightsModelsAndTrackers.head._2

logger.info(s"Training model with regularization weight $currentWeight started (warm start)")
@@ -219,7 +219,7 @@
optimizationProblem.run(trainingData)
}

(currentWeight, glm, optimizationProblem.getStatesTracker) +: latestWeightsModelsAndTrackers
(currentWeight, glm, stateTracker) +: latestWeightsModelsAndTrackers
}

broadcastNormalizationContext.unpersist()
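
For context on this hunk: the refactor makes run return the optimizer's state tracker together with the model, instead of reading it back from the mutable problem afterwards. A minimal sketch of the warm-start fold, using hypothetical stand-ins (Model, Tracker, runSolver) rather than the actual Photon ML API:

```scala
case class Model(coefficients: Vector[Double])
case class Tracker(iterations: Int)

// Hypothetical solver: returns the trained model and its state tracker.
def runSolver(weight: Double, init: Option[Model]): (Model, Tracker) = ???

// Fold over regularization weights; each run after the first is seeded with
// the model trained for the previous weight, and the tracker travels with
// the model in the return value.
def trainAll(weights: Seq[Double]): List[(Double, Model, Tracker)] =
  weights.foldLeft(List.empty[(Double, Model, Tracker)]) {
    case (Nil, weight) =>
      val (model, tracker) = runSolver(weight, None)
      List((weight, model, tracker))
    case (acc @ ((_, previousModel, _) :: _), weight) =>
      val (model, tracker) = runSolver(weight, Some(previousModel))
      (weight, model, tracker) +: acc
  }
```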
@@ -14,9 +14,12 @@
*/
package com.linkedin.photon.ml.algorithm

import com.linkedin.photon.ml.data.{Dataset, FixedEffectDataset, RandomEffectDataset}
import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory}
import org.apache.spark.sql.{DataFrame, SparkSession}

import com.linkedin.photon.ml.Types.{FeatureShardId, REType}
import com.linkedin.photon.ml.data.InputColumnsNames
import com.linkedin.photon.ml.function.ObjectiveFunction
import com.linkedin.photon.ml.function.ObjectiveFunctionHelper.{DistributedObjectiveFunctionFactory, ObjectiveFunctionFactoryFactory, SingleNodeObjectiveFunctionFactory}
import com.linkedin.photon.ml.model.Coefficients
import com.linkedin.photon.ml.normalization.NormalizationContext
import com.linkedin.photon.ml.optimization.DistributedOptimizationProblem
@@ -34,35 +37,40 @@ import com.linkedin.photon.ml.util.PhotonBroadcast
object CoordinateFactory {

/**
* Creates a [[Coordinate]] of the appropriate type, given the input [[Dataset]],
* Creates a [[Coordinate]] of the appropriate type, given the input data set,
Collaborator: Let's keep "dataset" as one word.

* [[CoordinateOptimizationConfiguration]], and [[ObjectiveFunction]].
*
* @tparam D Some type of [[Dataset]]
* @param dataset The input data to use for training
* @param featureShardId
Collaborator: Missing parameter description.

* @param inputColumnsNames
Collaborator: Missing parameter description.

* @param coordinateOptConfig The optimization settings for training
* @param lossFunctionFactoryConstructor A constructor for the loss function factory function
* @param glmConstructor A constructor for the type of [[GeneralizedLinearModel]] being trained
* @param downSamplerFactory A factory function for the [[DownSampler]] (if down-sampling is enabled)
* @param normalizationContext The [[NormalizationContext]]
* @param varianceComputationType Should the trained coefficient variances be computed in addition to the means?
* @param interceptIndexOpt The index of the intercept, if one is present
* @return A [[Coordinate]] for the [[Dataset]] of type [[D]]
* @param rETypeOpt
Collaborator: Missing parameter description.

* @return A [[Coordinate]] instance
*/
def build[D <: Dataset[D]](
dataset: D,
def build(
dataset: DataFrame,
featureShardId: FeatureShardId,
inputColumnsNames: InputColumnsNames,
coordinateOptConfig: CoordinateOptimizationConfiguration,
lossFunctionFactoryConstructor: ObjectiveFunctionFactoryFactory,
glmConstructor: Coefficients => GeneralizedLinearModel,
downSamplerFactory: DownSamplerFactory,
normalizationContext: NormalizationContext,
varianceComputationType: VarianceComputationType,
interceptIndexOpt: Option[Int]): Coordinate[D] = {
interceptIndexOpt: Option[Int],
rETypeOpt: Option[REType]): Coordinate = {

val lossFunctionFactory = lossFunctionFactoryConstructor(coordinateOptConfig)

(dataset, coordinateOptConfig, lossFunctionFactory) match {
(rETypeOpt, coordinateOptConfig, lossFunctionFactory) match {
Collaborator: Can we just do a match on (coordinateOptConfig, lossFunctionFactory)? This rETypeOpt seems to be redundant.

case (
fEDataset: FixedEffectDataset,
None,
Reviewer: Why does this become "None"? Is this for the fixed-effect case?

fEOptConfig: FixedEffectOptimizationConfiguration,
distributedLossFunctionFactory: DistributedObjectiveFunctionFactory) =>

@@ -71,36 +79,43 @@ object CoordinateFactory {
} else {
None
}
val normalizationPhotonBroadcast = PhotonBroadcast(fEDataset.sparkContext.broadcast(normalizationContext))
val normalizationPhotonBroadcast = PhotonBroadcast(
SparkSession.builder.getOrCreate.sparkContext
.broadcast(normalizationContext))

new FixedEffectCoordinate(
fEDataset,
dataset,
DistributedOptimizationProblem(
fEOptConfig,
distributedLossFunctionFactory(interceptIndexOpt),
downSamplerOpt,
glmConstructor,
normalizationPhotonBroadcast,
varianceComputationType)).asInstanceOf[Coordinate[D]]
varianceComputationType),
featureShardId,
inputColumnsNames).asInstanceOf[Coordinate]

case (
rEDataset: RandomEffectDataset,
Some(rEType),
rEOptConfig: RandomEffectOptimizationConfiguration,
singleNodeLossFunctionFactory: SingleNodeObjectiveFunctionFactory) =>

RandomEffectCoordinate(
rEDataset,
dataset,
rEType,
featureShardId,
inputColumnsNames,
rEOptConfig,
singleNodeLossFunctionFactory,
glmConstructor,
normalizationContext,
varianceComputationType,
interceptIndexOpt).asInstanceOf[Coordinate[D]]
interceptIndexOpt).asInstanceOf[Coordinate]

case _ =>
throw new UnsupportedOperationException(
s"""Cannot build coordinate for the following input class combination:
| ${dataset.getClass.getName}
| ${rETypeOpt.getOrElse("fixed-effect")}
| ${coordinateOptConfig.getClass.getName}
| ${lossFunctionFactory.getClass.getName}""".stripMargin)
}
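
To make the dispatch being debated here concrete, a compressed sketch with placeholder types in place of the Photon ML configuration classes; if the two configuration types are disjoint, matching on the config alone suffices, which is the gist of the rETypeOpt comment above:

```scala
sealed trait OptConfig
case object FixedEffectConfig extends OptConfig
case object RandomEffectConfig extends OptConfig

// Placeholder dispatch: rETypeOpt is only needed to carry the random-effect
// grouping key; the coordinate kind is already implied by the config type.
def buildCoordinate(rETypeOpt: Option[String], config: OptConfig): String =
  (rETypeOpt, config) match {
    case (None, FixedEffectConfig)          => "FixedEffectCoordinate"
    case (Some(rEType), RandomEffectConfig) => s"RandomEffectCoordinate($rEType)"
    case _ =>
      throw new UnsupportedOperationException(
        s"Inconsistent inputs: $rETypeOpt / $config")
  }
```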
@@ -14,54 +14,51 @@
*/
package com.linkedin.photon.ml.algorithm

import org.apache.spark.rdd.RDD

import com.linkedin.photon.ml.Types.{FeatureShardId, UniqueSampleId}
import org.apache.spark.ml.linalg.{Vector => SparkVector}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import com.linkedin.photon.ml.Constants
import com.linkedin.photon.ml.Types.FeatureShardId
import com.linkedin.photon.ml.constants.DataConst
import com.linkedin.photon.ml.data._
import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
import com.linkedin.photon.ml.function.DistributedObjectiveFunction
import com.linkedin.photon.ml.model.{DatumScoringModel, FixedEffectModel}
import com.linkedin.photon.ml.optimization.{DistributedOptimizationProblem, FixedEffectOptimizationTracker, OptimizationTracker}
import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel
import com.linkedin.photon.ml.util.VectorUtils

/**
* The optimization problem coordinate for a fixed effect model.
*
* @tparam Objective The type of objective function used to solve the fixed effect optimization problem
* @param dataset The training dataset
* @param rawData The raw training data
* @param optimizationProblem The fixed effect optimization problem
* @param inputColumnsNames
*/
protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunction](
override protected val dataset: FixedEffectDataset,
optimizationProblem: DistributedOptimizationProblem[Objective])
extends Coordinate[FixedEffectDataset](dataset) {

/**
* Update the coordinate with a new dataset.
*
* @param dataset The updated dataset
* @return A new coordinate with the updated dataset
*/
override protected[algorithm] def updateCoordinateWithDataset(
dataset: FixedEffectDataset): FixedEffectCoordinate[Objective] =
new FixedEffectCoordinate[Objective](dataset, optimizationProblem)

/**
* Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset.
*
* @return A (updated model, optimization state tracking information) tuple
*/
override protected[algorithm] def trainModel(): (DatumScoringModel, OptimizationTracker) = {

val updatedFixedEffectModel = FixedEffectCoordinate.trainModel(
dataset.labeledPoints,
optimizationProblem,
dataset.featureShardId,
None)
val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)

(updatedFixedEffectModel, optimizationTracker)
rawData: DataFrame,
optimizationProblem: DistributedOptimizationProblem[Objective],
featureShardId: FeatureShardId,
inputColumnsNames: InputColumnsNames)
extends Coordinate {
Contributor: This isn't what I was picturing when writing the design document - I was thinking of something more like generateDataset in the proof-of-concept tests:

  • CoordinateDescent calls train in FixedEffectCoordinate with training DataFrame
  • train calls generateDataset
  • generateDataset drops unnecessary columns and trains a FixedEffectModel
  • CoordinateDescent calls score in FixedEffectCoordinate with training DataFrame and FixedEffectModel
  • score returns a new DataFrame with a scores column
  • CoordinateDescent calls train in RandomEffectCoordinate with the scored DataFrame
  • train calls generateDataset
  • generateDataset merges the offset column with the scores column, then drops unnecessary columns and trains RandomEffectModel
  • CoordinateDescent calls score in RandomEffectCoordinate with the scored DataFrame and RandomEffectModel
  • score returns a new DataFrame with another scores column
  • etc.

Contributor Author: What you described is exactly what is implemented in the comments, but I put the corresponding code logic in different methods (instead of the one you suggested).

See CoordinateDescent lines 192-208:

```scala
logger.debug(s"Updating coordinate of class ${coordinate.getClass}")

// compute scores using the previous coordinate model and update offsets
prevModelOpt.map(model => coordinate.updateOffset(model))

// Train a new model
val (model, tracker) = initialModelOpt.map(
  initialModel => Timed(s"Train new model using existing model as starting point") {
    coordinate.trainModel(initialModel)
  }).getOrElse(
  Timed(s"Train new model") {
    coordinate.trainModel()
  })

// Log summary
logOptimizationSummary(logger, coordinateId, model, tracker)
```
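
To summarize the contract both comments describe, a minimal sketch with placeholder types and column names (uid, response, offset, scores are illustrative, not the Photon ML constants):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Placeholder model type; the real coordinates use Photon ML model classes.
trait PlaceholderModel { def scoreColumn(df: DataFrame): DataFrame }

// generateDataset: keep only the columns this coordinate needs.
def generateDataset(raw: DataFrame, featureCol: String): DataFrame =
  raw.select("uid", featureCol, "response", "offset")

// score: return the input with a scores column appended, so the next
// coordinate can see this coordinate's contribution.
def score(df: DataFrame, model: PlaceholderModel): DataFrame =
  model.scoreColumn(df)

// The next coordinate folds the previous scores into its offsets before training.
def mergeOffsets(scored: DataFrame): DataFrame =
  scored
    .withColumn("offset", col("offset") + col("scores"))
    .drop("scores")
```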


var dataset: DataFrame =
rawData
.select(Constants.UNIQUE_SAMPLE_ID, featureShardId, inputColumnsNames(InputColumnsNames.RESPONSE))
.withColumn(inputColumnsNames(InputColumnsNames.OFFSET), lit(0.0))
Contributor: Neither FixedEffectCoordinate nor RandomEffectCoordinate should have references like this one to the raw training data - it should be passed in as input and the column drops can occur somewhere in the train/score functions IMO.

Contributor Author: I merged the functions in FixedEffectDataset (same for RandomEffectDataset) into the coordinate to reduce code complexity.



override protected def updateDataset(scores: CoordinateDataScores) = {
dataset = scores.scores
.join(rawData, Constants.UNIQUE_SAMPLE_ID)
.withColumn(inputColumnsNames(InputColumnsNames.OFFSET),
col(inputColumnsNames(InputColumnsNames.OFFSET)) + col(DataConst.SCORE))
}
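
The updateDataset step above reduces to a join-and-add keyed on the sample ID. A self-contained toy example of the same pattern (column names illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.master("local[1]").getOrCreate()
import spark.implicits._

val data   = Seq((1L, 0.5), (2L, 0.0)).toDF("uid", "offset")
val scores = Seq((1L, 0.2), (2L, -0.1)).toDF("uid", "score")

// Join per-sample scores back onto the data by ID and fold them into the
// offset column, mirroring updateDataset above.
val updated = scores
  .join(data, "uid")
  .withColumn("offset", col("offset") + col("score"))
  .drop("score")

updated.show() // offsets become 0.7 and -0.1
```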


/**
* Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset using an existing model as
* a starting point.
@@ -72,20 +69,18 @@ protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunct
override protected[algorithm] def trainModel(model: DatumScoringModel): (DatumScoringModel, OptimizationTracker) =
model match {
case fixedEffectModel: FixedEffectModel =>
val updatedFixedEffectModel = FixedEffectCoordinate.trainModel(
dataset.labeledPoints,
FixedEffectCoordinate.trainModel(
dataset,
optimizationProblem,
dataset.featureShardId,
featureShardId,
Some(fixedEffectModel))
val optimizationTracker = new FixedEffectOptimizationTracker(optimizationProblem.getStatesTracker)

(updatedFixedEffectModel, optimizationTracker)

case _ =>
throw new UnsupportedOperationException(
s"Training model of type ${model.getClass} in ${this.getClass} is not supported")
}


/**
* Compute scores for the coordinate dataset using the given model.
*
@@ -95,60 +90,76 @@ protected[ml] class FixedEffectCoordinate[Objective <: DistributedObjectiveFunct
override protected[algorithm] def score(model: DatumScoringModel): CoordinateDataScores = model match {

case fixedEffectModel: FixedEffectModel =>
FixedEffectCoordinate.score(dataset, fixedEffectModel)
FixedEffectCoordinate.score(dataset, fixedEffectModel, featureShardId)

case _ =>
throw new UnsupportedOperationException(
s"Scoring with model of type ${model.getClass} in ${this.getClass} is not supported")
}


/**
* Compute an optimized model (i.e. run the coordinate optimizer) for the current dataset.
*
* @return A (updated model, optimization state tracking information) tuple
*/
override protected def trainModel(): (DatumScoringModel, OptimizationTracker) =
FixedEffectCoordinate.trainModel(dataset, optimizationProblem, featureShardId, None)
}

object FixedEffectCoordinate {

/**
* Train a new [[FixedEffectModel]] (i.e. run model optimization).
*
* @param input The training dataset
* @param dataset The training dataset
* @param optimizationProblem The optimization problem
* @param featureShardId The ID of the feature shard for the training data
* @param initialFixedEffectModelOpt An optional existing [[FixedEffectModel]] to use as a starting point for
* optimization
* @return A new [[FixedEffectModel]]
*/
private def trainModel[Function <: DistributedObjectiveFunction](
input: RDD[(UniqueSampleId, LabeledPoint)],
optimizationProblem: DistributedOptimizationProblem[Function],
featureShardId: FeatureShardId,
initialFixedEffectModelOpt: Option[FixedEffectModel]): FixedEffectModel = {
dataset: DataFrame,
optimizationProblem: DistributedOptimizationProblem[Function],
featureShardId: FeatureShardId,
initialFixedEffectModelOpt: Option[FixedEffectModel]): (FixedEffectModel, OptimizationTracker) = {

val rdd = dataset
.rdd
.map { row =>
val features = row.getAs[SparkVector](0)
val label = row.getDouble(1)

LabeledPoint(label, VectorUtils.mlToBreeze(features))
}
rdd.persist(StorageLevel.MEMORY_ONLY)

val newModel = initialFixedEffectModelOpt
val (glm, stateTracker) = initialFixedEffectModelOpt
.map { initialFixedEffectModel =>
optimizationProblem.runWithSampling(input, initialFixedEffectModel.model)
optimizationProblem.runWithSampling(rdd, initialFixedEffectModel.model)
}
.getOrElse(optimizationProblem.runWithSampling(input))
val updatedModelBroadcast = input.sparkContext.broadcast(newModel)
.getOrElse(optimizationProblem.runWithSampling(rdd))

new FixedEffectModel(updatedModelBroadcast, featureShardId)
rdd.unpersist()

(new FixedEffectModel(SparkSession.builder.getOrCreate.sparkContext.broadcast(glm), featureShardId),
new FixedEffectOptimizationTracker(stateTracker))
}
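
The DataFrame-to-RDD bridge in trainModel follows a standard recipe. A standalone sketch under assumed column names (features, label), with a simplified Point type standing in for Photon ML's LabeledPoint:

```scala
import breeze.linalg.DenseVector
import org.apache.spark.ml.linalg.{Vector => SparkVector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Simplified stand-in for Photon ML's LabeledPoint.
case class Point(label: Double, features: DenseVector[Double])

def toPoints(df: DataFrame): RDD[Point] = {
  val rdd = df.rdd.map { row =>
    // Assumed column names; the diff above reads the columns positionally.
    val features = row.getAs[SparkVector]("features")
    val label    = row.getDouble(row.fieldIndex("label"))
    Point(label, DenseVector(features.toArray))
  }
  // Cache before optimization: the solver makes multiple passes over the data.
  rdd.persist(StorageLevel.MEMORY_ONLY)
  rdd
}
```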

/**
* Score a [[FixedEffectDataset]] using a given [[FixedEffectModel]].
* Compute scores given a training dataset and a fixed effect model
*
* @note The score is the dot product of the model coefficients with the feature values (i.e., it does not go
* through a non-linear link function).
* @param fixedEffectDataset The dataset to score
* @param fixedEffectModel The model used to score the dataset
* @param dataset The dataset to score
* @param fixedEffectModel The model used to score the dataset
* @param featureShardId The ID of the feature shard for the training data
* @return The computed scores
*/
protected[algorithm] def score(
fixedEffectDataset: FixedEffectDataset,
fixedEffectModel: FixedEffectModel): CoordinateDataScores = {

val modelBroadcast = fixedEffectModel.modelBroadcast
val scores = fixedEffectDataset.labeledPoints.mapValues { case LabeledPoint(_, features, _, _) =>
modelBroadcast.value.computeScore(features)
}

def score(dataset: DataFrame, fixedEffectModel: FixedEffectModel, featureShardId: FeatureShardId): CoordinateDataScores = {
val cofs = VectorUtils.breezeToMl(fixedEffectModel.model.coefficients.means)
val scores = dataset
.withColumn(DataConst.SCORE, GeneralizedLinearModel.scoreUdf(lit(cofs), col(featureShardId)))
.select(Constants.UNIQUE_SAMPLE_ID, DataConst.SCORE)
new CoordinateDataScores(scores)
}
}
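
Finally, as the doc comment notes, the score is a plain dot product with no link function applied. A runnable analogue with the coefficients captured in the UDF closure (the diff instead passes them in via lit(cofs)):

```scala
import org.apache.spark.ml.linalg.{Vector => SparkVector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder.master("local[1]").getOrCreate()
import spark.implicits._

val coefficients = Vectors.dense(0.5, -1.0)

// score = dot(coefficients, features); no non-linear link function applied.
val scoreUdf = udf { (features: SparkVector) =>
  var dot = 0.0
  features.foreachActive { (i, v) => dot += coefficients(i) * v }
  dot
}

Seq((1L, Vectors.dense(1.0, 2.0)))
  .toDF("uid", "features")
  .withColumn("score", scoreUdf(col("features")))
  .select("uid", "score")
  .show() // score = 1.0 * 0.5 + 2.0 * (-1.0) = -1.5
```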