Update unit and integration tests for previous commit
Alex Shelkovnykov committed Nov 22, 2019
1 parent e629b1a commit e50baa1
Showing 11 changed files with 73 additions and 114 deletions.
@@ -484,24 +484,18 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           0
         }
       }
-      val broadcastedInitParam = sc.broadcast(initParam)
-      val computed = objective.gradient(trainingData, broadcastedInitParam, normalizationContextBroadcast)
+      val computed = objective.gradient(trainingData, initParam, normalizationContextBroadcast)
 
       // Element-wise numerical differentiation to get the gradient
       for (idx <- 0 until PROBLEM_DIMENSION) {
         val before = initParam.copy
         before(idx) -= DERIVATIVE_DELTA
-        val broadcastedBefore = sc.broadcast(before)
 
         val after = initParam.copy
         after(idx) += DERIVATIVE_DELTA
-        val broadcastedAfter = sc.broadcast(after)
 
-        val objBefore = objective.value(trainingData, broadcastedBefore, normalizationContextBroadcast)
-        val objAfter = objective.value(trainingData, broadcastedAfter, normalizationContextBroadcast)
-
-        broadcastedBefore.unpersist()
-        broadcastedAfter.unpersist()
+        val objBefore = objective.value(trainingData, before, normalizationContextBroadcast)
+        val objAfter = objective.value(trainingData, after, normalizationContextBroadcast)
 
         checkDerivativeError(
           "Checking Gradient",
@@ -511,11 +505,9 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           computed(idx),
           GRADIENT_TOLERANCE)
       }
-
-      broadcastedInitParam.unpersist()
     }
 
-    normalizationContextBroadcast.unpersist()
+    normalizationContextBroadcast.bv.unpersist()
   }
 
   /**
@@ -542,7 +534,6 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           0
         }
       }
-      val broadcastedInitParam = sc.broadcast(initParam)
 
       // Loop over basis vectors. This will give us H*e_i = H(:,i) (so one column of H at a time)
       for (basis <- 0 until PROBLEM_DIMENSION) {
@@ -551,28 +542,22 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
           Array[Double](1.0),
           1,
           PROBLEM_DIMENSION)
-        val broadcastedBasis = sc.broadcast(basisVector)
         val hessianVector = objective.hessianVector(
           trainingData,
-          broadcastedInitParam,
-          broadcastedBasis,
+          initParam,
+          basisVector,
           normalizationContextBroadcast)
 
         // Element-wise numerical differentiation to get the Hessian
        for (idx <- 0 until PROBLEM_DIMENSION) {
           val before = initParam.copy
           before(idx) -= DERIVATIVE_DELTA
-          val broadcastedBefore = sc.broadcast(before)
 
           val after = initParam.copy
           after(idx) += DERIVATIVE_DELTA
-          val broadcastedAfter = sc.broadcast(after)
 
-          val gradBefore = objective.gradient(trainingData, broadcastedBefore, normalizationContextBroadcast)
-          val gradAfter = objective.gradient(trainingData, broadcastedAfter, normalizationContextBroadcast)
-
-          broadcastedBefore.unpersist()
-          broadcastedAfter.unpersist()
+          val gradBefore = objective.gradient(trainingData, before, normalizationContextBroadcast)
+          val gradAfter = objective.gradient(trainingData, after, normalizationContextBroadcast)
 
           checkDerivativeError(
             "Checking Hessian",
@@ -583,14 +568,10 @@ class DistributedObjectiveFunctionIntegTest extends SparkTestUtils {
             hessianVector(idx),
             HESSIAN_TOLERANCE)
         }
-
-        broadcastedBasis.unpersist()
       }
-
-      broadcastedInitParam.unpersist()
     }
 
-    normalizationContextBroadcast.unpersist()
+    normalizationContextBroadcast.bv.unpersist()
   }
 }
 
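Note: the two tests above follow the standard numerical-differentiation recipe, perturbing one coordinate at a time and comparing a central difference against the analytic result. A minimal, Spark-free sketch of the same check follows; the toy objective f, DELTA, and TOLERANCE here are illustrative stand-ins, not constants from this repository.

import breeze.linalg.DenseVector

object GradientCheckSketch {

  // Toy objective with a known analytic gradient: f(x) = x . x, grad f(x) = 2x
  def f(x: DenseVector[Double]): Double = x dot x
  def analyticGradient(x: DenseVector[Double]): DenseVector[Double] = x * 2.0

  def main(args: Array[String]): Unit = {
    val DELTA = 1e-6
    val TOLERANCE = 1e-6
    val x = DenseVector(0.5, -1.25, 3.0)
    val computed = analyticGradient(x)

    for (idx <- 0 until x.length) {
      val before = x.copy
      before(idx) -= DELTA
      val after = x.copy
      after(idx) += DELTA

      // Central difference: (f(x + d*e_i) - f(x - d*e_i)) / (2 * d)
      val numeric = (f(after) - f(before)) / (2.0 * DELTA)
      assert(math.abs(numeric - computed(idx)) < TOLERANCE, s"Gradient mismatch at index $idx")
    }
  }
}

The central difference has O(d^2) truncation error, which is why the tests can hold the analytic gradient to a tight tolerance with a small DERIVATIVE_DELTA.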
@@ -40,7 +40,7 @@ class DistributedGLMLossFunctionIntegTest extends SparkTestUtils {
   def testValueNoRegularization(): Unit = sparkTest("testValueNoRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = NoRegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -66,7 +66,7 @@
   def testValueWithL2Regularization(): Unit = sparkTest("testValueWithL2Regularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = L2RegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -93,7 +93,7 @@
   def testValueWithElasticNetRegularization(): Unit = sparkTest("testValueWithElasticNetRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = ElasticNetRegularizationContext(ALPHA)
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -40,7 +40,7 @@ class DistributedSmoothedHingeLossFunctionIntegTest extends SparkTestUtils {
   def testValueNoRegularization(): Unit = sparkTest("testValueNoRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = NoRegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -64,7 +64,7 @@
   def testValueWithL2Regularization(): Unit = sparkTest("testValueWithL2Regularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = L2RegularizationContext
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -90,7 +90,7 @@
   def testValueWithElasticNetRegularization(): Unit = sparkTest("testValueWithElasticNetRegularization") {
 
     val labeledPoints = sc.parallelize(Array(LABELED_POINT_1, LABELED_POINT_2))
-    val coefficients = sc.broadcast(COEFFICIENT_VECTOR)
+    val coefficients = COEFFICIENT_VECTOR
 
     val fixedEffectRegularizationContext = ElasticNetRegularizationContext(ALPHA)
     val fixedEffectOptimizationConfiguration = FixedEffectOptimizationConfiguration(
@@ -288,7 +288,7 @@ class NormalizationContextIntegTest extends SparkTestUtils with GameTestUtils {
     }
 
     // Train the original data with a loss function binding normalization
-    val zero = Vector.zeros[Double](objectiveFunction.domainDimension(heartDataRDD))
+    val zero = Vector.zeros[Double](heartDataRDD.first.features.length)
     val (model1, objective1) = optimizerNorm.optimize(objectiveFunction, zero)(heartDataRDD)
     // Train the transformed data with a normal loss function
     val (model2, objective2) = optimizerNoNorm.optimize(objectiveFunction, zero)(transformedRDD)
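With domainDimension dropped from the objective API, the test now sizes its starting point directly from the data. A tiny sketch of the same pattern, with a local Seq standing in for the RDD; Point is a hypothetical stand-in for LabeledPoint:

import breeze.linalg.{DenseVector, Vector}

object ZeroVectorSketch {

  // Hypothetical stand-in for LabeledPoint; only the features field matters here
  case class Point(features: Vector[Double])

  def main(args: Array[String]): Unit = {
    val data = Seq(Point(DenseVector(1.0, 2.0)), Point(DenseVector(3.0, 4.0)))

    // Size the initial coefficients from the first record, as the updated test does
    val zero = Vector.zeros[Double](data.head.features.length)
    assert(zero.length == 2)
  }
}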
@@ -14,6 +14,7 @@
  */
 package com.linkedin.photon.ml.function
 
+import breeze.linalg.Vector
 import org.testng.annotations.{DataProvider, Test}
 
 import com.linkedin.photon.ml.normalization.NormalizationContext
@@ -46,7 +47,7 @@ object DistributedObjectiveFunctionTest {
 
     override protected[ml] def value(
         input: Data,
-        coefficients: Coefficients,
+        coefficients: Vector[Double],
         normalizationContext: BroadcastWrapper[NormalizationContext]): Double = 0D
   }
 }
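BroadcastWrapper itself does not appear in this diff. Judging only from the call sites above (a wrapper passed where a value is consumed, plus normalizationContextBroadcast.bv.unpersist() for cleanup), a plausible minimal shape is the following hedged reconstruction, not the repository's actual definition:

import org.apache.spark.broadcast.Broadcast

// Plausible reconstruction (assumption): a thin wrapper around a Spark
// broadcast variable, exposing the broadcast for explicit cleanup.
trait BroadcastWrapper[T] extends Serializable {

  // Underlying Spark broadcast; callers unpersist it when finished,
  // e.g. normalizationContextBroadcast.bv.unpersist()
  def bv: Broadcast[T]

  // Accessor for the wrapped value, usable on driver or executors
  def value: T = bv.value
}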
@@ -16,7 +16,6 @@ package com.linkedin.photon.ml.optimization
 
 import breeze.linalg.{DenseMatrix, Vector, sum}
 import org.apache.spark.SparkContext
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 
 import com.linkedin.photon.ml.data.LabeledPoint
@@ -33,13 +32,6 @@ import com.linkedin.photon.ml.util.{BroadcastWrapper, VectorUtils}
 class IntegTestObjective(sc: SparkContext, treeAggregateDepth: Int) extends ObjectiveFunction with TwiceDiffFunction {
 
   type Data = RDD[LabeledPoint]
-  type Coefficients = Broadcast[Vector[Double]]
-
-  // These 4 methods are copied directly from [[DistributedObjectiveFunction]] in photon-api.
-  override protected[ml] def domainDimension(input: Data): Int = input.first.features.size
-  override protected[ml] def convertFromVector(coefficients: Vector[Double]): Coefficients = sc.broadcast(coefficients)
-  override protected[ml] def convertToVector(coefficients: Coefficients): Vector[Double] = coefficients.value
-  override protected[ml] def cleanupCoefficients(coefficients: Coefficients): Unit = coefficients.unpersist()
 
   /**
    * Compute the value of the function over the given data for the given model coefficients.
@@ -51,7 +43,7 @@
    */
   override protected[ml] def value(
       input: RDD[LabeledPoint],
-      coefficients: Broadcast[Vector[Double]],
+      coefficients: Vector[Double],
       normalizationContext: BroadcastWrapper[NormalizationContext]): Double =
     calculate(input, coefficients, normalizationContext)._1
 
@@ -65,7 +57,7 @@
    */
   override protected[ml] def gradient(
       input: RDD[LabeledPoint],
-      coefficients: Broadcast[Vector[Double]],
+      coefficients: Vector[Double],
       normalizationContext: BroadcastWrapper[NormalizationContext]): Vector[Double] =
     calculate(input, coefficients, normalizationContext)._2
 
@@ -80,15 +72,15 @@
    */
   override protected[ml] def calculate(
       input: RDD[LabeledPoint],
-      coefficients: Broadcast[Vector[Double]],
+      coefficients: Vector[Double],
       normalizationContext: BroadcastWrapper[NormalizationContext]): (Double, Vector[Double]) = {
 
-    val initialCumGradient = VectorUtils.zeroOfSameType(coefficients.value)
+    val initialCumGradient = VectorUtils.zeroOfSameType(coefficients)
 
     input.treeAggregate((0.0, initialCumGradient))(
       seqOp = {
         case ((loss, cumGradient), datum) =>
-          val v = IntegTestObjective.calculateAt(datum, coefficients.value, cumGradient)
+          val v = IntegTestObjective.calculateAt(datum, coefficients, cumGradient)
           (loss + v, cumGradient)
       },
       combOp = {
@@ -109,15 +101,15 @@
    */
   override protected[ml] def hessianVector(
       input: RDD[LabeledPoint],
-      coefficients: Broadcast[Vector[Double]],
-      multiplyVector: Broadcast[Vector[Double]],
+      coefficients: Vector[Double],
+      multiplyVector: Vector[Double],
       normalizationContext: BroadcastWrapper[NormalizationContext]) : Vector[Double] = {
 
-    val initialCumHessianVector = VectorUtils.zeroOfSameType(coefficients.value)
+    val initialCumHessianVector = VectorUtils.zeroOfSameType(coefficients)
 
     input.treeAggregate(initialCumHessianVector)(
       seqOp = (cumHessianVector, datum) => {
-        IntegTestObjective.hessianVectorAt(datum, coefficients.value, multiplyVector.value, cumHessianVector)
+        IntegTestObjective.hessianVectorAt(datum, coefficients, multiplyVector, cumHessianVector)
         cumHessianVector
       },
       combOp = _ += _,
@@ -127,24 +119,19 @@
   /**
    * Unused, only implemented as part of TwiceDiffFunction.
    */
-  override protected[ml] def hessianDiagonal(
-      input: RDD[LabeledPoint],
-      coefficients: Broadcast[Vector[Double]]): Vector[Double] =
-    Coefficients.initializeZeroCoefficients(coefficients.value.size).means
+  override protected[ml] def hessianDiagonal(input: RDD[LabeledPoint], coefficients: Vector[Double]): Vector[Double] =
+    Coefficients.initializeZeroCoefficients(coefficients.size).means
 
   /**
    * Unused, only implemented as part of TwiceDiffFunction.
    */
-  override protected[ml] def hessianMatrix(
-      input: RDD[LabeledPoint],
-      coefficients: Broadcast[Vector[Double]]): DenseMatrix[Double] =
-    DenseMatrix.zeros[Double](coefficients.value.length, coefficients.value.length)
-
+  override protected[ml] def hessianMatrix(input: RDD[LabeledPoint], coefficients: Vector[Double]): DenseMatrix[Double] =
+    DenseMatrix.zeros[Double](coefficients.length, coefficients.length)
 }
 
 object IntegTestObjective {
 
-  val CENTROID = Math.PI
+  val CENTROID: Double = Math.PI
 
   /**
    * Compute the value and gradient at a single data point. Since the function has known minimum, the input data is
@@ -156,13 +143,14 @@
    * @return The value at the given data point
    */
   protected def calculateAt(
-    dataPoint: LabeledPoint,
-    coefficients: Vector[Double],
-    cumGradient: Vector[Double]): Double = {
+      dataPoint: LabeledPoint,
+      coefficients: Vector[Double],
+      cumGradient: Vector[Double]): Double = {
 
     val delta = coefficients - CENTROID
     val expDeltaSq = delta.mapValues { x => Math.exp(Math.pow(x, 2.0)) }
-    cumGradient += expDeltaSq :* delta :* 2.0
+    cumGradient += expDeltaSq *:* delta * 2.0
+
     sum(expDeltaSq) - expDeltaSq.length
   }
 
@@ -175,14 +163,15 @@
    * @param cumHessianVector The cumulative Hessian vector for all points in the dataset
    */
   protected def hessianVectorAt(
-    dataPoint: LabeledPoint,
-    coefficients: Vector[Double],
-    vector: Vector[Double],
-    cumHessianVector: Vector[Double]): Unit = {
+      dataPoint: LabeledPoint,
+      coefficients: Vector[Double],
+      vector: Vector[Double],
+      cumHessianVector: Vector[Double]): Unit = {
 
     val delta = coefficients - CENTROID
     val expDeltaSq = delta.mapValues { x => Math.exp(Math.pow(x, 2.0)) }
-    val hess = expDeltaSq :* (delta :* delta :+ 1.0) :* 4.0
-    cumHessianVector += hess :* vector
+    val hess = expDeltaSq *:* (delta *:* delta + 1.0) * 4.0
+
+    cumHessianVector += hess *:* vector
   }
 }
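Reading calculateAt directly: the synthetic objective ignores the data point and has a closed form with a known minimum, which is what makes it usable for optimizer integration tests. With CENTROID = π, per coordinate:

f(x) = \sum_{i=1}^{d} \left( e^{(x_i - \pi)^2} - 1 \right),
\qquad
\frac{\partial f}{\partial x_i} = 2\,(x_i - \pi)\,e^{(x_i - \pi)^2}

Every term is minimized at x_i = π, so the global minimum value is 0 regardless of the input data.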
@@ -71,7 +71,7 @@ class OptimizerIntegTest extends SparkTestUtils with Logging {
     val pt = new LabeledPoint(label = 1, features, offset = 0, weight = 1)
     val data = sc.parallelize(Seq(pt))
     val objective: IntegTestObjective = new IntegTestObjective(sc, treeAggregateDepth = 1)
-    val zero = Vector.zeros[Double](objective.domainDimension(data))
+    val zero = Vector.zeros[Double](PROBLEM_DIMENSION)
     optimizer.optimize(objective, zero)(data)
     easyOptimizationStatesChecks(optimizer.getStateTracker)
 