Major aggregator documentation refactor and minor aggregator refactor
- Updated all aggregator documentation
- Updated BroadcastWrapper documentation
- Removed Coefficients type from ObjectiveFunction
- Moved coefficient and vector broadcast logic into aggregators
- Added checks on input to NormalizationContext
Alex Shelkovnykov committed Nov 22, 2019
1 parent e76a35a commit e629b1a
Showing 27 changed files with 850 additions and 654 deletions.
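The heart of the refactor is visible in the first hunk below: normalization contexts now travel through the BroadcastWrapper abstraction rather than through raw Spark Broadcast handles. A minimal sketch of the pattern, assuming simplified signatures for BroadcastWrapper and PhotonBroadcast (the real photon-ml definitions may differ):

import org.apache.spark.broadcast.Broadcast

// Sketch: one interface over values that may or may not be broadcast, so
// distributed and single-node code paths can consume them uniformly.
trait BroadcastWrapper[T] extends Serializable {
  def value: T
}

// Sketch: wraps a genuine Spark broadcast variable.
case class PhotonBroadcast[T](bv: Broadcast[T]) extends BroadcastWrapper[T] {
  override def value: T = bv.value
}

// Sketch: wraps a plain local value for single-node code paths.
case class PhotonNonBroadcast[T](value: T) extends BroadcastWrapper[T]

Usage then follows the two-step shape in the hunk: broadcast the NormalizationContext first, wrap the broadcast second, and hand the wrapper to the optimization problem.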
@@ -114,7 +114,8 @@ object ModelTraining extends Logging {
val optimizerConfig = OptimizerConfig(optimizerType, maxNumIter, tolerance, constraintMap)
val optimizationConfig = FixedEffectOptimizationConfiguration(optimizerConfig, regularizationContext)
// Initialize the broadcast normalization context
val broadcastNormalizationContext = PhotonBroadcast(trainingData.sparkContext.broadcast(normalizationContext))
val broadcastNormalizationContext = trainingData.sparkContext.broadcast(normalizationContext)
val wrappedBroadcastNormalizationContext = PhotonBroadcast(broadcastNormalizationContext)

// Construct the generalized linear optimization problem
val (glmConstructor, objectiveFunction) = taskType match {
@@ -160,7 +161,7 @@ object ModelTraining extends Logging {
objectiveFunction,
samplerOption = None,
glmConstructor,
broadcastNormalizationContext,
wrappedBroadcastNormalizationContext,
VarianceComputationType.NONE)

// Sort the regularization weights from high to low, which would potentially speed up the overall convergence time
@@ -14,11 +14,7 @@
*/
package com.linkedin.photon.ml.function

import breeze.linalg.Vector
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import com.linkedin.photon.ml.data.LabeledPoint

@@ -28,49 +24,11 @@ import com.linkedin.photon.ml.data.LabeledPoint
*
* @param treeAggregateDepth The depth used by treeAggregate. Depth 1 indicates normal linear aggregate. Using
* depth > 1 can reduce memory consumption in the Driver and may also speed up the
* aggregation. It is experimental currently because treeAggregate is unstable in Spark
* versions 1.4 and 1.5.
* aggregation.
*/
abstract class DistributedObjectiveFunction(treeAggregateDepth: Int) extends ObjectiveFunction {

type Data = RDD[LabeledPoint]
type Coefficients = Broadcast[Vector[Double]]

require(treeAggregateDepth > 0, s"Tree aggregate depth must be greater than 0: $treeAggregateDepth")

private lazy val sc: SparkContext = SparkSession.builder().getOrCreate().sparkContext

/**
* Compute the size of the domain for the given input data (i.e. the number of features, including the intercept if
* there is one).
*
* @param input The given data for which to compute the domain dimension
* @return The domain dimension
*/
override protected[ml] def domainDimension(input: Data): Int = input.first.features.size

/**
* DistributedOptimizationProblems compute objective value over an RDD distributed across several tasks over one or
* more executors. Thus, DistributedObjectiveFunction expects broadcasted coefficients to reduce network overhead.
*
* @param coefficients A coefficients Vector to convert
* @return A broadcast of the given coefficients Vector
*/
override protected[ml] def convertFromVector(coefficients: Vector[Double]): Coefficients =
sc.broadcast(coefficients)

/**
* DistributedObjectiveFunctions handle broadcasted Vectors. Fetch the underlying Vector.
*
* @param coefficients A broadcasted coefficients vector
* @return The underlying coefficients Vector
*/
override protected[ml] def convertToVector(coefficients: Coefficients): Vector[Double] = coefficients.value

/**
* Unpersist the coefficients broadcast by convertFromVector.
*
* @param coefficients The broadcast coefficients to unpersist
*/
override protected[ml] def cleanupCoefficients(coefficients: Coefficients): Unit = coefficients.unpersist()
}
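The treeAggregateDepth parameter documented above maps directly onto Spark's RDD.treeAggregate. A hedged sketch of the aggregation shape, with a stand-in per-point loss instead of the real aggregator logic:

import org.apache.spark.rdd.RDD

// Sketch: sum per-point losses with a two-level aggregation tree. Partial
// sums are merged in intermediate executor stages rather than all at once
// on the driver, lowering driver memory pressure for wide feature spaces.
def totalLoss(perPointLoss: RDD[Double]): Double =
  perPointLoss.treeAggregate(0.0)(
    seqOp = (acc, loss) => acc + loss, // fold points within a partition
    combOp = (a, b) => a + b,          // merge partial sums across partitions
    depth = 2)                         // depth 1 is a plain linear aggregate

Larger depths only pay off when the RDD has many partitions; with few partitions the extra stages are pure overhead.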
@@ -14,42 +14,13 @@
*/
package com.linkedin.photon.ml.function

import breeze.linalg.Vector

import com.linkedin.photon.ml.data.LabeledPoint

/**
* The base objective function used by SingleNodeOptimizationProblems. This function works with data locally as part of
* a single task (on a single executor).
*/
abstract class SingleNodeObjectiveFunction extends ObjectiveFunction with Serializable {
type Data = Iterable[LabeledPoint]
type Coefficients = Vector[Double]

/**
* Compute the size of the domain for the given input data (i.e. the number of features, including the intercept if
* there is one).
*
* @param input The given data for which to compute the domain dimension
* @return The domain dimension
*/
override protected[ml] def domainDimension(input: Iterable[LabeledPoint]): Int = input.head.features.size

/**
* SingleNodeOptimizationProblems compute objective value over all of the data at once as part of a single task (on
* a single executor). Thus, the SingleNodeObjectiveFunction handles Vectors directly.
*
* @param coefficients A coefficients Vector to convert
* @return The given coefficients Vector
*/
override protected[ml] def convertFromVector(coefficients: Vector[Double]): Coefficients = coefficients

/**
* SingleNodeOptimizationProblems handle Vectors directly, so the SingleNodeObjectiveFunction input is already a
* Vector.
*
* @param coefficients A Coefficients object to convert
* @return The given coefficients Vector
*/
override protected[ml] def convertToVector(coefficients: Vector[Double]): Vector[Double] = coefficients
type Data = Iterable[LabeledPoint]
}
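With the Coefficients abstract type gone from both subclasses, the only type member distinguishing the two function families is Data, and coefficients are plain breeze Vectors everywhere, as the GLM loss function diffs below show. A condensed, assumed sketch of the shared shape after this commit (parameter lists abbreviated relative to the real ObjectiveFunction):

import breeze.linalg.Vector

// Sketch: only the data representation varies between the distributed
// (RDD[LabeledPoint]) and single-node (Iterable[LabeledPoint]) variants.
trait ObjectiveFunctionSketch {
  type Data
  def value(input: Data, coefficients: Vector[Double]): Double
  def gradient(input: Data, coefficients: Vector[Double]): Vector[Double]
}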
@@ -15,7 +15,6 @@
package com.linkedin.photon.ml.function.glm

import breeze.linalg._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

import com.linkedin.photon.ml.data.LabeledPoint
@@ -61,7 +60,7 @@ protected[ml] class DistributedGLMLossFunction private (
*/
override protected[ml] def value(
input: RDD[LabeledPoint],
coefficients: Broadcast[Vector[Double]],
coefficients: Vector[Double],
normalizationContext: BroadcastWrapper[NormalizationContext]): Double =
calculate(input, coefficients, normalizationContext)._1

@@ -75,7 +74,7 @@
*/
override protected[ml] def gradient(
input: RDD[LabeledPoint],
coefficients: Broadcast[Vector[Double]],
coefficients: Vector[Double],
normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
calculate(input, coefficients, normalizationContext)._2

@@ -90,7 +89,7 @@
*/
override protected[ml] def calculate(
input: RDD[LabeledPoint],
coefficients: Broadcast[Vector[Double]],
coefficients: Vector[Double],
normalizationContext: BroadcastWrapper[NormalizationContext]): (Double, Vector[Double]) =
ValueAndGradientAggregator.calculateValueAndGradient(
input,
@@ -99,6 +98,26 @@ protected[ml] class DistributedGLMLossFunction private (
normalizationContext,
treeAggregateDepth)

/**
* Compute the Hessian matrix over the given data for the given model coefficients.
*
* @param input The given data over which to compute the Hessian matrix
* @param coefficients The model coefficients used to compute the Hessian matrix
* @return The computed Hessian matrix
*/
override protected[ml] def hessianMatrix(input: RDD[LabeledPoint], coefficients: Vector[Double]): DenseMatrix[Double] =
HessianMatrixAggregator.calcHessianMatrix(input, coefficients, singlePointLossFunction, treeAggregateDepth)

/**
* Compute an approximation of the Hessian diagonal over the given data for the given model coefficients.
*
* @param input The given data over which to compute the diagonal of the Hessian matrix
* @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
* @return The computed diagonal of the Hessian matrix
*/
override protected[ml] def hessianDiagonal(input: RDD[LabeledPoint], coefficients: Vector[Double]): Vector[Double] =
HessianDiagonalAggregator.calcHessianDiagonal(input, coefficients, singlePointLossFunction, treeAggregateDepth)

/**
* Compute the Hessian of the function over the given data for the given model coefficients.
*
@@ -109,10 +128,10 @@
* @param normalizationContext The normalization context
* @return The computed Hessian multiplied by the given multiplyVector
*/
override protected[ml] def hessianVector(
override protected[ml] def hessianVector(
input: RDD[LabeledPoint],
coefficients: Broadcast[Vector[Double]],
multiplyVector: Broadcast[Vector[Double]],
coefficients: Vector[Double],
multiplyVector: Vector[Double],
normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
HessianVectorAggregator.calcHessianVector(
input,
@@ -121,30 +140,6 @@ protected[ml] class DistributedGLMLossFunction private (
singlePointLossFunction,
normalizationContext,
treeAggregateDepth)

/**
* Compute an approximation of the Hessian diagonal over the given data for the given model coefficients.
*
* @param input The given data over which to compute the diagonal of the Hessian matrix
* @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
* @return The computed diagonal of the Hessian matrix
*/
override protected[ml] def hessianDiagonal(
input: RDD[LabeledPoint],
coefficients: Broadcast[Vector[Double]]): Vector[Double] =
HessianDiagonalAggregator.calcHessianDiagonal(input, coefficients, singlePointLossFunction, treeAggregateDepth)

/**
* Compute the Hessian matrix over the given data for the given model coefficients.
*
* @param input The given data over which to compute the diagonal of the Hessian matrix
* @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
* @return The computed Hessian matrix
*/
override protected[ml] def hessianMatrix(
input: RDD[LabeledPoint],
coefficients: Broadcast[Vector[Double]]): DenseMatrix[Double] =
HessianMatrixAggregator.calcHessianMatrix(input, coefficients, singlePointLossFunction, treeAggregateDepth)
}

object DistributedGLMLossFunction {
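The hessianVector path above exists so that truncated-Newton and conjugate-gradient optimizers can use Hessian-vector products without materializing the full Hessian: for a GLM the Hessian is H = X^T^ D X, where D is the diagonal matrix of per-point second derivatives d^2^l/dz^2^, so H * v folds over points as sum_i d2_i * (x_i dot v) * x_i. A hedged local sketch of that fold in plain breeze, ignoring normalization and assuming the per-point second derivatives are precomputed:

import breeze.linalg.DenseVector

// Sketch: H * v = sum_i d2_i * (x_i dot v) * x_i, accumulated point by
// point without ever forming the full Hessian matrix.
def hessianVectorProduct(
    points: Iterable[(DenseVector[Double], Double)], // (features x_i, d2 loss at margin z_i)
    v: DenseVector[Double]): DenseVector[Double] =
  points.foldLeft(DenseVector.zeros[Double](v.length)) {
    case (acc, (x, d2)) => acc + (x * (d2 * (x dot v)))
  }

The distributed variant replaces the foldLeft with the treeAggregate pattern sketched earlier.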
@@ -14,6 +14,8 @@
*/
package com.linkedin.photon.ml.function.glm

import breeze.numerics.sigmoid

import com.linkedin.photon.ml.constants.MathConst
import com.linkedin.photon.ml.util.MathUtils

@@ -43,16 +45,6 @@ import com.linkedin.photon.ml.util.MathUtils
*/
@SerialVersionUID(1L)
object LogisticLossFunction extends PointwiseLossFunction {
/**
* The sigmoid function:
*
* 1 / (1 + exp(-z))
*
* @param z The margin, i.e. z in l(z, y)
* @return The value
*/
private def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))


/**
* l(z, y) = - log [1 / (1 + exp(-z))] = log [1 + exp(-z)] if this is a positive sample
@@ -65,26 +57,28 @@ object LogisticLossFunction extends PointwiseLossFunction {
*
* @param margin The margin, i.e. z in l(z, y)
* @param label The label, i.e. y in l(z, y)
* @return The value and the 1st derivative
* @return The value and the 1st derivative with respect to z
*/
override def lossAndDzLoss(margin: Double, label: Double): (Double, Double) = {
override def lossAndDzLoss(margin: Double, label: Double): (Double, Double) =

if (label > MathConst.POSITIVE_RESPONSE_THRESHOLD) {
// The following is equivalent to log(1 + exp(-margin)) but more numerically stable.
(MathUtils.log1pExp(-margin), -sigmoid(-margin))
} else {
(MathUtils.log1pExp(margin), sigmoid(margin))
}
}

/**
* d^2^l/dz^2^ = sigmoid(z) * (1 - sigmoid(z))
*
* @param margin The margin, i.e. z in l(z, y)
* @param label The label, i.e. y in l(z, y)
* @return The value and the 2nd derivative with respect to z
* @return The 2nd derivative with respect to z
*/
override def DzzLoss(margin: Double, label: Double): Double = {

val s = sigmoid(margin)

s * (1 - s)
}
}
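The derivatives documented above are easy to sanity-check: for a positive sample, l(z) = log(1 + exp(-z)), dl/dz = -sigmoid(-z), and d^2^l/dz^2^ = sigmoid(z) * (1 - sigmoid(z)). A hedged snippet comparing the closed forms against central finite differences, with math.log1p standing in for photon's MathUtils.log1pExp:

import breeze.numerics.sigmoid

// Logistic loss for a positive sample and its documented derivatives.
def loss(z: Double): Double = math.log1p(math.exp(-z))
def dz(z: Double): Double = -sigmoid(-z)
def dzz(z: Double): Double = sigmoid(z) * (1.0 - sigmoid(z))

val z = 0.7
val eps = 1e-6
// Central differences agree with the analytic forms to roughly eps^2^.
val dzNumeric = (loss(z + eps) - loss(z - eps)) / (2 * eps)
val dzzNumeric = (dz(z + eps) - dz(z - eps)) / (2 * eps)
assert(math.abs(dzNumeric - dz(z)) < 1e-6)
assert(math.abs(dzzNumeric - dzz(z)) < 1e-6)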
@@ -29,13 +29,14 @@ package com.linkedin.photon.ml.function.glm
*/
@SerialVersionUID(1L)
object PoissonLossFunction extends PointwiseLossFunction {

/**
* l(z, y) = exp(z) - y * z
* dl/dz = exp(z) - y
*
* @param margin The margin, i.e. z in l(z, y)
* @param label The label, i.e. y in l(z, y)
* @return The value and the 1st derivative
* @return The value and the 1st derivative with respect to z
*/
override def lossAndDzLoss(margin: Double, label: Double): (Double, Double) = {
val prediction = math.exp(margin)
@@ -47,7 +48,7 @@ object PoissonLossFunction extends PointwiseLossFunction {
*
* @param margin The margin, i.e. z in l(z, y)
* @param label The label, i.e. y in l(z, y)
* @return The value and the 2st derivative with respect to z
* @return The 2nd derivative with respect to z
*/
override def DzzLoss(margin: Double, label: Double): Double = math.exp(margin)
}
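The Poisson case admits the same check: from l(z, y) = exp(z) - y * z, the documented derivatives are dl/dz = exp(z) - y and d^2^l/dz^2^ = exp(z), with the label dropping out of the second derivative entirely. A minimal hedged verification:

// Poisson loss and its documented derivatives.
def loss(z: Double, y: Double): Double = math.exp(z) - y * z
def dz(z: Double, y: Double): Double = math.exp(z) - y
def dzz(z: Double): Double = math.exp(z) // label-free second derivative

val z = 0.3
val y = 2.0
val eps = 1e-6
val dzNumeric = (loss(z + eps, y) - loss(z - eps, y)) / (2 * eps)
val dzzNumeric = (dz(z + eps, y) - dz(z - eps, y)) / (2 * eps)
assert(math.abs(dzNumeric - dz(z, y)) < 1e-6)
assert(math.abs(dzzNumeric - dzz(z)) < 1e-6)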
@@ -91,26 +91,16 @@ protected[ml] class SingleNodeGLMLossFunction private (singlePointLossFunction:
normalizationContext)

/**
* Compute the Hessian of the function over the given data for the given model coefficients.
* Compute the Hessian matrix over the given data for the given model coefficients.
*
* @param input The given data over which to compute the Hessian
* @param coefficients The model coefficients used to compute the function's hessian, multiplied by a given vector
* @param multiplyVector The given vector to be dot-multiplied with the Hessian. For example, in conjugate
* gradient method this would correspond to the gradient multiplyVector.
* @param normalizationContext The normalization context
* @return The computed Hessian multiplied by the given multiplyVector
* @param input The given data over which to compute the Hessian matrix
* @param coefficients The model coefficients used to compute the Hessian matrix
* @return The computed Hessian matrix
*/
override protected[ml] def hessianVector(
override protected[ml] def hessianMatrix(
input: Iterable[LabeledPoint],
coefficients: Vector[Double],
multiplyVector: Vector[Double],
normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
HessianVectorAggregator.calcHessianVector(
input,
coefficients,
multiplyVector,
singlePointLossFunction,
normalizationContext)
coefficients: Vector[Double]): DenseMatrix[Double] =
HessianMatrixAggregator.calcHessianMatrix(input, coefficients, singlePointLossFunction)

/**
* Compute an approximation of the Hessian diagonal over the given data for the given model coefficients.
@@ -125,16 +115,26 @@ protected[ml] class SingleNodeGLMLossFunction private (singlePointLossFunction:
HessianDiagonalAggregator.calcHessianDiagonal(input, coefficients, singlePointLossFunction)

/**
* Compute the Hessian matrix over the given data for the given model coefficients.
* Compute the Hessian of the function over the given data for the given model coefficients.
*
* @param input The given data over which to compute the diagonal of the Hessian matrix
* @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
* @return The computed Hessian matrix
* @param input The given data over which to compute the Hessian
* @param coefficients The model coefficients used to compute the function's hessian, multiplied by a given vector
* @param multiplyVector The given vector to be dot-multiplied with the Hessian. For example, in conjugate
* gradient method this would correspond to the gradient multiplyVector.
* @param normalizationContext The normalization context
* @return The computed Hessian multiplied by the given multiplyVector
*/
override protected[ml] def hessianMatrix(
override protected[ml] def hessianVector(
input: Iterable[LabeledPoint],
coefficients: Vector[Double]): DenseMatrix[Double] =
HessianMatrixAggregator.calcHessianMatrix(input, coefficients, singlePointLossFunction)
coefficients: Vector[Double],
multiplyVector: Vector[Double],
normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
HessianVectorAggregator.calcHessianVector(
input,
coefficients,
multiplyVector,
singlePointLossFunction,
normalizationContext)
}

object SingleNodeGLMLossFunction {