Merge pull request #446 from ashelkovnykov/tron
Major aggregator documentation refactor and minor aggregator refactor
yunboouyang authored Jan 9, 2020
2 parents e76a35a + e50baa1 commit b6be089
Showing 37 changed files with 905 additions and 750 deletions.
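The change that recurs throughout this diff: DistributedObjectiveFunction and its subclasses now take model coefficients as plain breeze Vector[Double] values rather than Spark Broadcast[Vector[Double]] handles, so call sites no longer create, thread through, and unpersist their own broadcasts. A before/after sketch of a typical call site, assembled from the diffs below (the surrounding setup is omitted):

    // Before this commit: the caller broadcasts coefficients and must clean up.
    val broadcastedInitParam = sc.broadcast(initParam)
    val computed = objective.gradient(trainingData, broadcastedInitParam, normalizationContextBroadcast)
    broadcastedInitParam.unpersist()

    // After: the caller passes the vector directly, and coefficient
    // distribution is left to the objective function's internals.
    val computed = objective.gradient(trainingData, initParam, normalizationContextBroadcast)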
@@ -484,24 +484,18 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           0
         }
       }
-      val broadcastedInitParam = sc.broadcast(initParam)
-      val computed = objective.gradient(trainingData, broadcastedInitParam, normalizationContextBroadcast)
+      val computed = objective.gradient(trainingData, initParam, normalizationContextBroadcast)
 
       // Element-wise numerical differentiation to get the gradient
       for (idx <- 0 until PROBLEM_DIMENSION) {
         val before = initParam.copy
         before(idx) -= DERIVATIVE_DELTA
-        val broadcastedBefore = sc.broadcast(before)
 
         val after = initParam.copy
         after(idx) += DERIVATIVE_DELTA
-        val broadcastedAfter = sc.broadcast(after)
 
-        val objBefore = objective.value(trainingData, broadcastedBefore, normalizationContextBroadcast)
-        val objAfter = objective.value(trainingData, broadcastedAfter, normalizationContextBroadcast)
-
-        broadcastedBefore.unpersist()
-        broadcastedAfter.unpersist()
+        val objBefore = objective.value(trainingData, before, normalizationContextBroadcast)
+        val objAfter = objective.value(trainingData, after, normalizationContextBroadcast)
 
         checkDerivativeError(
           "Checking Gradient",
@@ -511,11 +505,9 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           computed(idx),
           GRADIENT_TOLERANCE)
       }
-
-      broadcastedInitParam.unpersist()
     }
 
-    normalizationContextBroadcast.unpersist()
+    normalizationContextBroadcast.bv.unpersist()
   }
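For context, the test above validates the analytic gradient by central differences: each coordinate is perturbed by ±DERIVATIVE_DELTA and (f(after) - f(before)) / (2 * delta) is compared to the computed gradient entry. A self-contained local sketch of the same check (the names here are illustrative, not Photon ML's):

    import breeze.linalg.DenseVector

    object GradientCheckSketch {

      // Central-difference estimate of df/dx_idx, the quantity compared
      // against the analytic gradient entry.
      def numericalGradient(
          f: DenseVector[Double] => Double,
          x: DenseVector[Double],
          idx: Int,
          delta: Double): Double = {
        val before = x.copy
        before(idx) -= delta
        val after = x.copy
        after(idx) += delta
        (f(after) - f(before)) / (2 * delta)
      }

      def main(args: Array[String]): Unit = {
        // f(x) = ||x||^2 has gradient 2x, so the two estimates should agree.
        val f = (v: DenseVector[Double]) => v.data.map(d => d * d).sum
        val x = DenseVector(1.0, -2.0, 3.0)
        for (i <- 0 until x.length) {
          val numeric = numericalGradient(f, x, i, 1e-6)
          assert(math.abs(numeric - 2.0 * x(i)) < 1e-4)
        }
      }
    }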

/**
@@ -542,7 +534,6 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           0
         }
       }
-      val broadcastedInitParam = sc.broadcast(initParam)
 
       // Loop over basis vectors. This will give us H*e_i = H(:,i) (so one column of H at a time)
       for (basis <- 0 until PROBLEM_DIMENSION) {
@@ -551,28 +542,22 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           Array[Double](1.0),
           1,
           PROBLEM_DIMENSION)
-        val broadcastedBasis = sc.broadcast(basisVector)
         val hessianVector = objective.hessianVector(
           trainingData,
-          broadcastedInitParam,
-          broadcastedBasis,
+          initParam,
+          basisVector,
           normalizationContextBroadcast)
 
         // Element-wise numerical differentiation to get the Hessian
         for (idx <- 0 until PROBLEM_DIMENSION) {
           val before = initParam.copy
           before(idx) -= DERIVATIVE_DELTA
-          val broadcastedBefore = sc.broadcast(before)
 
           val after = initParam.copy
           after(idx) += DERIVATIVE_DELTA
-          val broadcastedAfter = sc.broadcast(after)
 
-          val gradBefore = objective.gradient(trainingData, broadcastedBefore, normalizationContextBroadcast)
-          val gradAfter = objective.gradient(trainingData, broadcastedAfter, normalizationContextBroadcast)
-
-          broadcastedBefore.unpersist()
-          broadcastedAfter.unpersist()
+          val gradBefore = objective.gradient(trainingData, before, normalizationContextBroadcast)
+          val gradAfter = objective.gradient(trainingData, after, normalizationContextBroadcast)
 
           checkDerivativeError(
             "Checking Hessian",
@@ -583,14 +568,10 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
             hessianVector(idx),
             HESSIAN_TOLERANCE)
         }
-
-        broadcastedBasis.unpersist()
       }
-
-      broadcastedInitParam.unpersist()
     }
 
-    normalizationContextBroadcast.unpersist()
+    normalizationContextBroadcast.bv.unpersist()
   }
 }
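The Hessian test above works the same way, one column at a time: hessianVector returns H * e_basis, and each entry is checked against a central difference of analytic gradients, since (grad(x + delta * e_i) - grad(x - delta * e_i)) / (2 * delta) approximates H * e_i. A local sketch of that identity (illustrative names only):

    import breeze.linalg.DenseVector

    object HessianCheckSketch {

      // Central-difference estimate of column idx of the Hessian, obtained
      // by differencing the analytic gradient.
      def numericalHessianColumn(
          grad: DenseVector[Double] => DenseVector[Double],
          x: DenseVector[Double],
          idx: Int,
          delta: Double): DenseVector[Double] = {
        val before = x.copy
        before(idx) -= delta
        val after = x.copy
        after(idx) += delta
        (grad(after) - grad(before)) / (2 * delta)
      }

      def main(args: Array[String]): Unit = {
        // f(x) = ||x||^2 has gradient 2x and Hessian 2I: column i of the
        // numerical Hessian should be ~2.0 at position i and ~0.0 elsewhere.
        val grad = (v: DenseVector[Double]) => v * 2.0
        val x = DenseVector(0.5, -1.5, 2.5)
        for (i <- 0 until x.length; j <- 0 until x.length) {
          val column = numericalHessianColumn(grad, x, i, 1e-6)
          val expected = if (i == j) 2.0 else 0.0
          assert(math.abs(column(j) - expected) < 1e-4)
        }
      }
    }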

@@ -40,7 +40,7 @@ class DistributedGLMLossFunctionIntegTest extends SparkTestUtils {
   def testValueNoRegularization(): Unit = sparkTest("testValueNoRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = NoRegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -66,7 +66,7 @@ class DistributedGLMLossFunctionIntegTest extends SparkTestUtils {
   def testValueWithL2Regularization(): Unit = sparkTest("testValueWithL2Regularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = L2RegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -93,7 +93,7 @@ class DistributedGLMLossFunctionIntegTest extends SparkTestUtils {
   def testValueWithElasticNetRegularization(): Unit = sparkTest("testValueWithElasticNetRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = ElasticNetRegularizationContext(ALPHA)
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -40,7 +40,7 @@ class DistributedSmoothedHingeLossFunctionIntegTest extends SparkTestUtils {
   def testValueNoRegularization(): Unit = sparkTest("testValueNoRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = NoRegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -64,7 +64,7 @@ class DistributedSmoothedHingeLossFunctionIntegTest extends SparkTestUtils {
   def testValueWithL2Regularization(): Unit = sparkTest("testValueWithL2Regularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = L2RegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -90,7 +90,7 @@ class DistributedSmoothedHingeLossFunctionIntegTest extends SparkTestUtils {
   def testValueWithElasticNetRegularization(): Unit = sparkTest("testValueWithElasticNetRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = ElasticNetRegularizationContext(ALPHA)
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -288,7 +288,7 @@ class NormalizationContextIntegTest extends SparkTestUtils with GameTestUtils {
     }
 
     // Train the original data with a loss function binding normalization
-    val zero = Vector.zeros[Double](objectiveFunction.domainDimension(heartDataRDD))
+    val zero = Vector.zeros[Double](heartDataRDD.first.features.length)
     val (model1, objective1) = optimizerNorm.optimize(objectiveFunction, zero)(heartDataRDD)
     // Train the transformed data with a normal loss function
     val (model2, objective2) = optimizerNoNorm.optimize(objectiveFunction, zero)(transformedRDD)
@@ -114,7 +114,8 @@ object ModelTraining extends Logging {
     val optimizerConfig = OptimizerConfig(optimizerType, maxNumIter, tolerance, constraintMap)
     val optimizationConfig = FixedEffectOptimizationConfiguration(optimizerConfig, regularizationContext)
     // Initialize the broadcast normalization context
-    val broadcastNormalizationContext = PhotonBroadcast(trainingData.sparkContext.broadcast(normalizationContext))
+    val broadcastNormalizationContext = trainingData.sparkContext.broadcast(normalizationContext)
+    val wrappedBroadcastNormalizationContext = PhotonBroadcast(broadcastNormalizationContext)
 
     // Construct the generalized linear optimization problem
     val (glmConstructor, objectiveFunction) = taskType match {
@@ -160,7 +161,7 @@
       objectiveFunction,
       samplerOption = None,
       glmConstructor,
-      broadcastNormalizationContext,
+      wrappedBroadcastNormalizationContext,
       VarianceComputationType.NONE)
 
     // Sort the regularization weights from high to low, which would potentially speed up the overall convergence time
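A note on the split above: building the Broadcast first and wrapping it second keeps a direct handle on the raw broadcast for cleanup after training, which is also why the tests now call normalizationContextBroadcast.bv.unpersist(). A minimal sketch of the wrapper pattern, assuming a trait shaped like Photon ML's BroadcastWrapper with the broadcast reachable through a bv field (the real trait's details may differ):

    import org.apache.spark.broadcast.Broadcast

    // Assumed shape: the wrapper exposes the value uniformly while the
    // underlying Broadcast stays reachable for unpersisting.
    trait BroadcastWrapperSketch[T] {
      def value: T
    }

    case class PhotonBroadcastSketch[T](bv: Broadcast[T]) extends BroadcastWrapperSketch[T] {
      override def value: T = bv.value
    }

    // Hypothetical usage mirroring ModelTraining above:
    //   val broadcastNormalizationContext = trainingData.sparkContext.broadcast(normalizationContext)
    //   val wrappedBroadcastNormalizationContext = PhotonBroadcastSketch(broadcastNormalizationContext)
    //   ... run the optimization with the wrapper ...
    //   broadcastNormalizationContext.unpersist()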
@@ -14,11 +14,7 @@
  */
 package com.linkedin.photon.ml.function
 
-import breeze.linalg.Vector
-import org.apache.spark.SparkContext
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 
 import com.linkedin.photon.ml.data.LabeledPoint
@@ -28,49 +24,11 @@ import com.linkedin.photon.ml.data.LabeledPoint
  *
  * @param treeAggregateDepth The depth used by treeAggregate. Depth 1 indicates normal linear aggregate. Using
  *                           depth > 1 can reduce memory consumption in the Driver and may also speed up the
- *                           aggregation. It is experimental currently because treeAggregate is unstable in Spark
- *                           versions 1.4 and 1.5.
+ *                           aggregation.
  */
 abstract class DistributedObjectiveFunction(treeAggregateDepth: Int) extends ObjectiveFunction {
 
   type Data = RDD[LabeledPoint]
-  type Coefficients = Broadcast[Vector[Double]]
 
   require(treeAggregateDepth > 0, s"Tree aggregate depth must be greater than 0: $treeAggregateDepth")
 
-  private lazy val sc: SparkContext = SparkSession.builder().getOrCreate().sparkContext
-
-  /**
-   * Compute the size of the domain for the given input data (i.e. the number of features, including the intercept if
-   * there is one).
-   *
-   * @param input The given data for which to compute the domain dimension
-   * @return The domain dimension
-   */
-  override protected[ml] def domainDimension(input: Data): Int = input.first.features.size
-
-  /**
-   * DistributedOptimizationProblems compute objective value over an RDD distributed across several tasks over one or
-   * more executors. Thus, DistributedObjectiveFunction expects broadcasted coefficients to reduce network overhead.
-   *
-   * @param coefficients A coefficients Vector to convert
-   * @return A broadcast of the given coefficients Vector
-   */
-  override protected[ml] def convertFromVector(coefficients: Vector[Double]): Coefficients =
-    sc.broadcast(coefficients)
-
-  /**
-   * DistributedObjectiveFunctions handle broadcasted Vectors. Fetch the underlying Vector.
-   *
-   * @param coefficients A broadcasted coefficients vector
-   * @return The underlying coefficients Vector
-   */
-  override protected[ml] def convertToVector(coefficients: Coefficients): Vector[Double] = coefficients.value
-
-  /**
-   * Unpersist the coefficients broadcast by convertFromVector.
-   *
-   * @param coefficients The broadcast coefficients to unpersist
-   */
-  override protected[ml] def cleanupCoefficients(coefficients: Coefficients): Unit = coefficients.unpersist()
 }
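With the Coefficients type member and the convertFromVector / convertToVector / cleanupCoefficients plumbing removed, the class no longer needs its own SparkContext; what remains is the treeAggregateDepth knob described in the doc comment. A standalone sketch of what that depth controls (the aggregation itself is a toy stand-in for the real gradient aggregators):

    import org.apache.spark.rdd.RDD

    object TreeAggregateSketch {

      // depth = 1 is a flat aggregate: every partition's partial result is
      // sent straight to the driver. depth > 1 merges partial results in
      // intermediate stages first, easing driver memory pressure when the
      // aggregated values are wide (e.g. long gradient vectors).
      def sumOfSquares(data: RDD[Double], treeAggregateDepth: Int): Double =
        data.treeAggregate(0.0)(
          seqOp = (acc, x) => acc + x * x,
          combOp = (a, b) => a + b,
          depth = treeAggregateDepth)
    }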
@@ -14,42 +14,13 @@
  */
 package com.linkedin.photon.ml.function
 
-import breeze.linalg.Vector
 
 import com.linkedin.photon.ml.data.LabeledPoint
 
 /**
  * The base objective function used by SingleNodeOptimizationProblems. This function works with data locally as part of
  * a single task (on a single executor).
  */
 abstract class SingleNodeObjectiveFunction extends ObjectiveFunction with Serializable {
-  type Data = Iterable[LabeledPoint]
-  type Coefficients = Vector[Double]
-
-  /**
-   * Compute the size of the domain for the given input data (i.e. the number of features, including the intercept if
-   * there is one).
-   *
-   * @param input The given data for which to compute the domain dimension
-   * @return The domain dimension
-   */
-  override protected[ml] def domainDimension(input: Iterable[LabeledPoint]): Int = input.head.features.size
-
-  /**
-   * SingleNodeOptimizationProblems compute objective value over all of the data at once as part of a single task (on
-   * a single executor). Thus, the SingleNodeObjectiveFunction handles Vectors directly.
-   *
-   * @param coefficients A coefficients Vector to convert
-   * @return The given coefficients Vector
-   */
-  override protected[ml] def convertFromVector(coefficients: Vector[Double]): Coefficients = coefficients
-
-  /**
-   * SingleNodeOptimizationProblems handle Vectors directly, so the SingleNodeObjectiveFunction input is already a
-   * Vector.
-   *
-   * @param coefficients A Coefficients object to convert
-   * @return The given coefficients Vector
-   */
-  override protected[ml] def convertToVector(coefficients: Vector[Double]): Vector[Double] = coefficients
+  type Data = Iterable[LabeledPoint]
 }
@@ -15,7 +15,6 @@
 package com.linkedin.photon.ml.function.glm
 
 import breeze.linalg._
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 
 import com.linkedin.photon.ml.data.LabeledPoint
@@ -61,7 +60,7 @@ protected[ml] class DistributedGLMLossFunction private (
    */
   override protected[ml] def value(
     input: RDD[LabeledPoint],
-    coefficients: Broadcast[Vector[Double]],
+    coefficients: Vector[Double],
     normalizationContext: BroadcastWrapper[NormalizationContext]): Double =
     calculate(input, coefficients, normalizationContext)._1
 
@@ -75,7 +74,7 @@
    */
   override protected[ml] def gradient(
     input: RDD[LabeledPoint],
-    coefficients: Broadcast[Vector[Double]],
+    coefficients: Vector[Double],
     normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
     calculate(input, coefficients, normalizationContext)._2
 
@@ -90,7 +89,7 @@
    */
   override protected[ml] def calculate(
     input: RDD[LabeledPoint],
-    coefficients: Broadcast[Vector[Double]],
+    coefficients: Vector[Double],
     normalizationContext: BroadcastWrapper[NormalizationContext]): (Double, Vector[Double]) =
     ValueAndGradientAggregator.calculateValueAndGradient(
       input,
@@ -99,6 +98,26 @@
       normalizationContext,
       treeAggregateDepth)
 
+  /**
+   * Compute the Hessian matrix over the given data for the given model coefficients.
+   *
+   * @param input The given data over which to compute the Hessian matrix
+   * @param coefficients The model coefficients used to compute the Hessian matrix
+   * @return The computed Hessian matrix
+   */
+  override protected[ml] def hessianMatrix(input: RDD[LabeledPoint], coefficients: Vector[Double]): DenseMatrix[Double] =
+    HessianMatrixAggregator.calcHessianMatrix(input, coefficients, singlePointLossFunction, treeAggregateDepth)
+
+  /**
+   * Compute an approximation of the Hessian diagonal over the given data for the given model coefficients.
+   *
+   * @param input The given data over which to compute the diagonal of the Hessian matrix
+   * @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
+   * @return The computed diagonal of the Hessian matrix
+   */
+  override protected[ml] def hessianDiagonal(input: RDD[LabeledPoint], coefficients: Vector[Double]): Vector[Double] =
+    HessianDiagonalAggregator.calcHessianDiagonal(input, coefficients, singlePointLossFunction, treeAggregateDepth)
+
   /**
    * Compute the Hessian of the function over the given data for the given model coefficients.
    *
@@ -109,10 +128,10 @@
    * @param normalizationContext The normalization context
    * @return The computed Hessian multiplied by the given multiplyVector
    */
-  override protected[ml] def hessianVector(
+  override protected[ml] def hessianVector(
     input: RDD[LabeledPoint],
-    coefficients: Broadcast[Vector[Double]],
-    multiplyVector: Broadcast[Vector[Double]],
+    coefficients: Vector[Double],
+    multiplyVector: Vector[Double],
     normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
     HessianVectorAggregator.calcHessianVector(
       input,
@@ -121,30 +140,6 @@
       singlePointLossFunction,
       normalizationContext,
       treeAggregateDepth)
-
-  /**
-   * Compute an approximation of the Hessian diagonal over the given data for the given model coefficients.
-   *
-   * @param input The given data over which to compute the diagonal of the Hessian matrix
-   * @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
-   * @return The computed diagonal of the Hessian matrix
-   */
-  override protected[ml] def hessianDiagonal(
-    input: RDD[LabeledPoint],
-    coefficients: Broadcast[Vector[Double]]): Vector[Double] =
-    HessianDiagonalAggregator.calcHessianDiagonal(input, coefficients, singlePointLossFunction, treeAggregateDepth)
-
-  /**
-   * Compute the Hessian matrix over the given data for the given model coefficients.
-   *
-   * @param input The given data over which to compute the diagonal of the Hessian matrix
-   * @param coefficients The model coefficients used to compute the diagonal of the Hessian matrix
-   * @return The computed Hessian matrix
-   */
-  override protected[ml] def hessianMatrix(
-    input: RDD[LabeledPoint],
-    coefficients: Broadcast[Vector[Double]]): DenseMatrix[Double] =
-    HessianMatrixAggregator.calcHessianMatrix(input, coefficients, singlePointLossFunction, treeAggregateDepth)
 }
 
 object DistributedGLMLossFunction {
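On the two methods added above: for a GLM, the Hessian over the data is H = X^T D X, where D is diagonal with the per-point second derivatives d_k of the loss, so hessianMatrix accumulates d_k * (x_k x_k^T) while hessianDiagonal keeps only the per-coordinate sums of d_k * x_k(i)^2. A local, non-distributed sketch of both aggregations (the real aggregators run over an RDD with treeAggregate; fixing every d_k = 1.0, as for squared-error loss, is an assumption for illustration):

    import breeze.linalg.{DenseMatrix, DenseVector}

    object GlmHessianSketch {

      // Full Hessian: H = sum_k d_k * (x_k x_k^T), with d_k = 1.0 assumed here.
      def hessianMatrix(features: Seq[DenseVector[Double]]): DenseMatrix[Double] = {
        val dim = features.head.length
        features.foldLeft(DenseMatrix.zeros[Double](dim, dim)) { (acc, x) =>
          acc + x * x.t
        }
      }

      // Diagonal approximation: H(i, i) = sum_k d_k * x_k(i)^2, with d_k = 1.0.
      def hessianDiagonal(features: Seq[DenseVector[Double]]): DenseVector[Double] = {
        val dim = features.head.length
        features.foldLeft(DenseVector.zeros[Double](dim)) { (acc, x) =>
          acc + (x *:* x)
        }
      }
    }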