
Commit

Fix type erasure warnings in files not scheduled to be removed by other commits
Alex Shelkovnykov authored and ashelkovnykov committed May 18, 2020
1 parent 3d9c1e2 commit ec8764b
Showing 13 changed files with 209 additions and 175 deletions.
@@ -209,8 +209,6 @@ class GameTransformer(val sc: SparkContext, implicit val logger: Logger) extends
randomEffectTypes: Set[REType],
featureShards: Set[FeatureShardId]): RDD[(UniqueSampleId, GameDatum)] = {

val parallelism = sc.getConf.get("spark.default.parallelism", s"${sc.getExecutorStorageStatus.length * 3}").toInt
val partitioner = new LongHashPartitioner(parallelism)
val idTagSet = randomEffectTypes ++
get(validationEvaluators).map(MultiEvaluatorType.getMultiEvaluatorIdTags).getOrElse(Seq())
val gameDataset = GameConverters
@@ -220,7 +218,6 @@ class GameTransformer(val sc: SparkContext, implicit val logger: Logger) extends
idTagSet,
isResponseRequired = false,
getOrDefault(inputColumnNames))
.partitionBy(partitioner)
.setName("Game dataset with UIDs for scoring")
.persist(StorageLevel.DISK_ONLY)

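The deleted lines above sized a partitioner from spark.default.parallelism, with a fallback computed from sc.getExecutorStorageStatus (an accessor dropped in later Spark releases), and then repartitioned the dataset explicitly. Below is a minimal sketch of the simpler pattern, using a hypothetical helper rather than Photon-ML's own code: take an explicit partition count only when one is supplied, otherwise defer to Spark's default parallelism.

import scala.reflect.ClassTag

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD

object PartitioningSketch {

  // Hypothetical helper (not Photon-ML code): use an explicit partition count when
  // one is supplied, otherwise fall back to Spark's own default parallelism instead
  // of deriving a value from executor storage status.
  def repartitionById[V: ClassTag](
      sc: SparkContext,
      data: RDD[(Long, V)],
      numPartitionsOpt: Option[Int]): RDD[(Long, V)] = {

    val numPartitions = numPartitionsOpt.getOrElse(sc.defaultParallelism)

    data.partitionBy(new HashPartitioner(numPartitions))
  }
}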
@@ -69,8 +69,9 @@ class GameScoringDriverIntegTest extends SparkTestUtils with TestTemplateWithTmp

@DataProvider
def numOutputFilesProvider(): Array[Array[Any]] = Array(
Array(1, 1),
Array(10, 3))
Array(1, 1, 1),
Array(10, 5, 5),
Array(5, 10, 5))

/**
* Test that the scoring job can correctly limit the maximum number of output files.
@@ -79,12 +80,16 @@ class GameScoringDriverIntegTest extends SparkTestUtils with TestTemplateWithTmp
* @param expectedOutputFiles The expected number of output files
*/
@Test(dataProvider = "numOutputFilesProvider")
def testNumOutputFiles(outputFilesLimit: Int, expectedOutputFiles: Int): Unit = sparkTest("testNumOutputFiles") {
def testNumOutputFiles(
scoringPartitions: Int,
outputFilesLimit: Int,
expectedOutputFiles: Int): Unit = sparkTest("testNumOutputFiles") {

val outputPath = new Path(getTmpDir)
val scoresPath = new Path(outputPath, s"${GameScoringDriver.SCORES_DIR}")
val params = fixedEffectArgs
.put(GameScoringDriver.rootOutputDirectory, outputPath)
.put(GameScoringDriver.scoringPartitions, scoringPartitions)
.put(GameScoringDriver.outputFilesLimit, outputFilesLimit)

runDriver(params)
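The new provider rows are consistent with the expected output-file count being the smaller of the scoring partition count and the configured limit: (1, 1) -> 1, (10, 5) -> 5, (5, 10) -> 5. A one-line sketch of that inferred relationship (an observation about the test data above, not code from the repository):

def expectedOutputFiles(scoringPartitions: Int, outputFilesLimit: Int): Int =
  math.min(scoringPartitions, outputFilesLimit)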
@@ -33,7 +33,6 @@ import com.linkedin.photon.ml.cli.game.GameDriver
import com.linkedin.photon.ml.constants.MathConst
import com.linkedin.photon.ml.data.{FixedEffectDataConfiguration, GameConverters, RandomEffectDataConfiguration}
import com.linkedin.photon.ml.data.avro._
import com.linkedin.photon.ml.estimators.GameEstimator
import com.linkedin.photon.ml.evaluation.RMSEEvaluator
import com.linkedin.photon.ml.io.{FeatureShardConfiguration, FixedEffectCoordinateConfiguration, ModelOutputMode, RandomEffectCoordinateConfiguration}
import com.linkedin.photon.ml.normalization.NormalizationType
@@ -69,8 +68,8 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with
/**
* Test GAME training with a fixed effect model only, and an intercept.
*
* @note Intercepts are optional in [[GameEstimator]], but [[GameDriver]] will setup an intercept by default. This
* happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only.
* @note Intercepts are optional in [[com.linkedin.photon.ml.estimators.GameEstimator]], but [[GameDriver]] will setup
* an intercept by default. This happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only.
*/
@Test
def testFixedEffectsWithIntercept(): Unit = sparkTest("testFixedEffectsWithIntercept") {
@@ -105,8 +104,8 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with
* Test GAME training with a fixed effect model only, and an intercept, and no validation, and only the best model is
* output.
*
* @note Intercepts are optional in [[GameEstimator]], but [[GameDriver]] will setup an intercept by default. This
* happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only.
* @note Intercepts are optional in [[com.linkedin.photon.ml.estimators.GameEstimator]], but [[GameDriver]] will setup
* an intercept by default. This happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only.
*/
@Test
def testFixedEffectsWithAdditionalOpts(): Unit = sparkTest("testFixedEffectsWithIntercept") {
@@ -331,9 +330,6 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with
runDriver(mixedEffectSeriousRunArgs.put(GameTrainingDriver.rootOutputDirectory, outputDir))

val globalModelPath = bestModelPath(outputDir, AvroConstants.FIXED_EFFECT, "global")
val userModelPath = bestModelPath(outputDir, AvroConstants.RANDOM_EFFECT, "per-user")
val songModelPath = bestModelPath(outputDir, AvroConstants.RANDOM_EFFECT, "per-song")
val artistModelPath = bestModelPath(outputDir, AvroConstants.RANDOM_EFFECT, "per-artist")
val fs = outputDir.getFileSystem(sc.hadoopConfiguration)

assertTrue(fs.exists(globalModelPath))
@@ -593,7 +589,7 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with
Seq(testPath.toString),
indexMapLoadersOpt,
featureShardConfigs,
numPartitions = 2)
None)
val partitioner = new LongHashPartitioner(testData.rdd.partitions.length)

val gameDataset = GameConverters
@@ -644,7 +640,7 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with
Seq(testPath.toString),
indexMapLoadersOpt,
featureShardConfigs,
numPartitions = 2)
None)
val partitioner = new LongHashPartitioner(testData.rdd.partitions.length)

val gameDataset = GameConverters
@@ -24,7 +24,7 @@ import org.testng.Assert._
import org.testng.annotations.Test

import com.linkedin.photon.ml.Constants
import com.linkedin.photon.ml.index.{IndexMap, PalDBIndexMapLoader}
import com.linkedin.photon.ml.index.PalDBIndexMapLoader
import com.linkedin.photon.ml.io.FeatureShardConfiguration
import com.linkedin.photon.ml.test.{CommonTestUtils, SparkTestUtils}

@@ -43,7 +43,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils {
val dr = new AvroDataReader()
val dataPath = new Path(INPUT_DIR, "avroMap")
val featureConfigMap = Map("shard1" -> FeatureShardConfiguration(Set("xgboost_click"), hasIntercept = true))
val (df, _) = dr.readMerged(dataPath.toString, featureConfigMap, 1)
val (df, _) = dr.readMerged(dataPath.toString, featureConfigMap, None)

assertEquals(df.count(), 2)
}
@@ -60,7 +60,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils {
}

/**
* Test reading a [[DataFrame]], using an existing [[IndexMap]].
* Test reading a [[DataFrame]], using an existing [[com.linkedin.photon.ml.index.IndexMap]].
*/
@Test
def testReadWithFeatureIndex(): Unit = sparkTest("testReadWithIndex") {
@@ -118,7 +118,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils {
assertEquals(df.select(col(shardId)).take(1)(0).getAs[SparseVector](0).numActives, 30)

// Assert that the intercept is not in the IndexMap
assertTrue(indexMapLoaders(shardId).indexMapForDriver().get(Constants.INTERCEPT_KEY).isEmpty)
assertFalse(indexMapLoaders(shardId).indexMapForDriver().contains(Constants.INTERCEPT_KEY))

// Assert that all rows have been read
assertEquals(df.count, 34810)
@@ -151,7 +151,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils {
@Test(expectedExceptions = Array(classOf[IllegalArgumentException]))
def testReadInvalidPartitions(): Unit = sparkTest("testReadInvalidPartitions") {
val dr = new AvroDataReader()
dr.read(TRAIN_INPUT_PATH.toString, -1)
dr.read(TRAIN_INPUT_PATH.toString, Some(-1))
}

/**
Expand All @@ -172,7 +172,7 @@ object AvroDataReaderIntegTest {
private val TEST_INPUT_PATH = new Path(INPUT_DIR, "test")
private val DUPLICATE_FEATURES_PATH = new Path(INPUT_DIR, "duplicateFeatures")
private val INDEX_MAP_PATH = new Path(INPUT_DIR, "feature-indexes")
private val NUM_PARTITIONS = 4
private val NUM_PARTITIONS = Some(4)
private val FEATURE_SHARD_CONFIGS_MAP = Map(
"shard1" -> FeatureShardConfiguration(Set("userFeatures", "songFeatures"), hasIntercept = true),
"shard2" -> FeatureShardConfiguration(Set("userFeatures"), hasIntercept = true),
@@ -31,35 +31,35 @@ class AvroDataWriterIntegTest extends SparkTestUtils with TestTemplateWithTmpDir
@Test
def testWrite(): Unit = sparkTest("testRead") {
val dr = new AvroDataReader()
val (df, indexMapLoadersMap) = dr.readMerged(inputPath.toString, featureShardConfigurationsMap, numPartitions)
val (df, indexMapLoadersMap) = dr.readMerged(INPUT_PATH.toString, FEATURE_SHARD_CONFIGURATIONS_MAP, NUM_PARTITIONS)
val outputDir = new Path(getTmpDir)

assertTrue(df.columns.contains(featureColumn))
assertTrue(df.columns.contains(responseColumn))
assertTrue(df.columns.contains(FEATURE_COLUMN))
assertTrue(df.columns.contains(RESPONSE_COLUMN))
assertEquals(df.count, 34810)
assertTrue(indexMapLoadersMap.contains(featureColumn))
assertTrue(indexMapLoadersMap.contains(FEATURE_COLUMN))

val indexMapLoader = indexMapLoadersMap(featureColumn)
val indexMapLoader = indexMapLoadersMap(FEATURE_COLUMN)
val writer = new AvroDataWriter
writer.write(df, outputDir.toString, indexMapLoader, responseColumn, featureColumn, overwrite = true)
writer.write(df, outputDir.toString, indexMapLoader, RESPONSE_COLUMN, FEATURE_COLUMN, overwrite = true)

val fs = FileSystem.get(sc.hadoopConfiguration)
val files = fs.listStatus(outputDir).filter(_.getPath.getName.startsWith("part"))
assertEquals(files.length, numPartitions)
assertEquals(files.length, NUM_PARTITIONS)

val (writeData, _) = dr.read(outputDir.toString, numPartitions)
assertTrue(writeData.columns.contains(responseColumn))
assertTrue(writeData.columns.contains(featureColumn))
val (writeData, _) = dr.read(outputDir.toString, NUM_PARTITIONS)
assertTrue(writeData.columns.contains(RESPONSE_COLUMN))
assertTrue(writeData.columns.contains(FEATURE_COLUMN))
assertEquals(writeData.count(), 34810)
}
}

object AvroDataWriterIntegTest {
private val inputDir = getClass.getClassLoader.getResource("GameIntegTest/input").getPath
private val inputPath = new Path(inputDir, "train")
private val numPartitions = 4
private val featureColumn = "features"
private val responseColumn = "response"
private val featureShardConfigurationsMap = Map(
featureColumn -> FeatureShardConfiguration(Set("userFeatures", "songFeatures"), hasIntercept = false))
private val INPUT_DIR = getClass.getClassLoader.getResource("GameIntegTest/input").getPath
private val INPUT_PATH = new Path(INPUT_DIR, "train")
private val NUM_PARTITIONS = Some(4)
private val FEATURE_COLUMN = "features"
private val RESPONSE_COLUMN = "response"
private val FEATURE_SHARD_CONFIGURATIONS_MAP = Map(
FEATURE_COLUMN -> FeatureShardConfiguration(Set("userFeatures", "songFeatures"), hasIntercept = false))
}
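The reader and writer tests above now hand the partition count to AvroDataReader as an Option[Int]: Some(4) for an explicit value, None to defer to Spark, and an invalid Some(-1) that is expected to raise IllegalArgumentException. The sketch below shows that call shape with a hypothetical reader; it assumes the spark-avro data source is on the classpath and is an illustration, not the AvroDataReader implementation.

import org.apache.spark.sql.{DataFrame, SparkSession}

object OptionalPartitionsSketch {

  // Hypothetical reader: a positive partition count repartitions the result,
  // a non-positive value fails fast via require (IllegalArgumentException), and
  // None leaves the partitioning entirely to Spark.
  def read(spark: SparkSession, path: String, numPartitionsOpt: Option[Int]): DataFrame = {
    numPartitionsOpt.foreach(n => require(n > 0, s"Partition count must be positive, got $n"))

    val df = spark.read.format("avro").load(path)

    numPartitionsOpt.fold(df)(n => df.repartition(n))
  }
}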
@@ -17,8 +17,8 @@ package com.linkedin.photon.ml.cli.game.scoring
import org.apache.commons.cli.MissingArgumentException
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators, Params}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

import com.linkedin.photon.ml.{Constants, DataValidationType, SparkSessionConfiguration, TaskType}
@@ -53,6 +53,11 @@ object GameScoringDriver extends GameDriver {
// Parameters
//

val scoringPartitions: Param[Int] = ParamUtils.createParam(
"scoring partitions",
"Number of partitions to use for the data being scored",
ParamValidators.gt[Int](0.0))

val modelId: Param[String] = ParamUtils.createParam(
"model id",
"ID to tag scores with.")
@@ -206,8 +211,6 @@
featureShardIdToIndexMapLoaderMapOpt: Option[Map[FeatureShardId, IndexMapLoader]])
: (DataFrame, Map[FeatureShardId, IndexMapLoader]) = {

val parallelism = sc.getConf.get("spark.default.parallelism", s"${sc.getExecutorStorageStatus.length * 3}").toInt

// Handle date range input
val dateRangeOpt = IOUtils.resolveRange(get(inputDataDateRange), get(inputDataDaysRange), getOrDefault(timeZone))
val recordsPaths = pathsForDateRange(getRequiredParam(inputDataDirectories), dateRangeOpt)
@@ -218,7 +221,7 @@
recordsPaths.map(_.toString),
featureShardIdToIndexMapLoaderMapOpt,
getRequiredParam(featureShardConfigurations),
parallelism)
get(scoringPartitions))
}

/**
@@ -246,7 +249,7 @@ object GameScoringDriver extends GameDriver {
}

val scoredItemsToBeSaved = get(outputFilesLimit) match {
case Some(limit) if limit < scoredItems.partitions.length => scoredItems.coalesce(getOrDefault(outputFilesLimit))
case Some(limit) if limit < scoredItems.partitions.length => scoredItems.coalesce(limit)
case _ => scoredItems
}
val scoresDir = new Path(getRequiredParam(rootOutputDirectory), SCORES_DIR)
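For readers unfamiliar with the Spark ML Params machinery used above: the sketch below shows the pattern the new scoringPartitions parameter and the coalesce guard follow. It assumes ParamUtils.createParam is a thin wrapper over the standard Param constructor; this is an illustration, not the driver itself.

import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators, Params}
import org.apache.spark.ml.util.Identifiable

class ScoringParamsSketch(override val uid: String) extends Params {

  def this() = this(Identifiable.randomUID("scoringParamsSketch"))

  // Optional Int parameter with a positivity validator; no default is registered,
  // so get(...) returns None unless the caller sets it explicitly.
  val scoringPartitions: Param[Int] = new Param[Int](
    this, "scoring partitions", "Number of partitions to use for the data being scored",
    ParamValidators.gt[Int](0.0))

  val outputFilesLimit: Param[Int] = new Param[Int](
    this, "output files limit", "Maximum number of output files to write",
    ParamValidators.gt[Int](0.0))

  // Mirrors the coalesce guard above: only shrink the file count when a limit is
  // set and it is smaller than the current number of partitions.
  def effectiveOutputFiles(currentPartitions: Int): Int = get(outputFilesLimit) match {
    case Some(limit) if limit < currentPartitions => limit
    case _ => currentPartitions
  }

  override def copy(extra: ParamMap): ScoringParamsSketch = defaultCopy(extra)
}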
@@ -213,7 +213,6 @@ object GameTrainingDriver extends GameDriver {
override protected def setDefaultParams(): Unit = {

setDefault(inputColumnNames, InputColumnsNames())
setDefault(minValidationPartitions, 1)
setDefault(outputMode, ModelOutputMode.BEST)
setDefault(overrideOutputDirectory, false)
setDefault(normalization, NormalizationType.NONE)
@@ -550,7 +549,7 @@ object GameTrainingDriver extends GameDriver {
trainingRecordsPath.map(_.toString),
featureIndexMapLoadersOpt,
getRequiredParam(featureShardConfigurations),
numPartitions)
Some(numPartitions))
}

/**
@@ -577,7 +576,7 @@ object GameTrainingDriver extends GameDriver {
validationRecordsPath.map(_.toString),
featureIndexMapLoaders,
getRequiredParam(featureShardConfigurations),
getOrDefault(minValidationPartitions))
get(minValidationPartitions))
}

/**
