
add the confidence interval computation
Leon Gao committed Aug 15, 2018
1 parent 46208e2 commit 93d78d3
Showing 7 changed files with 323 additions and 15 deletions.
@@ -123,7 +123,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils {
Some(activeDataLowerBound))
val partitioner = new RandomEffectDataSetPartitioner(sc.broadcast(partitionMap))

val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None)
val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None, None)
val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count()

assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects)
@@ -155,7 +155,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils {
Some(activeDataLowerBound))
val partitioner = new RandomEffectDataSetPartitioner(sc.broadcast(partitionMap))

val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD))
val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD), None)
val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count()

assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects)
@@ -18,11 +18,11 @@ import java.util.Random

import scala.collection.{Map, Set, mutable}
import scala.reflect.ClassTag

import breeze.linalg.{SparseVector, Vector}

import com.linkedin.photon.ml.constants.MathConst
import com.linkedin.photon.ml.projector.Projector
import com.linkedin.photon.ml.stat.BasicStatisticalSummary

/**
* Local dataset implementation.
@@ -126,6 +126,36 @@ protected[ml] case class LocalDataSet(dataPoints: Array[(Long, LabeledPoint)]) {
}
}

/**
* Filter features using the lower bound of a ratio confidence interval: only features whose lower bound exceeds
* the given threshold are kept.
*
* @param intervalBound The threshold that a feature's confidence interval lower bound must exceed for the feature
*                      to be kept
* @param percentage The standard normal quantile (z-value, e.g. 2.575 for a roughly 99% interval) used to build
*                   the confidence interval
* @param globalFeatureShardStats The feature statistics of the global population for this feature shard
* @return The filtered dataset
*/
def filterFeaturesByRatioCIBound(
intervalBound: Double,
percentage: Double,
globalFeatureShardStats: BasicStatisticalSummary): LocalDataSet = {
val labelAndFeatures = dataPoints.map { case (_, labeledPoint) => (labeledPoint.label, labeledPoint.features) }
val lowerBounds = LocalDataSet.computeRatioCILowerBound(labelAndFeatures, percentage, globalFeatureShardStats)
val filteredFeaturesIndexSet = lowerBounds
.toArray
.filter(_._2 > intervalBound)
.map(_._1)
.toSet

val filteredActivities = dataPoints.map { case (id, LabeledPoint(label, features, offset, weight)) =>
val filteredFeatures = LocalDataSet.filterFeaturesWithFeatureIndexSet(features, filteredFeaturesIndexSet)
(id, LabeledPoint(label, filteredFeatures, offset, weight))
}
LocalDataSet(filteredActivities)
}

/**
* Filter features by Pearson correlation score.
*
@@ -212,6 +242,135 @@ object LocalDataSet {
new SparseVector(filteredIndexBuilder.result(), filteredDataBuilder.result(), features.length)
}

/**
* Compute the ratio confidence interval lower bound for each feature.
*
* @param randomLabelAndFeatures An array of (label, feature vector) tuples from the per-entity dataset
* @param quartile The standard normal quantile (z-value) that sets the confidence level of the interval
* @param globalFeatureShardStats The global population feature statistics used to compute the ratios
* @return A map from feature index to the lower bound of its ratio confidence interval
*/
protected[ml] def computeRatioCILowerBound(
randomLabelAndFeatures: Array[(Double, Vector[Double])],
quartile: Double,
globalFeatureShardStats: BasicStatisticalSummary): Map[Int, Double] = {

val dummyBoundForNonBinaryOrIntercept = 2.0
val lowerBounds = mutable.Map[Int, Double]()
val globalPopulationNumSamples = globalFeatureShardStats.count
val globalPopulationFirstOrderMeans = globalFeatureShardStats.mean.toArray
val globalFeatureNonZero = globalFeatureShardStats.numNonzeros.toArray

val randomEffectNumSamples: Long = randomLabelAndFeatures.length.toLong
val randomFeatureFirstOrderSums = randomLabelAndFeatures.map(_._2).toSeq.reduce(_ + _)

val m: Long = randomEffectNumSamples
val n: Long = globalPopulationNumSamples
// The intercept is assumed to be the last column of the feature vector
val lastColumn = randomFeatureFirstOrderSums.keySet.max

randomFeatureFirstOrderSums.keySet.foreach { key =>
// Only compute bounds for binary, non-intercept features
if (isBinary(globalFeatureNonZero, globalPopulationNumSamples, globalPopulationFirstOrderMeans, key) && key != lastColumn) {
var x: Double = randomFeatureFirstOrderSums(key)
var y: Double = globalPopulationFirstOrderMeans(key) * globalPopulationNumSamples
var lowerBound: Option[Double] = None
var upperBound: Option[Double] = None
var py: Option[Double] = None
// Handle the extreme cases of x and y with continuity corrections
(x, y) match {
case d if (d._1 == 0 && d._2 == 0) => {
lowerBound = Some(0.0)
upperBound = Some(Double.PositiveInfinity)
}
case d if (d._1 == 0 && d._2 != 0) => {
lowerBound = Some(0.0)
x = 0.5
}
case d if (d._1 != 0 && d._2 == 0) => {
y = 0.5
py = Some(y / n)
upperBound = Some(Double.PositiveInfinity)
}
case d if (d._1 == m && d._2 == n) => {
x = m - 0.5
y = n - 0.5
py = Some(y / n)
}
case _ => None
}
// The global mean is already available from the statistics; reuse it when no correction was applied
if (py.isEmpty) py = Some(globalPopulationFirstOrderMeans(key))

val (t, variance) = computeMeanAndVariance(x, y, m, n, py.get)

if (upperBound.isEmpty) {
upperBound = Some(computeUpperBound(t, variance, quartile))
}

if (lowerBound.isEmpty) {
lowerBound = Some(computeLowerBound(t, variance, quartile))
// For ratios below 1, report the reciprocal of the upper bound as the lower bound
if (t < 1.0) {
lowerBound = Some(1.0 / upperBound.get)
}
}

lowerBounds.update(key, lowerBound.get)
} else {
// Non-binary and intercept features are not ranked by this ratio, so assign a dummy bound
// that keeps them in the filtered dataset
lowerBounds.update(key, dummyBoundForNonBinaryOrIntercept)
}
}

lowerBounds
}

/**
* Compute the ratio estimate t = (x / m) / py and the approximate variance of log(t).
*
* @param x The (possibly continuity-corrected) sum of the feature over the per-entity samples
* @param y The (possibly continuity-corrected) sum of the feature over the global samples
* @param m The number of per-entity samples
* @param n The number of global samples
* @param py The global rate of the feature (approximately y / n)
* @return A (ratio, variance of the log ratio) tuple
*/
protected[ml] def computeMeanAndVariance(
x: Double,
y: Double,
m: Long,
n: Long,
py: Double): (Double, Double) = {
val t = (x / m) / py
val variance = 1.0 / x - 1.0 / m + 1.0 / y - 1.0 / n
(t, variance)
}

/**
* Compute the lower bound of the ratio confidence interval: t * exp(-quartile * sqrt(variance)).
*/
protected[ml] def computeLowerBound(
t: Double,
variance: Double,
quartile: Double): Double = {
t * math.exp(-math.sqrt(variance) * quartile)
}

/**
* Compute the upper bound of the ratio confidence interval: t * exp(quartile * sqrt(variance)).
*/
protected[ml] def computeUpperBound(
t: Double,
variance: Double,
quartile: Double): Double = {
t * math.exp(math.sqrt(variance) * quartile)
}

/**
* Heuristically check whether a feature is binary (0/1 valued): for such a feature, the fraction of nonzero
* entries in the global population equals its mean.
*/
protected[ml] def isBinary(
globalFeatureNonZero: Array[Double],
globalPopulationNumSamples: Long,
globalMean: Array[Double],
key: Int): Boolean = {
(globalFeatureNonZero(key) * 1.0) / globalPopulationNumSamples == globalMean(key)
}

/**
* Compute Pearson correlation scores.
*
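For readers of the patch, the quantity these helpers compute matches the usual log-normal (delta-method) approximation for a ratio of rates: with x occurrences of a feature in m per-entity samples and y occurrences in n global samples, the ratio is t = (x / m) / (y / n), the variance of log(t) is approximated by 1/x - 1/m + 1/y - 1/n, and the interval is t * exp(±z * sqrt(variance)). A minimal, self-contained sketch of that arithmetic, not part of the patch; the counts and z-value below are illustrative only:

object RatioCIExample extends App {
  // Illustrative counts only: a feature fires x times in m per-entity samples
  // and y times in n global samples.
  val (x, m, y, n) = (30.0, 200L, 1000.0, 100000L)
  val z = 2.575 // roughly the two-sided 99% standard normal quantile

  val t = (x / m) / (y / n) // per-entity rate relative to the global rate
  val varianceOfLogT = 1.0 / x - 1.0 / m + 1.0 / y - 1.0 / n
  val lower = t * math.exp(-math.sqrt(varianceOfLogT) * z)
  val upper = t * math.exp(math.sqrt(varianceOfLogT) * z)

  // filterFeaturesByRatioCIBound keeps a feature when `lower` exceeds the configured interval bound.
  println(f"ratio = $t%.2f, ~99%% CI = [$lower%.2f, $upper%.2f]")
}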
@@ -16,13 +16,12 @@ package com.linkedin.photon.ml.data

import scala.collection.Set
import scala.util.hashing.byteswap64

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{StorageLevel => SparkStorageLevel}
import org.apache.spark.{Partitioner, SparkContext}

import com.linkedin.photon.ml.Types.{FeatureShardId, REId, REType, UniqueSampleId}
import com.linkedin.photon.ml.Types._
import com.linkedin.photon.ml.constants.StorageLevel
import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
@@ -240,7 +239,8 @@ object RandomEffectDataSet {
gameDataSet: RDD[(UniqueSampleId, GameDatum)],
randomEffectDataConfiguration: RandomEffectDataConfiguration,
randomEffectPartitioner: Partitioner,
existingModelKeysRddOpt: Option[RDD[REId]]): RandomEffectDataSet = {
existingModelKeysRddOpt: Option[RDD[REId]],
globalFeatureStats: FeatureShardStatisticsMapOpt): RandomEffectDataSet = {

val randomEffectType = randomEffectDataConfiguration.randomEffectType
val featureShardId = randomEffectDataConfiguration.featureShardId
@@ -252,7 +252,7 @@
randomEffectDataConfiguration,
randomEffectPartitioner,
existingModelKeysRddOpt)
val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration)
val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration, globalFeatureStats)
.setName("Active data")
.persist(StorageLevel.INFREQUENT_REUSE_RDD_STORAGE_LEVEL)

@@ -488,7 +488,8 @@ object RandomEffectDataSet {
*/
private def featureSelectionOnActiveData(
activeData: RDD[(REId, LocalDataSet)],
randomEffectDataConfiguration: RandomEffectDataConfiguration): RDD[(REId, LocalDataSet)] = {
randomEffectDataConfiguration: RandomEffectDataConfiguration,
globalFeatureStats: FeatureShardStatisticsMapOpt): RDD[(REId, LocalDataSet)] = {

randomEffectDataConfiguration
.numFeaturesToSamplesRatioUpperBound
@@ -498,9 +499,24 @@

// In case the above product overflows
if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue
val filteredLocalDataSet = localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
globalFeatureStats match {
case Some(featureStats) =>
// Keep features whose ratio confidence interval lower bound exceeds 1.0, at a ~99% confidence level (z = 2.575)
val featureShardId = randomEffectDataConfiguration.featureShardId
localDataSet.filterFeaturesByRatioCIBound(1.0, 2.575, featureStats.get(featureShardId).get)

case None =>
localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
}

filteredLocalDataSet
}
}
.getOrElse(activeData)
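The two constants passed to filterFeaturesByRatioCIBound above are hard-coded: a feature is kept when its ratio confidence interval lower bound exceeds 1.0, and 2.575 is, under the usual convention, the standard normal quantile for a two-sided interval with roughly 99% coverage. A quick check of that coverage, assuming commons-math3 is available on the classpath (it ships as a transitive dependency of Spark and Breeze):

import org.apache.commons.math3.distribution.NormalDistribution

object QuantileCoverageCheck extends App {
  val z = 2.575
  val phi = new NormalDistribution().cumulativeProbability(z) // standard normal CDF at z
  println(f"two-sided coverage at z = $z: ${2 * phi - 1}%.4f") // ~0.9900
}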
@@ -15,17 +15,15 @@
package com.linkedin.photon.ml.estimators

import scala.language.existentials

import org.apache.spark.SparkContext
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators, Params}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.slf4j.Logger

import com.linkedin.photon.ml.TaskType
import com.linkedin.photon.ml.TaskType.TaskType
import com.linkedin.photon.ml.Types.{CoordinateId, FeatureShardId, UniqueSampleId}
import com.linkedin.photon.ml.Types.{CoordinateId, FeatureShardId, UniqueSampleId, FeatureShardStatisticsMapOpt}
import com.linkedin.photon.ml.algorithm._
import com.linkedin.photon.ml.constants.{MathConst, StorageLevel}
import com.linkedin.photon.ml.data._
@@ -43,6 +41,7 @@ import com.linkedin.photon.ml.supervised.classification.{LogisticRegressionModel
import com.linkedin.photon.ml.supervised.regression.{LinearRegressionModel, PoissonRegressionModel}
import com.linkedin.photon.ml.util._


/**
* Estimator implementation for GAME models.
*
@@ -57,6 +56,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
type SingleNodeLossFunctionConstructor = (PointwiseLossFunction) => SingleNodeGLMLossFunction
type DistributedLossFunctionConstructor = (PointwiseLossFunction) => DistributedGLMLossFunction


private implicit val parent: Identifiable = this
private val defaultNormalizationContext: NormalizationContextWrapper =
NormalizationContextBroadcast(sc.broadcast(NoNormalization()))
@@ -129,6 +129,10 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
"Flag to ignore the random effect samples lower bound when encountering a random effect ID without an existing " +
"model during warm-start training.")

val featureShardStats: Param[FeatureShardStatisticsMapOpt] = ParamUtils.createParam[FeatureShardStatisticsMapOpt](
"the global population feature statistics required to compute ratio confidence intervals",
"These statistics are used to derive the ratio between random effect feature rates and global feature rates " +
"when performing feature selection on per-entity models.")

//
// Initialize object
//
Expand Down Expand Up @@ -166,6 +170,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P

def setIgnoreThresholdForNewModels(value: Boolean): this.type = set(ignoreThresholdForNewModels, value)

def setFeatureStats(value: FeatureShardStatisticsMapOpt): this.type = set(featureShardStats, value)

//
// Params trait extensions
//
Expand All @@ -192,6 +197,7 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
setDefault(computeVariance, false)
setDefault(treeAggregateDepth, DEFAULT_TREE_AGGREGATE_DEPTH)
setDefault(ignoreThresholdForNewModels, false)
setDefault(featureShardStats, None: FeatureShardStatisticsMapOpt)
}

/**
@@ -535,7 +541,12 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
None
}

val rawRandomEffectDataSet = RandomEffectDataSet(gameDataSet, reConfig, partitioner, existingModelKeysRddOpt)
val rawRandomEffectDataSet = RandomEffectDataSet(
gameDataSet,
reConfig,
partitioner,
existingModelKeysRddOpt,
getOrDefault(featureShardStats))
.setName(s"Random Effect Data Set: $coordinateId")
.persistRDD(StorageLevel.INFREQUENT_REUSE_RDD_STORAGE_LEVEL)
.materialize()
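To exercise the new code path from the estimator, a caller computes a BasicStatisticalSummary per feature shard on the global data and passes the map through setFeatureStats; with the default of None, featureSelectionOnActiveData falls back to the existing Pearson correlation filter. A hypothetical wiring sketch, assuming FeatureShardStatisticsMapOpt is an Option over a FeatureShardId-to-BasicStatisticalSummary map (which is how featureSelectionOnActiveData consumes it, but is not spelled out in this commit):

import org.apache.spark.SparkContext
import org.slf4j.Logger

import com.linkedin.photon.ml.Types.{FeatureShardId, FeatureShardStatisticsMapOpt}
import com.linkedin.photon.ml.estimators.GameEstimator
import com.linkedin.photon.ml.stat.BasicStatisticalSummary

object FeatureStatsWiringSketch {
  // Hypothetical helper: wires pre-computed global feature statistics into a GameEstimator
  // so that random effect coordinates use ratio CI feature selection instead of Pearson correlation.
  def withRatioCIFeatureSelection(
      sc: SparkContext,
      logger: Logger,
      globalStatsByShard: Map[FeatureShardId, BasicStatisticalSummary]): GameEstimator =
    new GameEstimator(sc, logger).setFeatureStats(Some(globalStatsByShard))
}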