Remove Pearson correlation score based feature filtering for random effects #457

Open
wants to merge 1 commit into master
@@ -14,13 +14,7 @@
*/
package com.linkedin.photon.ml.data

import scala.collection.mutable

import breeze.linalg.Vector

import com.linkedin.photon.ml.Types.UniqueSampleId
import com.linkedin.photon.ml.constants.MathConst
import com.linkedin.photon.ml.util.VectorUtils

/**
* Local dataset implementation.
@@ -93,40 +87,6 @@ protected[ml] case class LocalDataset(dataPoints: Array[(UniqueSampleId, Labeled

LocalDataset(updatedDataPoints)
}

/**
* Filter features by Pearson correlation score.
*
* @param numFeaturesToKeep The number of features to keep
* @return The filtered dataset
*/
def filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep: Int): LocalDataset = {

val numActiveFeatures: Int = dataPoints.flatMap(_._2.features.activeKeysIterator).toSet.size

if (numFeaturesToKeep < numActiveFeatures) {
val labelAndFeatures = dataPoints.map { case (_, labeledPoint) => (labeledPoint.label, labeledPoint.features) }
val pearsonScores = LocalDataset.stableComputePearsonCorrelationScore(labelAndFeatures)

val filteredFeaturesIndexSet = pearsonScores
.toArray
.sortBy { case (_, score) => math.abs(score) }
.takeRight(numFeaturesToKeep)
.map(_._1)
.toSet

val filteredActivities = dataPoints.map { case (id, LabeledPoint(label, features, offset, weight)) =>

val filteredFeatures = LocalDataset.filterFeaturesWithFeatureIndexSet(features, filteredFeaturesIndexSet)

(id, LabeledPoint(label, filteredFeatures, offset, weight))
}

LocalDataset(filteredActivities)
} else {
this
}
}
}

object LocalDataset {
@@ -148,175 +108,4 @@ object LocalDataset {
LocalDataset(dataPoints.sortBy(_._1))
}
}

/**
* Filter features by feature index.
*
* @param features The original feature set
* @param featureIndexSet The feature index set
* @return The filtered feature vector
*/
private def filterFeaturesWithFeatureIndexSet(
features: Vector[Double],
featureIndexSet: Set[Int]): Vector[Double] = {

val result = VectorUtils.zeroOfSameType(features)

features.activeIterator.foreach { case (key, value) =>
if (featureIndexSet.contains(key)) {
result(key) = value
}
}

result
}
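// Illustrative example (values assumed, not from the codebase): filtering a length-10 sparse vector
// with active entries at indices 0, 3, and 7 against featureIndexSet = Set(0, 7) returns a zero
// vector of the same type and length with only the entries at indices 0 and 7 copied over.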

/**
* Compute Pearson correlation scores using a numerically stable algorithm.
*
* @param labelAndFeatures An array of (label, feature) tuples
* @return A map from feature index to Pearson correlation score
*/
protected[ml] def stableComputePearsonCorrelationScore(
labelAndFeatures: Array[(Double, Vector[Double])]): Map[Int, Double] = {

val featureMeans = mutable.Map[Int, Double]()
val featureUnscaledVars = mutable.Map[Int, Double]()
var labelMean = 0.0
var labelUnscaledVariance = 0.0
val unscaledCovariances = mutable.Map[Int, Double]()
var interceptAdded = false
var numSamples = 0

labelAndFeatures.foreach { case (label, features) =>
numSamples += 1

val deltaLabel = label - labelMean
labelMean += deltaLabel / numSamples
labelUnscaledVariance += deltaLabel * (label - labelMean)

// Note that, if there are duplicate keys in the feature vector, the following Pearson correlation
// score calculation will be incorrect
features.iterator.foreach { case (key, value) =>
val prevFeatureMean = featureMeans.getOrElse(key, 0.0)
val deltaFeature = value - prevFeatureMean
val featureMean = prevFeatureMean + deltaFeature / numSamples

val prevFeatureUnscaledVar = featureUnscaledVars.getOrElse(key, 0.0)
val featureUnscaledVar = prevFeatureUnscaledVar + deltaFeature * (value - featureMean)

val prevCovariance = unscaledCovariances.getOrElse(key, 0.0)
val unscaledCovariance = prevCovariance + deltaFeature * deltaLabel * (numSamples - 1) / numSamples

featureMeans.update(key, featureMean)
featureUnscaledVars.update(key, featureUnscaledVar)
unscaledCovariances.update(key, unscaledCovariance)
}
}

val labelStd = math.sqrt(labelUnscaledVariance)

featureMeans
.iterator
.map { case (key, featureMean) =>
val featureStd = math.sqrt(featureUnscaledVars(key))
val covariance = unscaledCovariances(key)

// When the standard deviation of the feature is close to 0 we treat it as the intercept term.
val score = if (featureStd < math.sqrt(numSamples) * MathConst.EPSILON) {
// A near-zero standard deviation means the feature is effectively constant: a constant feature with
// mean 1.0 is treated as the intercept (scored 1.0, at most once), while any other constant feature is scored 0.0
if (featureMean == 1.0 && !interceptAdded) {
interceptAdded = true
1.0
} else {
0.0
}
} else {
covariance / (labelStd * featureStd + MathConst.EPSILON)
}

require(math.abs(score) <= 1 + MathConst.EPSILON,
s"Computed pearson correlation score is $score, while the score's magnitude should be less than 1. " +
s"(Diagnosis:\n" +
s"featureKey=$key\n" +
s"featureStd=$featureStd\n" +
s"labelStd=$labelStd\n" +
s"covariance=$covariance\n" +
s"numSamples=$numSamples\n" +
s"labelAndFeatures used to compute Pearson correlation score:\n${labelAndFeatures.mkString("\n")}})")

(key, score)
}
.toMap
}

/**
* Compute Pearson correlation scores.
*
* @param labelAndFeatures An array of (label, feature) tuples
* @return The Pearson correlation scores for each tuple
*/
protected[ml] def computePearsonCorrelationScore(
labelAndFeatures: Array[(Double, Vector[Double])]): Map[Int, Double] = {

val featureLabelProductSums = mutable.Map[Int, Double]()
val featureFirstOrderSums = mutable.Map[Int, Double]()
val featureSecondOrderSums = mutable.Map[Int, Double]()
var labelFirstOrderSum = 0.0
var labelSecondOrderSum = 0.0
var numSamples = 0
var interceptAdded = false

labelAndFeatures.foreach { case (label, features) =>
numSamples += 1
labelFirstOrderSum += label
labelSecondOrderSum += label * label
// Note that, if there are duplicate keys in the feature vector, the following Pearson correlation
// score calculation will be incorrect
features.activeIterator.foreach { case (key, value) =>
featureFirstOrderSums.update(key, featureFirstOrderSums.getOrElse(key, 0.0) + value)
featureSecondOrderSums.update(key, featureSecondOrderSums.getOrElse(key, 0.0) + value * value)
featureLabelProductSums.update(key, featureLabelProductSums.getOrElse(key, 0.0) + value * label)
}
}

featureFirstOrderSums
.keySet
.map { key =>
val featureFirstOrderSum = featureFirstOrderSums(key)
val featureSecondOrderSum = featureSecondOrderSums(key)
val featureLabelProductSum = featureLabelProductSums(key)
val numerator = numSamples * featureLabelProductSum - featureFirstOrderSum * labelFirstOrderSum
val std = math.sqrt(math.abs(numSamples * featureSecondOrderSum - featureFirstOrderSum * featureFirstOrderSum))
val denominator = std * math.sqrt(numSamples * labelSecondOrderSum - labelFirstOrderSum * labelFirstOrderSum)

// When the standard deviation of the feature is close to 0, we treat it as the intercept term
val score = if (std < MathConst.EPSILON) {
if (interceptAdded) {
0.0
} else {
interceptAdded = true
1.0
}
} else {
numerator / (denominator + MathConst.EPSILON)
}

require(math.abs(score) <= 1 + MathConst.EPSILON,
s"Computed pearson correlation score is $score, while the score's magnitude should be less than 1. " +
s"(Diagnosis:\n" +
s"numerator=$numerator\n" +
s"denominator=$denominator\n" +
s"numSamples=$numSamples\n" +
s"featureFirstOrderSum=$featureFirstOrderSum\n" +
s"featureSecondOrderSum=$featureSecondOrderSum\n" +
s"featureLabelProductSum=$featureLabelProductSum\n" +
s"labelFirstOrderSum=$labelFirstOrderSum\n" +
s"labelSecondOrderSum=$labelSecondOrderSum\n" +
s"labelAndFeatures used to compute Pearson correlation score:\n${labelAndFeatures.mkString("\n")}})")

(key, score)
}
.toMap
}
}
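For reference, the removed filterFeaturesByPearsonCorrelationScore ranked each feature by the magnitude of its Pearson correlation with the label and kept only the top numFeaturesToKeep. Below is a minimal standalone sketch of that score in its plain closed form, rather than the running-update form used above (all names and values here are illustrative, not from the codebase):

object PearsonScoreSketch {

  // Closed-form sample Pearson correlation between one feature column and the label:
  //   r = (n * sum(xy) - sum(x) * sum(y)) /
  //       (sqrt(n * sum(x^2) - sum(x)^2) * sqrt(n * sum(y^2) - sum(y)^2))
  def pearson(feature: Array[Double], label: Array[Double]): Double = {
    require(feature.length == label.length && feature.nonEmpty, "inputs must be non-empty and aligned")

    val n = feature.length.toDouble
    val sumX = feature.sum
    val sumY = label.sum
    val sumXY = feature.zip(label).map { case (x, y) => x * y }.sum
    val sumXX = feature.map(x => x * x).sum
    val sumYY = label.map(y => y * y).sum

    val numerator = n * sumXY - sumX * sumY
    val denominator = math.sqrt(n * sumXX - sumX * sumX) * math.sqrt(n * sumYY - sumY * sumY)

    // A zero denominator means the feature (or label) is constant; the removed code handled that
    // case separately, treating a constant 1.0 feature as the intercept.
    if (denominator == 0.0) 0.0 else numerator / denominator
  }

  def main(args: Array[String]): Unit = {
    // A feature that tracks the label closely scores near 1 and would have been kept;
    // a constant feature hits the zero-denominator branch and scores 0.0.
    println(pearson(Array(1.0, 2.0, 3.0, 4.0), Array(1.1, 1.9, 3.2, 3.8)))
    println(pearson(Array(1.0, 1.0, 1.0, 1.0), Array(1.1, 1.9, 3.2, 3.8)))
  }
}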
@@ -29,7 +29,6 @@ import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
import com.linkedin.photon.ml.model.RandomEffectModel
import com.linkedin.photon.ml.projector.LinearSubspaceProjector
import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel
import com.linkedin.photon.ml.util.VectorUtils

/**
@@ -287,27 +286,24 @@ object RandomEffectDataset {
randomEffectDataConfiguration,
randomEffectPartitioner)
val projectedGroupedActiveData = generateProjectedActiveData(unfilteredActiveData, unfilteredProjectors)
val projectedUnfilteredActiveData = featureSelectionOnActiveData(
projectedGroupedActiveData,
randomEffectDataConfiguration.numFeaturesToSamplesRatioUpperBound)
Contributor Author:
Forgot to remove the numFeaturesToSamplesRatioUpperBound from the data configuration


val (activeData, passiveData, uniqueIdToRandomEffectIds, projectors) =
randomEffectDataConfiguration.numActiveDataPointsLowerBound match {

case Some(activeDataLowerBound) =>

projectedUnfilteredActiveData.persist(StorageLevel.MEMORY_ONLY_SER)
projectedGroupedActiveData.persist(StorageLevel.MEMORY_ONLY_SER)

// Filter entities which do not meet active data lower bound threshold
val filteredActiveData = filterActiveData(
projectedUnfilteredActiveData,
projectedGroupedActiveData,
activeDataLowerBound,
existingModelKeysRddOpt)
filteredActiveData.persist(storageLevel).count

val passiveData = generatePassiveData(
keyedGameDataset,
generateIdMap(projectedUnfilteredActiveData, uniqueIdPartitioner))
generateIdMap(projectedGroupedActiveData, uniqueIdPartitioner))
passiveData.persist(storageLevel).count

val uniqueIdToRandomEffectIds = generateIdMap(filteredActiveData, uniqueIdPartitioner)
@@ -316,22 +312,22 @@ object RandomEffectDataset {
val filteredProjectors = filterProjectors(unfilteredProjectors, filteredActiveData)
filteredProjectors.persist(storageLevel).count

projectedUnfilteredActiveData.unpersist()
projectedGroupedActiveData.unpersist()
unfilteredProjectors.unpersist()

(filteredActiveData, passiveData, uniqueIdToRandomEffectIds, filteredProjectors)

case None =>

projectedUnfilteredActiveData.persist(storageLevel).count
projectedGroupedActiveData.persist(storageLevel).count

val uniqueIdToRandomEffectIds = generateIdMap(projectedUnfilteredActiveData, uniqueIdPartitioner)
val uniqueIdToRandomEffectIds = generateIdMap(projectedGroupedActiveData, uniqueIdPartitioner)
uniqueIdToRandomEffectIds.persist(storageLevel).count

val passiveData = generatePassiveData(keyedGameDataset, uniqueIdToRandomEffectIds)
passiveData.persist(storageLevel).count

(projectedUnfilteredActiveData, passiveData, uniqueIdToRandomEffectIds, unfilteredProjectors)
(projectedGroupedActiveData, passiveData, uniqueIdToRandomEffectIds, unfilteredProjectors)
}

//
@@ -521,7 +517,7 @@ object RandomEffectDataset {
val weightMultiplierOpt = if (count > sampleCap) Some(1D * count / sampleCap) else None

data.map { case ComparableLabeledPointWithId(_, uniqueId, LabeledPoint(label, features, offset, weight)) =>
(uniqueId, LabeledPoint(label, features, offset, weightMultiplierOpt.map(_ * weight).getOrElse(weight)))
(uniqueId, LabeledPoint(label, features, offset, weightMultiplierOpt.fold(weight)(_ * weight)))
}
}
}
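A side note on the one-line change in this hunk: Option.fold(default)(f) is equivalent to Option.map(f).getOrElse(default), so the weight-capping behaviour is unchanged. A small standalone illustration (the weight and multiplier values are made up):

object WeightCapSketch {
  def main(args: Array[String]): Unit = {
    val weight = 2.0

    // When an entity's sample count exceeds the cap, a multiplier rescales each sample's weight.
    val capped: Option[Double] = Some(1.5)
    val uncapped: Option[Double] = None

    // Both spellings produce the same result: 3.0 when a multiplier is present, 2.0 otherwise.
    assert(capped.fold(weight)(_ * weight) == capped.map(_ * weight).getOrElse(weight))
    assert(uncapped.fold(weight)(_ * weight) == uncapped.map(_ * weight).getOrElse(weight))

    println(capped.fold(weight)(_ * weight))   // 3.0
    println(uncapped.fold(weight)(_ * weight)) // 2.0
  }
}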
@@ -549,32 +545,6 @@ object RandomEffectDataset {
LocalDataset(projectedData.toArray, isSortedByFirstIndex = false)
}

/**
* Reduce active data feature dimension for entities with few samples. The maximum feature dimension is limited to
* the number of samples multiplied by the features-to-samples ratio upper bound. Features are chosen by greatest Pearson
* correlation score.
*
* @param activeData An [[RDD]] of data grouped by entity ID
* @param numFeaturesToSamplesRatioUpperBoundOpt Optional upper bound on the ratio of features to samples
* @return The input data with feature dimension reduced for entities whose feature dimension greatly exceeded the
* number of available samples
*/
private def featureSelectionOnActiveData(
activeData: RDD[(REId, LocalDataset)],
numFeaturesToSamplesRatioUpperBoundOpt: Option[Double]): RDD[(REId, LocalDataset)] =
numFeaturesToSamplesRatioUpperBoundOpt
.map { numFeaturesToSamplesRatioUpperBound =>
activeData.mapValues { localDataset =>

var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataset.numDataPoints).toInt
// In case the above product overflows
if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue

localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
}
}
.getOrElse(activeData)
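// Illustration of the removed cap (assumed values, not from the codebase): with
// numFeaturesToSamplesRatioUpperBound = 2.0 and a local dataset of 15 samples,
// numFeaturesToKeep = math.ceil(2.0 * 15).toInt = 30, so at most the 30 features with the highest
// absolute Pearson scores were retained; the Int.MaxValue fallback only mattered on overflow.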

/**
* Filter out entities with less data than a given threshold.
*
@@ -644,7 +614,7 @@ object RandomEffectDataset {
*
* @param unfilteredProjectors The unfiltered projectors
* @param filteredActiveData The filtered active data
* @return [[unfilteredProjectors]] with all projectors for entities not in [[filteredActiveData]] removed
* @return An [[RDD]] of projectors, but only for entities in the filtered active data
*/
protected[data] def filterProjectors(
unfilteredProjectors: RDD[(REId, LinearSubspaceProjector)],