Commit ffc2a44

add the confidence interval computation
Leon Gao committed Sep 19, 2018
1 parent fc45c4a commit ffc2a44
Showing 9 changed files with 500 additions and 39 deletions.
@@ -123,7 +123,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils {
Some(activeDataLowerBound))
val partitioner = new RandomEffectDataSetPartitioner(NUM_PARTITIONS, sc.broadcast(partitionMap))

val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None)
val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, None, None)
val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count()

assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects)
@@ -155,7 +155,7 @@ class RandomEffectDataSetIntegTest extends SparkTestUtils {
Some(activeDataLowerBound))
val partitioner = new RandomEffectDataSetPartitioner(NUM_PARTITIONS, sc.broadcast(partitionMap))

val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD))
val randomEffectDataSet = RandomEffectDataSet(rdd, randomEffectDataConfig, partitioner, Some(existingIdsRDD), None)
val numUniqueRandomEffects = randomEffectDataSet.activeData.keys.count()

assertEquals(numUniqueRandomEffects, expectedUniqueRandomEffects)
@@ -14,8 +14,6 @@
*/
package com.linkedin.photon.ml.algorithm

import scala.collection.Set

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -102,6 +102,46 @@ protected[ml] case class LocalDataSet(dataPoints: Array[(UniqueSampleId, Labeled
LocalDataSet(projectedDataPoints)
}

/**
* Filter features by binomial ratio confidence intervals. A binary feature is kept only if the
* lower bound of the confidence interval for its local-to-global positive-rate ratio exceeds
* intervalBound; non-binary features are always kept.
*
* @param globalFeatureInstances The per-feature counts of global instances in which the feature is present
* @param globalPositiveInstances The per-feature counts of global positive instances in which the feature is present
* @param binaryIndices The indices of the binary feature columns
* @param nonBinaryIndices The indices of the non-binary feature columns
* @param intervalBound The lower bound threshold of the confidence interval used to filter features
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return The filtered dataset
*/
def filterFeaturesByRatioCIBound(
globalFeatureInstances: Array[Double],
globalPositiveInstances: Array[Double],
binaryIndices: Set[Int],
nonBinaryIndices: Set[Int],
intervalBound: Double = 1.0,
zScore: Double = 2.575): LocalDataSet = {

val labelAndFeatures = dataPoints.map { case (_, labeledPoint) => (labeledPoint.label, labeledPoint.features) }
val lowerBounds = LocalDataSet.computeRatioCILowerBound(
labelAndFeatures,
globalFeatureInstances,
globalPositiveInstances,
binaryIndices,
numFeatures,
zScore)
val filteredBinaryFeaturesIndexSet = lowerBounds.filter(_._2 > intervalBound).keySet
val filteredFeaturesIndexSet = filteredBinaryFeaturesIndexSet ++ nonBinaryIndices

val filteredActivities = dataPoints.map { case (id, LabeledPoint(label, features, offset, weight)) =>

val filteredFeatures = LocalDataSet.filterFeaturesWithFeatureIndexSet(features, filteredFeaturesIndexSet)

(id, LabeledPoint(label, filteredFeatures, offset, weight))
}

LocalDataSet(filteredActivities)
}
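
A usage sketch of the new filter with purely illustrative values; it assumes code running inside the com.linkedin.photon.ml.data package (LocalDataSet and its methods are protected[ml]) and photon-ml's LabeledPoint(label, features, offset, weight) constructor, as destructured above. With these numbers the lone binary feature's ratio CI lower bound falls below the default intervalBound of 1.0, so it is filtered out, while the non-binary feature is always retained:

import breeze.linalg.DenseVector

// Two samples: (uniqueSampleId, LabeledPoint(label, features, offset, weight))
val localData = LocalDataSet(Array(
  (1L, LabeledPoint(0.0, DenseVector(1.0, 0.3), 0.0, 1.0)),
  (2L, LabeledPoint(1.0, DenseVector(0.0, 0.7), 0.0, 1.0))))

val filtered = localData.filterFeaturesByRatioCIBound(
  Array(1000.0, 800.0), // global counts of instances in which each feature is present
  Array(50.0, 40.0),    // global counts of positive instances in which each feature is present
  Set(0),               // feature 0 is binary and therefore CI-tested
  Set(1))               // feature 1 is non-binary and therefore always kept
// With EPSILON smoothing for the zero local positive count, feature 0's lower bound is roughly 0.74 <= 1.0,
// so only feature 1 survives the filter.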

/**
* Filter features by Pearson correlation score.
*
@@ -143,6 +183,8 @@ object LocalDataSet {
* @param isSortedByFirstIndex Whether or not to sort the data by global ID
* @return A new LocalDataSet
*/
// Smoothing constant substituted for a zero local positive count when computing ratio confidence intervals
val EPSILON = 0.5

protected[ml] def apply(
dataPoints: Array[(UniqueSampleId, LabeledPoint)],
isSortedByFirstIndex: Boolean): LocalDataSet = {
@@ -176,6 +218,105 @@ object LocalDataSet {
result
}

/**
* Compute feature ratio confidence interval lower bounds.
*
* @param labelAndFeatures An [[Array]] of (label, feature vector) tuples
* @param globalFeatureInstances The per-feature counts of global instances in which the feature is present
* @param globalPositiveInstances The per-feature counts of global positive instances in which the feature is present
* @param binaryIndices The indices of the binary feature columns
* @param numFeatures The number of features in each feature vector
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return A map from binary feature column index to the lower bound of its ratio confidence interval
*/
protected[ml] def computeRatioCILowerBound(
labelAndFeatures: Array[(Double, Vector[Double])],
globalFeatureInstances: Array[Double],
globalPositiveInstances: Array[Double],
binaryIndices: Set[Int],
numFeatures: Int,
zScore: Double): Map[Int, Double] = {

val n = globalFeatureInstances
val y = globalPositiveInstances

val m = labelAndFeatures.map(_._2).reduce(_ + _)
val x = labelAndFeatures
.filter(_._1 > MathConst.POSITIVE_RESPONSE_THRESHOLD)
.map(_._2)
.foldLeft(Vector.zeros[Double](numFeatures))(_ + _)

binaryIndices
.map { key =>
val x_col = x(key)
val m_col = m(key)
val y_col = y(key)
val n_col = n(key)

val lowerBound = if (y_col == 0.0 || y_col == n_col) {
0D
} else {
val (t, variance) = computeTAndVariance(math.max(x_col, EPSILON), m_col, y_col, n_col)

// For ratios below one, use the CI lower bound of the inverted ratio (1 / upper bound), so that
// features strongly depleted of positives locally can also pass the filter
if (t < 1D) {
1D / computeUpperBound(t, variance, zScore)
} else {
computeLowerBound(t, variance, zScore)
}
}

(key, lowerBound)
}
.toMap
}
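
For concreteness, a sketch of computeRatioCILowerBound on made-up counts (again assuming code inside the package, since the helper is protected[ml]). Feature 0 is far more positive locally than globally and keeps a lower bound well above 1; feature 1 is not and falls below 1:

import breeze.linalg.{DenseVector, Vector}

val labelAndFeatures: Array[(Double, Vector[Double])] = Array(
  (1.0, DenseVector(1.0, 0.0)),
  (1.0, DenseVector(1.0, 1.0)),
  (0.0, DenseVector(0.0, 1.0)),
  (0.0, DenseVector(0.0, 1.0)))

val lowerBounds = LocalDataSet.computeRatioCILowerBound(
  labelAndFeatures,
  Array(500.0, 600.0), // global instances in which each feature is present
  Array(50.0, 60.0),   // global positive instances in which each feature is present
  Set(0, 1),           // both features are binary
  2,                   // numFeatures
  2.575)
// lowerBounds(0) is roughly 7.1 -> kept by filterFeaturesByRatioCIBound's default bound of 1.0
// lowerBounds(1) is roughly 0.4 -> dropped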

/**
* Compute the ratio t and the variance of its logarithm for ratio modeling.
*
* @param x The count for f_i == 1 and label == 1 in the local population
* @param m The count for f_i == 1 in the local population
* @param y The count for f_i == 1 and label == 1 in the global population
* @param n The count for f_i == 1 in the global population
* @return The ratio t and the variance of log(t)
*/
protected[ml] def computeTAndVariance(x: Double, m: Double, y: Double, n: Double): (Double, Double) =
if (m == 0.0 || n == 0.0 || y == 0.0 || x == 0.0) {
(0.0, 0.0)
} else {
val t = (x / m) / (y / n)
val variance = 1.0 / x - 1.0 / m + 1.0 / y - 1.0 / n

(t, variance)
}

/**
* Compute the confidence interval lower bound for ratio modeling.
*
* @param t The value of the ratio
* @param variance The variance of the log of the ratio
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return The lower bound of the confidence interval for ratio t
*/
protected[ml] def computeLowerBound(
t: Double,
variance: Double,
zScore: Double): Double =
t * math.exp(-math.sqrt(variance) * zScore)

/**
* Compute the confidence interval upper bound for ratio modeling.
*
* @param t The value of the ratio
* @param variance The variance of the log of the ratio
* @param zScore The Z-score for the chosen two-tailed confidence level
* @return The upper bound of the confidence interval for ratio t
*/
protected[ml] def computeUpperBound(
t: Double,
variance: Double,
zScore: Double): Double =
t * math.exp(math.sqrt(variance) * zScore)
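
Putting the three helpers above together on made-up counts: the point estimate is the ratio of the local positive rate to the global positive rate, t = (x / m) / (y / n), and the two-sided interval is t * exp(±zScore * sqrt(1/x - 1/m + 1/y - 1/n)), a normal approximation on the log of the ratio. The numbers below are purely illustrative, and the calls assume code inside the package:

// A feature present in m = 100 local samples with x = 30 positives,
// and in n = 1000 global samples with y = 50 positives.
val (t, variance) = LocalDataSet.computeTAndVariance(30.0, 100.0, 50.0, 1000.0)
// t        = (30 / 100) / (50 / 1000)      = 6.0
// variance = 1/30 - 1/100 + 1/50 - 1/1000  ≈ 0.0423

val lower = LocalDataSet.computeLowerBound(t, variance, 2.575) // ≈ 3.5
val upper = LocalDataSet.computeUpperBound(t, variance, 2.575) // ≈ 10.2
// lower > 1.0, so this binary feature would survive filterFeaturesByRatioCIBound's default intervalBound.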

/**
* Compute Pearson correlation scores.
*
@@ -14,7 +14,6 @@
*/
package com.linkedin.photon.ml.data

import scala.collection.Set
import scala.util.hashing.byteswap64

import org.apache.spark.broadcast.Broadcast
@@ -23,8 +22,10 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Partitioner, SparkContext}

import com.linkedin.photon.ml.Types.{FeatureShardId, REId, REType, UniqueSampleId}
import com.linkedin.photon.ml.constants.MathConst
import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
import com.linkedin.photon.ml.stat.BasicStatisticalSummary

/**
* Data set implementation for random effect datasets:
@@ -239,19 +240,28 @@ object RandomEffectDataSet {
gameDataSet: RDD[(UniqueSampleId, GameDatum)],
randomEffectDataConfiguration: RandomEffectDataConfiguration,
randomEffectPartitioner: Partitioner,
existingModelKeysRddOpt: Option[RDD[REId]]): RandomEffectDataSet = {
existingModelKeysRddOpt: Option[RDD[REId]],
featureShardStatsMapOpt: Option[Map[FeatureShardId, BasicStatisticalSummary]]): RandomEffectDataSet = {

val randomEffectType = randomEffectDataConfiguration.randomEffectType
val featureShardId = randomEffectDataConfiguration.featureShardId

val gameDataPartitioner = gameDataSet.partitioner.get

val globalPositiveInstances = gameDataSet
.values
.filter(_.response > MathConst.POSITIVE_RESPONSE_THRESHOLD)
.map(_.featureShardContainer(featureShardId))
.reduce(_ + _)
.toArray

val rawActiveData = generateActiveData(
gameDataSet,
randomEffectDataConfiguration,
randomEffectPartitioner,
existingModelKeysRddOpt)
val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration)

val activeData = featureSelectionOnActiveData(rawActiveData, randomEffectDataConfiguration, featureShardStatsMapOpt, globalPositiveInstances)
.setName("Active data")
.persist(StorageLevel.DISK_ONLY)

@@ -462,6 +472,7 @@ object RandomEffectDataSet {
val passiveDataRandomEffectIds = passiveDataRandomEffectIdCountsMap
.filter(_._2 > passiveDataLowerBound)
.keySet
.toSet
val sparkContext = gameDataSet.sparkContext
val passiveDataRandomEffectIdsBroadcast = sparkContext.broadcast(passiveDataRandomEffectIds)
val filteredPassiveData = passiveData
@@ -488,21 +499,66 @@
*/
private def featureSelectionOnActiveData(
activeData: RDD[(REId, LocalDataSet)],
randomEffectDataConfiguration: RandomEffectDataConfiguration): RDD[(REId, LocalDataSet)] = {
randomEffectDataConfiguration: RandomEffectDataConfiguration,
featureShardStatsOpt: Option[Map[FeatureShardId, BasicStatisticalSummary]],
globalPositiveInstances: Array[Double]): RDD[(REId, LocalDataSet)] = {

featureShardStatsOpt match {
case Some(featureShardStats) =>
val globalFeatureShardStats = featureShardStats(randomEffectDataConfiguration.featureShardId)
val (binaryIndices, nonBinaryIndices) = segregateBinaryFeatures(globalFeatureShardStats)

val broadcastBinaryFeatureIndices = activeData.sparkContext.broadcast(binaryIndices)
val broadcastNonBinaryFeatureIndices = activeData.sparkContext.broadcast(nonBinaryIndices)
val broadcastGlobalPositiveInstances = activeData.sparkContext.broadcast(globalPositiveInstances)
val broadcastGlobalFeatureInstances = activeData.sparkContext.broadcast(globalFeatureShardStats.numNonzeros.toArray)
val filteredActiveData = activeData.mapValues { localDataSet =>
localDataSet.filterFeaturesByRatioCIBound(
broadcastGlobalFeatureInstances.value,
broadcastGlobalPositiveInstances.value,
broadcastBinaryFeatureIndices.value,
broadcastNonBinaryFeatureIndices.value)
}

broadcastGlobalFeatureInstances.unpersist()
broadcastBinaryFeatureIndices.unpersist()
broadcastNonBinaryFeatureIndices.unpersist()
broadcastGlobalPositiveInstances.unpersist()

filteredActiveData

case None =>
randomEffectDataConfiguration
.numFeaturesToSamplesRatioUpperBound
.map { numFeaturesToSamplesRatioUpperBound =>
activeData.mapValues { localDataSet =>
var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataSet.numDataPoints).toInt
// In case the above product overflows
if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue

localDataSet.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
}
}
.getOrElse(activeData)
}
}

/**
* Split the feature indices into binary features and non-binary features
*
* @param featureStatsSummary The feature statistical summary for global population
* @return The indices for binary and non-binary feature columns
*/
private def segregateBinaryFeatures(featureStatsSummary: BasicStatisticalSummary): (Set[Int], Set[Int]) = {

// A column qualifies as binary when the sum of its values (mean * count) matches its non-zero count,
// i.e. every non-zero value is 1, and its maximum value is 1
val nonZerosDiff = (featureStatsSummary.mean * featureStatsSummary.count.toDouble) - featureStatsSummary.numNonzeros
val allFeatureIndices = nonZerosDiff.keySet
val candidates = nonZerosDiff.findAll(_ < MathConst.EPSILON).toSet
val maxOne = featureStatsSummary.max.findAll(_ == 1D).toSet

val binaryIndices = candidates.intersect(maxOne)
// The intercept column, if present, is always kept by adding it to the non-binary set
val nonBinaryIndices = allFeatureIndices.diff(binaryIndices) ++ featureStatsSummary.interceptIndex

(binaryIndices, nonBinaryIndices)
}
}
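
A standalone numeric illustration of the binary-feature test in segregateBinaryFeatures above, using hypothetical summary statistics rather than a real BasicStatisticalSummary:

import breeze.linalg.DenseVector

val count       = 4.0                     // number of samples in the global population
val mean        = DenseVector(0.25, 0.40) // per-feature means
val numNonzeros = DenseVector(1.0, 2.0)   // per-feature non-zero counts
val max         = DenseVector(1.0, 0.9)   // per-feature maxima

val nonZerosDiff = (mean * count) - numNonzeros // DenseVector(0.0, -0.4)
// Feature 0: sum of values (1.0) equals its non-zero count and max == 1.0 -> binary
// Feature 1: max != 1.0                                                   -> non-binary
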
@@ -41,6 +41,7 @@ import com.linkedin.photon.ml.optimization.game._
import com.linkedin.photon.ml.projector.{IdentityProjection, IndexMapProjectorRDD, ProjectionMatrixBroadcast}
import com.linkedin.photon.ml.sampling.DownSamplerHelper
import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
import com.linkedin.photon.ml.stat.BasicStatisticalSummary
import com.linkedin.photon.ml.supervised.classification.{LogisticRegressionModel, SmoothedHingeLossLinearSVMModel}
import com.linkedin.photon.ml.supervised.regression.{LinearRegressionModel, PoissonRegressionModel}
import com.linkedin.photon.ml.util._
@@ -102,6 +103,13 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
"coordinate, but the shifts and factors are different for each shard.",
PhotonParamValidators.nonEmpty[TraversableOnce, (CoordinateId, NormalizationContext)])

val featureShardStatistics: Param[Map[FeatureShardId, BasicStatisticalSummary]] =
ParamUtils.createParam[Map[FeatureShardId, BasicStatisticalSummary]](
"feature shard statistics",
"A map of shard name to feature space statistical summary. " +
"Used to filter random effect features by binomial proportions.",
PhotonParamValidators.nonEmpty[TraversableOnce, (FeatureShardId, BasicStatisticalSummary)])

val initialModel: Param[GameModel] = ParamUtils.createParam(
"initial model",
"Prior model to use as a starting point for training.")
@@ -154,6 +162,9 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
def setCoordinateNormalizationContexts(value: Map[CoordinateId, NormalizationContext]): this.type =
set(coordinateNormalizationContexts, value)

def setFeatureShardStatistics(value: Map[FeatureShardId, BasicStatisticalSummary]): this.type =
set(featureShardStatistics, value)
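
A hypothetical wiring sketch for the new parameter: summariesByShard, sc, and logger are assumed to exist already, with one BasicStatisticalSummary per feature shard keyed by shard name. When the parameter is never set, featureSelectionOnActiveData takes its None branch and keeps the pre-existing Pearson-correlation behavior:

// summariesByShard: Map[FeatureShardId, BasicStatisticalSummary] computed elsewhere
val estimator = new GameEstimator(sc, logger)
  .setFeatureShardStatistics(summariesByShard)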

def setInitialModel(value: GameModel): this.type = set(initialModel, value)

def setPartialRetrainLockedCoordinates(value: Set[CoordinateId]): this.type =
@@ -526,7 +537,12 @@ class GameEstimator(val sc: SparkContext, implicit val logger: Logger) extends P
None
}

val rawRandomEffectDataSet = RandomEffectDataSet(gameDataSet, reConfig, rePartitioner, existingModelKeysRddOpt)
val rawRandomEffectDataSet = RandomEffectDataSet(
gameDataSet,
reConfig,
rePartitioner,
existingModelKeysRddOpt,
get(featureShardStatistics))
.setName(s"Random Effect Data Set: $coordinateId")
.persistRDD(StorageLevel.DISK_ONLY)
.materialize()