From e861e53bb1ccdc7a054925c7a30aaa45b728ca6f Mon Sep 17 00:00:00 2001 From: Alex Shelkovnykov Date: Wed, 19 Feb 2020 10:03:13 -0800 Subject: [PATCH] Remove Pearson correlation score based feature filtering for random effects --- .../photon/ml/data/LocalDataset.scala | 211 ------------------ .../photon/ml/data/RandomEffectDataset.scala | 48 +--- .../photon/ml/data/LocalDatasetTest.scala | 192 +--------------- 3 files changed, 10 insertions(+), 441 deletions(-) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataset.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataset.scala index 487d7ceb..7881dccf 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataset.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/data/LocalDataset.scala @@ -14,13 +14,7 @@ */ package com.linkedin.photon.ml.data -import scala.collection.mutable - -import breeze.linalg.Vector - import com.linkedin.photon.ml.Types.UniqueSampleId -import com.linkedin.photon.ml.constants.MathConst -import com.linkedin.photon.ml.util.VectorUtils /** * Local dataset implementation. @@ -93,40 +87,6 @@ protected[ml] case class LocalDataset(dataPoints: Array[(UniqueSampleId, Labeled LocalDataset(updatedDataPoints) } - - /** - * Filter features by Pearson correlation score. - * - * @param numFeaturesToKeep The number of features to keep - * @return The filtered dataset - */ - def filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep: Int): LocalDataset = { - - val numActiveFeatures: Int = dataPoints.flatMap(_._2.features.activeKeysIterator).toSet.size - - if (numFeaturesToKeep < numActiveFeatures) { - val labelAndFeatures = dataPoints.map { case (_, labeledPoint) => (labeledPoint.label, labeledPoint.features) } - val pearsonScores = LocalDataset.stableComputePearsonCorrelationScore(labelAndFeatures) - - val filteredFeaturesIndexSet = pearsonScores - .toArray - .sortBy { case (_, score) => math.abs(score) } - .takeRight(numFeaturesToKeep) - .map(_._1) - .toSet - - val filteredActivities = dataPoints.map { case (id, LabeledPoint(label, features, offset, weight)) => - - val filteredFeatures = LocalDataset.filterFeaturesWithFeatureIndexSet(features, filteredFeaturesIndexSet) - - (id, LabeledPoint(label, filteredFeatures, offset, weight)) - } - - LocalDataset(filteredActivities) - } else { - this - } - } } object LocalDataset { @@ -148,175 +108,4 @@ object LocalDataset { LocalDataset(dataPoints.sortBy(_._1)) } } - - /** - * Filter features by feature index. - * - * @param features The original feature set - * @param featureIndexSet The feature index set - * @return The filtered feature vector - */ - private def filterFeaturesWithFeatureIndexSet( - features: Vector[Double], - featureIndexSet: Set[Int]): Vector[Double] = { - - val result = VectorUtils.zeroOfSameType(features) - - features.activeIterator.foreach { case (key, value) => - if (featureIndexSet.contains(key)) { - result(key) = value - } - } - - result - } - - /** - * Compute Pearson correlation scores using a numerically stable algorithm. 
- * - * @param labelAndFeatures An array of (label, feature) tuples - * @return The Pearson correlation scores for each tuple - */ - protected[ml] def stableComputePearsonCorrelationScore( - labelAndFeatures: Array[(Double, Vector[Double])]): Map[Int, Double] = { - - val featureMeans = mutable.Map[Int, Double]() - val featureUnscaledVars = mutable.Map[Int, Double]() - var labelMean = 0.0 - var labelUnscaledVariance = 0.0 - val unscaledCovariances = mutable.Map[Int, Double]() - var interceptAdded = false - var numSamples = 0 - - labelAndFeatures.foreach { case (label, features) => - numSamples += 1 - - val deltaLabel = label - labelMean - labelMean += deltaLabel / numSamples - labelUnscaledVariance += deltaLabel * (label - labelMean) - - // Note that, if there is duplicated keys in the feature vector, then the following Pearson correlation scores - // calculation will screw up - features.iterator.foreach { case (key, value) => - val prevFeatureMean = featureMeans.getOrElse(key, 0.0) - val deltaFeature = value - prevFeatureMean - val featureMean = prevFeatureMean + deltaFeature / numSamples - - val prevFeatureUnscaledVar = featureUnscaledVars.getOrElse(key, 0.0) - val featureUnscaledVar = prevFeatureUnscaledVar + deltaFeature * (value - featureMean) - - val prevCovariance = unscaledCovariances.getOrElse(key, 0.0) - val unscaledCovariance = prevCovariance + deltaFeature * deltaLabel * (numSamples - 1) / numSamples - - featureMeans.update(key, featureMean) - featureUnscaledVars.update(key, featureUnscaledVar) - unscaledCovariances.update(key, unscaledCovariance) - } - } - - val labelStd = math.sqrt(labelUnscaledVariance) - - featureMeans - .iterator - .map { case (key, featureMean) => - val featureStd = math.sqrt(featureUnscaledVars(key)) - val covariance = unscaledCovariances(key) - - // When the standard deviation of the feature is close to 0 we treat it as the intercept term. - val score = if (featureStd < math.sqrt(numSamples) * MathConst.EPSILON) { - // Note that if the mean and standard deviation are equal to zero, it either means that the feature is constant - if (featureMean == 1.0 && !interceptAdded) { - interceptAdded = true - 1.0 - } else { - 0.0 - } - } else { - covariance / (labelStd * featureStd + MathConst.EPSILON) - } - - require(math.abs(score) <= 1 + MathConst.EPSILON, - s"Computed pearson correlation score is $score, while the score's magnitude should be less than 1. " + - s"(Diagnosis:\n" + - s"featureKey=$key\n" + - s"featureStd=$featureStd\n" + - s"labelStd=$labelStd\n" + - s"covariance=$covariance\n" + - s"numSamples=$numSamples\n" + - s"labelAndFeatures used to compute Pearson correlation score:\n${labelAndFeatures.mkString("\n")}})") - - (key, score) - } - .toMap - } - - /** - * Compute Pearson correlation scores. 
-   *
-   * @param labelAndFeatures An array of (label, feature) tuples
-   * @return The Pearson correlation scores for each tuple
-   */
-  protected[ml] def computePearsonCorrelationScore(
-    labelAndFeatures: Array[(Double, Vector[Double])]): Map[Int, Double] = {
-
-    val featureLabelProductSums = mutable.Map[Int, Double]()
-    val featureFirstOrderSums = mutable.Map[Int, Double]()
-    val featureSecondOrderSums = mutable.Map[Int, Double]()
-    var labelFirstOrderSum = 0.0
-    var labelSecondOrderSum = 0.0
-    var numSamples = 0
-    var interceptAdded = false
-
-    labelAndFeatures.foreach { case (label, features) =>
-      numSamples += 1
-      labelFirstOrderSum += label
-      labelSecondOrderSum += label * label
-      // Note that, if there is duplicated keys in the feature vector, then the following Pearson correlation scores
-      // calculation will screw up
-      features.activeIterator.foreach { case (key, value) =>
-        featureFirstOrderSums.update(key, featureFirstOrderSums.getOrElse(key, 0.0) + value)
-        featureSecondOrderSums.update(key, featureSecondOrderSums.getOrElse(key, 0.0) + value * value)
-        featureLabelProductSums.update(key, featureLabelProductSums.getOrElse(key, 0.0) + value * label)
-      }
-    }
-
-    featureFirstOrderSums
-      .keySet
-      .map { key =>
-        val featureFirstOrderSum = featureFirstOrderSums(key)
-        val featureSecondOrderSum = featureSecondOrderSums(key)
-        val featureLabelProductSum = featureLabelProductSums(key)
-        val numerator = numSamples * featureLabelProductSum - featureFirstOrderSum * labelFirstOrderSum
-        val std = math.sqrt(math.abs(numSamples * featureSecondOrderSum - featureFirstOrderSum * featureFirstOrderSum))
-        val denominator = std * math.sqrt(numSamples * labelSecondOrderSum - labelFirstOrderSum * labelFirstOrderSum)
-
-        // When the standard deviation of the feature is close to 0, we treat it as the intercept term
-        val score = if (std < MathConst.EPSILON) {
-          if (interceptAdded) {
-            0.0
-          } else {
-            interceptAdded = true
-            1.0
-          }
-        } else {
-          numerator / (denominator + MathConst.EPSILON)
-        }
-
-        require(math.abs(score) <= 1 + MathConst.EPSILON,
-          s"Computed pearson correlation score is $score, while the score's magnitude should be less than 1. " +
-            s"(Diagnosis:\n" +
-            s"numerator=$numerator\n" +
-            s"denominator=$denominator\n" +
-            s"numSamples=$numSamples\n" +
-            s"featureFirstOrderSum=$featureFirstOrderSum\n" +
-            s"featureSecondOrderSum=$featureSecondOrderSum\n" +
-            s"featureLabelProductSum=$featureLabelProductSum\n" +
-            s"labelFirstOrderSum=$labelFirstOrderSum\n" +
-            s"labelSecondOrderSum=$labelSecondOrderSum\n" +
-            s"labelAndFeatures used to compute Pearson correlation score:\n${labelAndFeatures.mkString("\n")}})")
-
-        (key, score)
-      }
-      .toMap
-  }
 }
diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataset.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataset.scala
index eb54d140..af275374 100644
--- a/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataset.scala
+++ b/photon-api/src/main/scala/com/linkedin/photon/ml/data/RandomEffectDataset.scala
@@ -29,7 +29,6 @@ import com.linkedin.photon.ml.data.scoring.CoordinateDataScores
 import com.linkedin.photon.ml.model.RandomEffectModel
 import com.linkedin.photon.ml.projector.LinearSubspaceProjector
 import com.linkedin.photon.ml.spark.{BroadcastLike, RDDLike}
-import com.linkedin.photon.ml.supervised.model.GeneralizedLinearModel
 import com.linkedin.photon.ml.util.VectorUtils
 
 /**
@@ -287,27 +286,24 @@ object RandomEffectDataset {
       randomEffectDataConfiguration,
       randomEffectPartitioner)
     val projectedGroupedActiveData = generateProjectedActiveData(unfilteredActiveData, unfilteredProjectors)
-    val projectedUnfilteredActiveData = featureSelectionOnActiveData(
-      projectedGroupedActiveData,
-      randomEffectDataConfiguration.numFeaturesToSamplesRatioUpperBound)
 
     val (activeData, passiveData, uniqueIdToRandomEffectIds, projectors) =
       randomEffectDataConfiguration.numActiveDataPointsLowerBound match {
 
         case Some(activeDataLowerBound) =>
-          projectedUnfilteredActiveData.persist(StorageLevel.MEMORY_ONLY_SER)
+          projectedGroupedActiveData.persist(StorageLevel.MEMORY_ONLY_SER)
 
           // Filter entities which do not meet active data lower bound threshold
           val filteredActiveData = filterActiveData(
-            projectedUnfilteredActiveData,
+            projectedGroupedActiveData,
             activeDataLowerBound,
             existingModelKeysRddOpt)
           filteredActiveData.persist(storageLevel).count
 
           val passiveData = generatePassiveData(
             keyedGameDataset,
-            generateIdMap(projectedUnfilteredActiveData, uniqueIdPartitioner))
+            generateIdMap(projectedGroupedActiveData, uniqueIdPartitioner))
           passiveData.persist(storageLevel).count
 
           val uniqueIdToRandomEffectIds = generateIdMap(filteredActiveData, uniqueIdPartitioner)
@@ -316,22 +312,22 @@ object RandomEffectDataset {
           val filteredProjectors = filterProjectors(unfilteredProjectors, filteredActiveData)
           filteredProjectors.persist(storageLevel).count
 
-          projectedUnfilteredActiveData.unpersist()
+          projectedGroupedActiveData.unpersist()
           unfilteredProjectors.unpersist()
 
           (filteredActiveData, passiveData, uniqueIdToRandomEffectIds, filteredProjectors)
 
         case None =>
-          projectedUnfilteredActiveData.persist(storageLevel).count
+          projectedGroupedActiveData.persist(storageLevel).count
 
-          val uniqueIdToRandomEffectIds = generateIdMap(projectedUnfilteredActiveData, uniqueIdPartitioner)
+          val uniqueIdToRandomEffectIds = generateIdMap(projectedGroupedActiveData, uniqueIdPartitioner)
          uniqueIdToRandomEffectIds.persist(storageLevel).count
 
           val passiveData = generatePassiveData(keyedGameDataset, uniqueIdToRandomEffectIds)
           passiveData.persist(storageLevel).count
 
-          (projectedUnfilteredActiveData, passiveData, uniqueIdToRandomEffectIds, unfilteredProjectors)
+          (projectedGroupedActiveData, passiveData, uniqueIdToRandomEffectIds, unfilteredProjectors)
       }
 
     //
@@ -521,7 +517,7 @@ object RandomEffectDataset {
     val weightMultiplierOpt = if (count > sampleCap) Some(1D * count / sampleCap) else None
 
     data.map { case ComparableLabeledPointWithId(_, uniqueId, LabeledPoint(label, features, offset, weight)) =>
-      (uniqueId, LabeledPoint(label, features, offset, weightMultiplierOpt.map(_ * weight).getOrElse(weight)))
+      (uniqueId, LabeledPoint(label, features, offset, weightMultiplierOpt.fold(weight)(_ * weight)))
     }
   }
@@ -549,32 +545,6 @@ object RandomEffectDataset {
     LocalDataset(projectedData.toArray, isSortedByFirstIndex = false)
   }
 
-  /**
-   * Reduce active data feature dimension for entities with few samples. The maximum feature dimension is limited to
-   * the number of samples multiplied by the feature dimension ratio. Features are chosen by greatest Pearson
-   * correlation score.
-   *
-   * @param activeData An [[RDD]] of data grouped by entity ID
-   * @param numFeaturesToSamplesRatioUpperBoundOpt Optional ratio of samples to feature dimension
-   * @return The input data with feature dimension reduced for entities whose feature dimension greatly exceeded the
-   *         number of available samples
-   */
-  private def featureSelectionOnActiveData(
-    activeData: RDD[(REId, LocalDataset)],
-    numFeaturesToSamplesRatioUpperBoundOpt: Option[Double]): RDD[(REId, LocalDataset)] =
-    numFeaturesToSamplesRatioUpperBoundOpt
-      .map { numFeaturesToSamplesRatioUpperBound =>
-        activeData.mapValues { localDataset =>
-
-          var numFeaturesToKeep = math.ceil(numFeaturesToSamplesRatioUpperBound * localDataset.numDataPoints).toInt
-          // In case the above product overflows
-          if (numFeaturesToKeep < 0) numFeaturesToKeep = Int.MaxValue
-
-          localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep)
-        }
-      }
-      .getOrElse(activeData)
-
   /**
    * Filter out entities with less data than a given threshold.
    *
@@ -644,7 +614,7 @@ object RandomEffectDataset {
    *
    * @param unfilteredProjectors The unfiltered projectors
    * @param filteredActiveData The filtered active data
-   * @return [[unfilteredProjectors]] with all projectors for entities not in [[filteredActiveData]] removed
+   * @return An [[RDD]] of projectors, but only for entities in the filtered active data
    */
   protected[data] def filterProjectors(
     unfilteredProjectors: RDD[(REId, LinearSubspaceProjector)],
diff --git a/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDatasetTest.scala b/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDatasetTest.scala
index 435dae94..3fdac009 100644
--- a/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDatasetTest.scala
+++ b/photon-api/src/test/scala/com/linkedin/photon/ml/data/LocalDatasetTest.scala
@@ -16,13 +16,12 @@ package com.linkedin.photon.ml.data
 
 import java.util.Random
 
-import breeze.linalg.{SparseVector, Vector}
+import breeze.linalg.Vector
 import org.mockito.Mockito._
 import org.testng.Assert._
 import org.testng.annotations.Test
 
 import com.linkedin.photon.ml.constants.MathConst
-import com.linkedin.photon.ml.test.CommonTestUtils
 
 /**
  * Unit tests for [[LocalDataset]].
@@ -90,193 +89,4 @@ class LocalDatasetTest {
       assertEquals(labeledPoint.offset, offset, MathConst.EPSILON)
     }
   }
-
-  /**
-   * Test the stable Pearson correlation score computation.
-   */
-  @Test(groups = Array[String]("testPearsonCorrelationScore", "testCore"))
-  def testPearsonCorrelationScore(): Unit = {
-
-    // Test input data
-    val labels = Array(1.0, 4.0, 6.0, 9.0)
-    val features = Array(
-      Vector(0.0, 0.0, 2.0), Vector(5.0, 0.0, -3.0), Vector(7.0, 0.0, -8.0), Vector(0.0, 0.0, -1.0))
-    val expected = Map(0 -> 0.05564149, 1 -> 0.0, 2 -> -0.40047142)
-    val labelAndFeatures = labels.zip(features)
-    val computed = LocalDataset.stableComputePearsonCorrelationScore(labelAndFeatures)
-
-    computed.foreach { case (key, value) =>
-      assertEquals(
-        expected(key),
-        value,
-        CommonTestUtils.LOW_PRECISION_TOLERANCE,
-        s"Computed Pearson correlation score is $value, while the expected value is ${expected(key)}.")
-    }
-  }
-
-  /**
-   * Test the stable Pearson correlation score computation on sparse feature vectors.
-   */
-  @Test(groups = Array[String]("testPearsonCorrelationScore", "testCore"))
-  def testStablePearsonCorrelationScoreOnSparseVector(): Unit = {
-
-    // Test input data
-    val labels = Array(1.0, 4.0, 6.0, 9.0)
-    val numFeatures = 3
-
-    val features = Array[Vector[Double]](
-      new SparseVector[Double](Array(2), Array(2.0), numFeatures),
-      new SparseVector[Double](Array(0, 2), Array(5.0, -3.0), numFeatures),
-      new SparseVector[Double](Array(0, 2), Array(7.0, -8.0), numFeatures),
-      new SparseVector[Double](Array(2), Array(-1.0), numFeatures)
-    )
-
-    val expected = Map(0 -> 0.05564149, 1 -> 0.0, 2 -> -0.40047142)
-
-    val labelAndFeatures = labels.zip(features)
-
-    val computed = LocalDataset.stableComputePearsonCorrelationScore(labelAndFeatures)
-
-    computed.foreach { case (key, value) =>
-      assertEquals(
-        expected(key),
-        value,
-        CommonTestUtils.LOW_PRECISION_TOLERANCE,
-        s"Computed Pearson correlation score is $value, while the expected value is ${expected(key)}.")
-    }
-  }
-
-  /**
-   * Test that the stable Pearson correlation score computation properly recognizes an intercept column.
-   */
-  @Test(groups = Array[String]("testPearsonCorrelationScore", "testCore"))
-  def testPearsonCorrelationScoreForIntercept(): Unit = {
-
-    // Test input data
-    val labels = Array(1.0, 4.0, 6.0, 9.0)
-    val features = Array(
-      Vector(0.0, 0.0, 1.0, 2.0),
-      Vector(5.0, 0.0, 1.0, -3.0),
-      Vector(7.0, 0.0, 1.0, -8.0),
-      Vector(0.0, 0.0, 1.0, -1.0)
-    )
-    val expected = Map(0 -> 0.05564149, 1 -> 0.0, 2 -> 1.0, 3 -> -0.40047142)
-    val labelAndFeatures = labels.zip(features)
-    val computed = LocalDataset.stableComputePearsonCorrelationScore(labelAndFeatures)
-
-    computed.foreach { case (key, value) =>
-      assertEquals(
-        expected(key),
-        value,
-        CommonTestUtils.LOW_PRECISION_TOLERANCE,
-        s"Computed Pearson correlation score is $value, while the expected value is ${expected(key)}.")
-    }
-  }
-
-  /**
-   * Test the stable Pearson correlation score numerical stability.
-   */
-  @Test(groups = Array[String]("testPearsonCorrelationScore", "testCore"))
-  def testStablePearsonCorrelationScoreStability(): Unit = {
-
-    // Test input data: this is a pathological example in which a naive algorithm would fail due to numerical
-    // unstability.
-    val labels = Array(10000000.0, 10000000.1, 10000000.2)
-    val features = Array(Vector(0.0), Vector(0.1), Vector(0.2))
-
-    val expected = Map(0 -> 1.0)
-
-    val labelAndFeatures = labels.zip(features)
-    val computed = LocalDataset.stableComputePearsonCorrelationScore(labelAndFeatures)
-
-    computed.foreach { case (key, value) =>
-      assertEquals(
-        expected(key),
-        value,
-        CommonTestUtils.LOW_PRECISION_TOLERANCE,
-        s"Computed Pearson correlation score is $value, while the expected value is ${expected(key)}.")
-    }
-  }
-
-  /**
-   * Test feature filtering using the stable Pearson correlation score.
-   */
-  @Test(dependsOnGroups = Array[String]("testPearsonCorrelationScore", "testCore"))
-  def testFilterFeaturesByPearsonCorrelationScore(): Unit = {
-
-    val numSamples = 10
-    val random = new Random(MathConst.RANDOM_SEED)
-    val labels = Array.fill(numSamples)(if (random.nextDouble() > 0.5) 1.0 else -1.0)
-    val numFeatures = 10
-    // Each data point has 10 features, and each of them is designed as following:
-    // 0: Intercept
-    // 1: Positively correlated with the label
-    // 2: Negatively correlated with the label
-    // 3: Un-correlated with the label
-    // 4: Dummy feature 1
-    // 5: Dummy feature 2
-    // 6-9: Missing features
-    val intercept = 1.0
-    val variance = 0.001
-    val featureIndices = Array(0, 1, 2, 3, 4, 5)
-    val features = Array.tabulate(numSamples) { i =>
-      val featureValues = Array(
-        intercept,
-        labels(i) + variance * random.nextGaussian(),
-        -labels(i) + variance * random.nextGaussian(),
-        random.nextDouble(),
-        1.0,
-        1.0)
-      new SparseVector[Double](featureIndices, featureValues, numFeatures)
-    }
-    val localDataset =
-      LocalDataset(
-        Array.tabulate(numSamples)(i => (i.toLong, LabeledPoint(labels(i), features(i)))))
-
-    // don't keep any features
-    val filteredDataPoints0 = localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep = 0).dataPoints
-    assertEquals(filteredDataPoints0.length, numSamples)
-    assertTrue(filteredDataPoints0.forall(_._2.features.activeSize == 0))
-
-    // keep 1 feature
-    val filteredDataPoints1 = localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep = 1).dataPoints
-    val filteredDataPointsKeySet1 = filteredDataPoints1.flatMap(_._2.features.activeKeysIterator).toSet
-    assertEquals(filteredDataPoints1.length, numSamples)
-    assertTrue(filteredDataPoints1.forall(_._2.features.activeSize == 1))
-    assertTrue(
-      filteredDataPointsKeySet1.size == 1 &&
-        (filteredDataPointsKeySet1.contains(0) ||
-          filteredDataPointsKeySet1.contains(4) ||
-          filteredDataPointsKeySet1.contains(5)),
-      s"$filteredDataPointsKeySet1")
-
-    // keep 3 features
-    val filteredDataPoints3 = localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep = 3).dataPoints
-    val filteredDataPointsKeySet3 = filteredDataPoints3.flatMap(_._2.features.activeKeysIterator).toSet
-    assertEquals(filteredDataPoints3.length, numSamples)
-    assertTrue(filteredDataPoints3.forall(_._2.features.activeSize == 3))
-    assertTrue(
-      filteredDataPointsKeySet3.size == 3 &&
-        filteredDataPointsKeySet3.contains(1) &&
-        filteredDataPointsKeySet3.contains(2) &&
-        (filteredDataPointsKeySet3.contains(0) ||
-          filteredDataPointsKeySet3.contains(4) ||
-          filteredDataPointsKeySet3.contains(5)),
-      s"$filteredDataPointsKeySet3")
-
-    // keep 5 features
-    val filteredDataPoints5 = localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep = 5).dataPoints
-    val filteredDataPointsKeySet5 = filteredDataPoints5.flatMap(_._2.features.activeKeysIterator).toSet
-    assertEquals(filteredDataPoints5.length, numSamples)
-    assertTrue(filteredDataPoints5.forall(_._2.features.activeSize == 5))
-    assertTrue(filteredDataPointsKeySet5.forall(_ < 6), s"$filteredDataPointsKeySet5")
-
-    // keep all features
-    val filteredDataPointsAll = localDataset.filterFeaturesByPearsonCorrelationScore(numFeaturesToKeep = numFeatures)
-      .dataPoints
-    assertEquals(filteredDataPointsAll.length, numSamples)
-    assertTrue(
-      filteredDataPointsAll
-        .forall(dataPoint => dataPoint._2.features.activeKeysIterator.toSet == Set(0, 1, 2, 3, 4, 5)))
-  }
 }
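
Note for reviewers: for a feature column x and the label column y, the removed filtering ranked features by the Pearson correlation r = cov(x, y) / (std(x) * std(y)), computed in a single pass with Welford-style running statistics. The standalone sketch below is illustrative only (it is not part of the patch, and the object and method names are invented); it mirrors the per-feature update performed by the removed stableComputePearsonCorrelationScore and reproduces the near-cancellation case covered by the removed stability test.

// Illustrative sketch only -- not part of this patch.
object PearsonCorrelationSketch {

  def pearson(xs: Array[Double], ys: Array[Double]): Double = {
    require(xs.length == ys.length && xs.nonEmpty, "Need two non-empty columns of equal length")

    var n = 0
    var xMean = 0.0
    var yMean = 0.0
    var xUnscaledVar = 0.0  // running sum of squared deviations of x
    var yUnscaledVar = 0.0  // running sum of squared deviations of y
    var unscaledCov = 0.0   // running co-moment of x and y

    xs.zip(ys).foreach { case (x, y) =>
      n += 1
      val dx = x - xMean
      val dy = y - yMean
      xMean += dx / n
      yMean += dy / n
      xUnscaledVar += dx * (x - xMean)
      yUnscaledVar += dy * (y - yMean)
      unscaledCov += dx * dy * (n - 1) / n
    }

    // The 1/n factors cancel between the covariance and the two standard deviations
    unscaledCov / math.sqrt(xUnscaledVar * yUnscaledVar)
  }

  def main(args: Array[String]): Unit = {
    // Same pathological case as the removed testStablePearsonCorrelationScoreStability test:
    // a huge label offset with tiny relative variation. The online update keeps its precision,
    // whereas the naive sums-of-products formula (the removed computePearsonCorrelationScore)
    // suffers catastrophic cancellation on the n * sum(y^2) - (sum y)^2 term.
    val labels = Array(10000000.0, 10000000.1, 10000000.2)
    val feature = Array(0.0, 0.1, 0.2)
    println(pearson(feature, labels)) // ~= 1.0
  }
}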