From ec8764bc07d0247874a58bb3926598a0cf31dbd0 Mon Sep 17 00:00:00 2001 From: Alex Shelkovnykov Date: Thu, 20 Feb 2020 09:37:22 -0800 Subject: [PATCH] Fix type erasure warnings in files not scheduled to be removed by other commits --- .../ml/transformers/GameTransformer.scala | 3 - .../scoring/GameScoringDriverIntegTest.scala | 11 +- .../GameTrainingDriverIntegTest.scala | 16 +- .../data/avro/AvroDataReaderIntegTest.scala | 12 +- .../data/avro/AvroDataWriterIntegTest.scala | 34 +-- .../cli/game/scoring/GameScoringDriver.scala | 15 +- .../game/training/GameTrainingDriver.scala | 5 +- .../linkedin/photon/ml/data/DataReader.scala | 208 ++++++++++-------- .../photon/ml/data/avro/AvroDataReader.scala | 34 +-- .../photon/ml/data/avro/AvroDataWriter.scala | 1 + .../com/linkedin/photon/ml/util/Utils.scala | 42 ++-- .../training/GameTrainingDriverTest.scala | 1 - .../ml/data/scoring/ModelDataScores.scala | 2 + 13 files changed, 209 insertions(+), 175 deletions(-) diff --git a/photon-api/src/main/scala/com/linkedin/photon/ml/transformers/GameTransformer.scala b/photon-api/src/main/scala/com/linkedin/photon/ml/transformers/GameTransformer.scala index 82fb6c39..e29e89db 100644 --- a/photon-api/src/main/scala/com/linkedin/photon/ml/transformers/GameTransformer.scala +++ b/photon-api/src/main/scala/com/linkedin/photon/ml/transformers/GameTransformer.scala @@ -209,8 +209,6 @@ class GameTransformer(val sc: SparkContext, implicit val logger: Logger) extends randomEffectTypes: Set[REType], featureShards: Set[FeatureShardId]): RDD[(UniqueSampleId, GameDatum)] = { - val parallelism = sc.getConf.get("spark.default.parallelism", s"${sc.getExecutorStorageStatus.length * 3}").toInt - val partitioner = new LongHashPartitioner(parallelism) val idTagSet = randomEffectTypes ++ get(validationEvaluators).map(MultiEvaluatorType.getMultiEvaluatorIdTags).getOrElse(Seq()) val gameDataset = GameConverters @@ -220,7 +218,6 @@ class GameTransformer(val sc: SparkContext, implicit val logger: Logger) extends idTagSet, isResponseRequired = false, getOrDefault(inputColumnNames)) - .partitionBy(partitioner) .setName("Game dataset with UIDs for scoring") .persist(StorageLevel.DISK_ONLY) diff --git a/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriverIntegTest.scala b/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriverIntegTest.scala index 9233e911..d0b6e36b 100644 --- a/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriverIntegTest.scala +++ b/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriverIntegTest.scala @@ -69,8 +69,9 @@ class GameScoringDriverIntegTest extends SparkTestUtils with TestTemplateWithTmp @DataProvider def numOutputFilesProvider(): Array[Array[Any]] = Array( - Array(1, 1), - Array(10, 3)) + Array(1, 1, 1), + Array(10, 5, 5), + Array(5, 10, 5)) /** * Test that the scoring job can correctly limit the maximum number of output files.
@@ -79,12 +80,16 @@ class GameScoringDriverIntegTest extends SparkTestUtils with TestTemplateWithTmp * @param expectedOutputFiles The expected number of output files */ @Test(dataProvider = "numOutputFilesProvider") - def testNumOutputFiles(outputFilesLimit: Int, expectedOutputFiles: Int): Unit = sparkTest("testNumOutputFiles") { + def testNumOutputFiles( + scoringPartitions: Int, + outputFilesLimit: Int, + expectedOutputFiles: Int): Unit = sparkTest("testNumOutputFiles") { val outputPath = new Path(getTmpDir) val scoresPath = new Path(outputPath, s"${GameScoringDriver.SCORES_DIR}") val params = fixedEffectArgs .put(GameScoringDriver.rootOutputDirectory, outputPath) + .put(GameScoringDriver.scoringPartitions, scoringPartitions) .put(GameScoringDriver.outputFilesLimit, outputFilesLimit) runDriver(params) diff --git a/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverIntegTest.scala b/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverIntegTest.scala index 5e4cb3b0..1127c7f5 100644 --- a/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverIntegTest.scala +++ b/photon-client/src/integTest/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverIntegTest.scala @@ -33,7 +33,6 @@ import com.linkedin.photon.ml.cli.game.GameDriver import com.linkedin.photon.ml.constants.MathConst import com.linkedin.photon.ml.data.{FixedEffectDataConfiguration, GameConverters, RandomEffectDataConfiguration} import com.linkedin.photon.ml.data.avro._ -import com.linkedin.photon.ml.estimators.GameEstimator import com.linkedin.photon.ml.evaluation.RMSEEvaluator import com.linkedin.photon.ml.io.{FeatureShardConfiguration, FixedEffectCoordinateConfiguration, ModelOutputMode, RandomEffectCoordinateConfiguration} import com.linkedin.photon.ml.normalization.NormalizationType @@ -69,8 +68,8 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with /** * Test GAME training with a fixed effect model only, and an intercept. * - * @note Intercepts are optional in [[GameEstimator]], but [[GameDriver]] will setup an intercept by default. This - * happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only. + * @note Intercepts are optional in [[com.linkedin.photon.ml.estimators.GameEstimator]], but [[GameDriver]] will set up + * an intercept by default. This happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only. */ @Test def testFixedEffectsWithIntercept(): Unit = sparkTest("testFixedEffectsWithIntercept") { @@ -105,8 +104,8 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with * Test GAME training with a fixed effect model only, and an intercept, and no validation, and only the best model is * output. * - * @note Intercepts are optional in [[GameEstimator]], but [[GameDriver]] will setup an intercept by default. This - * happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only. + * @note Intercepts are optional in [[com.linkedin.photon.ml.estimators.GameEstimator]], but [[GameDriver]] will set up + * an intercept by default. This happens in [[GameDriver.prepareFeatureMapsDefault()]], and there only.
*/ @Test def testFixedEffectsWithAdditionalOpts(): Unit = sparkTest("testFixedEffectsWithIntercept") { @@ -331,9 +330,6 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with runDriver(mixedEffectSeriousRunArgs.put(GameTrainingDriver.rootOutputDirectory, outputDir)) val globalModelPath = bestModelPath(outputDir, AvroConstants.FIXED_EFFECT, "global") - val userModelPath = bestModelPath(outputDir, AvroConstants.RANDOM_EFFECT, "per-user") - val songModelPath = bestModelPath(outputDir, AvroConstants.RANDOM_EFFECT, "per-song") - val artistModelPath = bestModelPath(outputDir, AvroConstants.RANDOM_EFFECT, "per-artist") val fs = outputDir.getFileSystem(sc.hadoopConfiguration) assertTrue(fs.exists(globalModelPath)) @@ -593,7 +589,7 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with Seq(testPath.toString), indexMapLoadersOpt, featureShardConfigs, - numPartitions = 2) + None) val partitioner = new LongHashPartitioner(testData.rdd.partitions.length) val gameDataset = GameConverters @@ -644,7 +640,7 @@ class GameTrainingDriverIntegTest extends SparkTestUtils with GameTestUtils with Seq(testPath.toString), indexMapLoadersOpt, featureShardConfigs, - numPartitions = 2) + None) val partitioner = new LongHashPartitioner(testData.rdd.partitions.length) val gameDataset = GameConverters diff --git a/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataReaderIntegTest.scala b/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataReaderIntegTest.scala index b832a02e..f1717e95 100644 --- a/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataReaderIntegTest.scala +++ b/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataReaderIntegTest.scala @@ -24,7 +24,7 @@ import org.testng.Assert._ import org.testng.annotations.Test import com.linkedin.photon.ml.Constants -import com.linkedin.photon.ml.index.{IndexMap, PalDBIndexMapLoader} +import com.linkedin.photon.ml.index.PalDBIndexMapLoader import com.linkedin.photon.ml.io.FeatureShardConfiguration import com.linkedin.photon.ml.test.{CommonTestUtils, SparkTestUtils} @@ -43,7 +43,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils { val dr = new AvroDataReader() val dataPath = new Path(INPUT_DIR, "avroMap") val featureConfigMap = Map("shard1" -> FeatureShardConfiguration(Set("xgboost_click"), hasIntercept = true)) - val (df, _) = dr.readMerged(dataPath.toString, featureConfigMap, 1) + val (df, _) = dr.readMerged(dataPath.toString, featureConfigMap, None) assertEquals(df.count(), 2) } @@ -60,7 +60,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils { } /** - * Test reading a [[DataFrame]], using an existing [[IndexMap]]. + * Test reading a [[DataFrame]], using an existing [[com.linkedin.photon.ml.index.IndexMap]].
*/ @Test def testReadWithFeatureIndex(): Unit = sparkTest("testReadWithIndex") { @@ -118,7 +118,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils { assertEquals(df.select(col(shardId)).take(1)(0).getAs[SparseVector](0).numActives, 30) // Assert that the intercept is not in the IndexMap - assertTrue(indexMapLoaders(shardId).indexMapForDriver().get(Constants.INTERCEPT_KEY).isEmpty) + assertFalse(indexMapLoaders(shardId).indexMapForDriver().contains(Constants.INTERCEPT_KEY)) // Assert that all rows have been read assertEquals(df.count, 34810) @@ -151,7 +151,7 @@ class AvroDataReaderIntegTest extends SparkTestUtils { @Test(expectedExceptions = Array(classOf[IllegalArgumentException])) def testReadInvalidPartitions(): Unit = sparkTest("testReadInvalidPartitions") { val dr = new AvroDataReader() - dr.read(TRAIN_INPUT_PATH.toString, -1) + dr.read(TRAIN_INPUT_PATH.toString, Some(-1)) } /** @@ -172,7 +172,7 @@ object AvroDataReaderIntegTest { private val TEST_INPUT_PATH = new Path(INPUT_DIR, "test") private val DUPLICATE_FEATURES_PATH = new Path(INPUT_DIR, "duplicateFeatures") private val INDEX_MAP_PATH = new Path(INPUT_DIR, "feature-indexes") - private val NUM_PARTITIONS = 4 + private val NUM_PARTITIONS = Some(4) private val FEATURE_SHARD_CONFIGS_MAP = Map( "shard1" -> FeatureShardConfiguration(Set("userFeatures", "songFeatures"), hasIntercept = true), "shard2" -> FeatureShardConfiguration(Set("userFeatures"), hasIntercept = true), diff --git a/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataWriterIntegTest.scala b/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataWriterIntegTest.scala index d669d7eb..f4fb8229 100644 --- a/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataWriterIntegTest.scala +++ b/photon-client/src/integTest/scala/com/linkedin/photon/ml/data/avro/AvroDataWriterIntegTest.scala @@ -31,35 +31,35 @@ class AvroDataWriterIntegTest extends SparkTestUtils with TestTemplateWithTmpDir @Test def testWrite(): Unit = sparkTest("testRead") { val dr = new AvroDataReader() - val (df, indexMapLoadersMap) = dr.readMerged(inputPath.toString, featureShardConfigurationsMap, numPartitions) + val (df, indexMapLoadersMap) = dr.readMerged(INPUT_PATH.toString, FEATURE_SHARD_CONFIGURATIONS_MAP, NUM_PARTITIONS) val outputDir = new Path(getTmpDir) - assertTrue(df.columns.contains(featureColumn)) - assertTrue(df.columns.contains(responseColumn)) + assertTrue(df.columns.contains(FEATURE_COLUMN)) + assertTrue(df.columns.contains(RESPONSE_COLUMN)) assertEquals(df.count, 34810) - assertTrue(indexMapLoadersMap.contains(featureColumn)) + assertTrue(indexMapLoadersMap.contains(FEATURE_COLUMN)) - val indexMapLoader = indexMapLoadersMap(featureColumn) + val indexMapLoader = indexMapLoadersMap(FEATURE_COLUMN) val writer = new AvroDataWriter - writer.write(df, outputDir.toString, indexMapLoader, responseColumn, featureColumn, overwrite = true) + writer.write(df, outputDir.toString, indexMapLoader, RESPONSE_COLUMN, FEATURE_COLUMN, overwrite = true) val fs = FileSystem.get(sc.hadoopConfiguration) val files = fs.listStatus(outputDir).filter(_.getPath.getName.startsWith("part")) - assertEquals(files.length, numPartitions) + assertEquals(files.length, NUM_PARTITIONS.get) - val (writeData, _) = dr.read(outputDir.toString, numPartitions) - assertTrue(writeData.columns.contains(responseColumn)) - assertTrue(writeData.columns.contains(featureColumn)) + val (writeData, _) = dr.read(outputDir.toString, NUM_PARTITIONS) +
assertTrue(writeData.columns.contains(RESPONSE_COLUMN)) + assertTrue(writeData.columns.contains(FEATURE_COLUMN)) assertEquals(writeData.count(), 34810) } } object AvroDataWriterIntegTest { - private val inputDir = getClass.getClassLoader.getResource("GameIntegTest/input").getPath - private val inputPath = new Path(inputDir, "train") - private val numPartitions = 4 - private val featureColumn = "features" - private val responseColumn = "response" - private val featureShardConfigurationsMap = Map( - featureColumn -> FeatureShardConfiguration(Set("userFeatures", "songFeatures"), hasIntercept = false)) + private val INPUT_DIR = getClass.getClassLoader.getResource("GameIntegTest/input").getPath + private val INPUT_PATH = new Path(INPUT_DIR, "train") + private val NUM_PARTITIONS = Some(4) + private val FEATURE_COLUMN = "features" + private val RESPONSE_COLUMN = "response" + private val FEATURE_SHARD_CONFIGURATIONS_MAP = Map( + FEATURE_COLUMN -> FeatureShardConfiguration(Set("userFeatures", "songFeatures"), hasIntercept = false)) } diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriver.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriver.scala index 55f0a458..0f286238 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriver.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/scoring/GameScoringDriver.scala @@ -17,8 +17,8 @@ package com.linkedin.photon.ml.cli.game.scoring import org.apache.commons.cli.MissingArgumentException import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.ml.param.{Param, ParamMap, Params} -import org.apache.spark.sql.DataFrame +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators, Params} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.storage.StorageLevel import com.linkedin.photon.ml.{Constants, DataValidationType, SparkSessionConfiguration, TaskType} @@ -53,6 +53,11 @@ object GameScoringDriver extends GameDriver { // Parameters // + val scoringPartitions: Param[Int] = ParamUtils.createParam( + "scoring partitions", + "Number of partitions to use for the data being scored", + ParamValidators.gt[Int](0.0)) + val modelId: Param[String] = ParamUtils.createParam( "model id", "ID to tag scores with.") @@ -206,8 +211,6 @@ object GameScoringDriver extends GameDriver { featureShardIdToIndexMapLoaderMapOpt: Option[Map[FeatureShardId, IndexMapLoader]]) : (DataFrame, Map[FeatureShardId, IndexMapLoader]) = { - val parallelism = sc.getConf.get("spark.default.parallelism", s"${sc.getExecutorStorageStatus.length * 3}").toInt - // Handle date range input val dateRangeOpt = IOUtils.resolveRange(get(inputDataDateRange), get(inputDataDaysRange), getOrDefault(timeZone)) val recordsPaths = pathsForDateRange(getRequiredParam(inputDataDirectories), dateRangeOpt) @@ -218,7 +221,7 @@ object GameScoringDriver extends GameDriver { recordsPaths.map(_.toString), featureShardIdToIndexMapLoaderMapOpt, getRequiredParam(featureShardConfigurations), - parallelism) + get(scoringPartitions)) } /** @@ -246,7 +249,7 @@ object GameScoringDriver extends GameDriver { } val scoredItemsToBeSaved = get(outputFilesLimit) match { - case Some(limit) if limit < scoredItems.partitions.length => scoredItems.coalesce(getOrDefault(outputFilesLimit)) + case Some(limit) if limit < scoredItems.partitions.length => scoredItems.coalesce(limit) case _ => scoredItems } val scoresDir = new 
Path(getRequiredParam(rootOutputDirectory), SCORES_DIR) diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala index 2e7a3a74..3f3bc882 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriver.scala @@ -213,7 +213,6 @@ object GameTrainingDriver extends GameDriver { override protected def setDefaultParams(): Unit = { setDefault(inputColumnNames, InputColumnsNames()) - setDefault(minValidationPartitions, 1) setDefault(outputMode, ModelOutputMode.BEST) setDefault(overrideOutputDirectory, false) setDefault(normalization, NormalizationType.NONE) @@ -550,7 +549,7 @@ object GameTrainingDriver extends GameDriver { trainingRecordsPath.map(_.toString), featureIndexMapLoadersOpt, getRequiredParam(featureShardConfigurations), - numPartitions) + Some(numPartitions)) } /** @@ -577,7 +576,7 @@ object GameTrainingDriver extends GameDriver { validationRecordsPath.map(_.toString), featureIndexMapLoaders, getRequiredParam(featureShardConfigurations), - getOrDefault(minValidationPartitions)) + get(minValidationPartitions)) } /** diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/data/DataReader.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/data/DataReader.scala index a412c2df..dcb46c41 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/data/DataReader.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/data/DataReader.scala @@ -38,66 +38,73 @@ abstract class DataReader( defaultFeatureColumn -> FeatureShardConfiguration(Set(defaultFeatureColumn), defaultIntercept)) /** - * Reads the file at the given path into a DataFrame, assuming the default feature vector. + * Reads the file at the given path into a [[DataFrame]], assuming the default feature vector. * * @param path The path to the file or folder - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. 
* @return The loaded and transformed DataFrame */ - def read(path: String, numPartitions: Int): (DataFrame, IndexMapLoader) = read(Seq(path), numPartitions) + def read(path: String, numPartitionsOpt: Option[Int]): (DataFrame, IndexMapLoader) = read(Seq(path), numPartitionsOpt) /** * Reads the file at the given path into a DataFrame, assuming the default feature vector. * * @param path The path to the file or folder - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @param intercept Whether to add a feature representing the intercept to the feature vector * @return The loaded and transformed DataFrame */ - def read(path: String, numPartitions: Int, intercept: Boolean): (DataFrame, IndexMapLoader) = - read(Seq(path), numPartitions, intercept) + def read(path: String, numPartitionsOpt: Option[Int], intercept: Boolean): (DataFrame, IndexMapLoader) = + read(Seq(path), numPartitionsOpt, intercept) /** * Reads the file at the given path into a DataFrame, using the given index map for feature names. * * @param path The path to the file or folder * @param indexMapLoaders A map of index map loaders, containing one loader for each merged feature column - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. 
* @return The loaded and transformed DataFrame */ - def read(path: String, indexMapLoaders: Map[MergedColumnName, IndexMapLoader], numPartitions: Int): DataFrame = - read(Seq(path), indexMapLoaders, numPartitions) + def read( + path: String, + indexMapLoaders: Map[MergedColumnName, IndexMapLoader], + numPartitionsOpt: Option[Int]): DataFrame = + read(Seq(path), indexMapLoaders, numPartitionsOpt) /** * Reads the files at the given paths into a DataFrame, assuming the default feature vector. * * @param paths The paths to the files or folders - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ - def read(paths: Seq[String], numPartitions: Int): (DataFrame, IndexMapLoader) = { + def read(paths: Seq[String], numPartitionsOpt: Option[Int]): (DataFrame, IndexMapLoader) = { - val (data, indexMapLoaders) = readMerged(paths, defaultFeatureConfigMap, numPartitions) + val (data, indexMapLoaders) = readMerged(paths, defaultFeatureConfigMap, numPartitionsOpt) (data, indexMapLoaders(defaultFeatureColumn)) } @@ -106,19 +113,20 @@ abstract class DataReader( * Reads the files at the given paths into a DataFrame, assuming the default feature vector. * * @param paths The paths to the files or folders - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. 
This setting is exposed to allow tuning which can avoid the shuffling. * @param intercept Whether to add a feature representing the intercept to the feature vector * @return The loaded and transformed DataFrame */ - def read(paths: Seq[String], numPartitions: Int, intercept: Boolean): (DataFrame, IndexMapLoader) = { + def read(paths: Seq[String], numPartitionsOpt: Option[Int], intercept: Boolean): (DataFrame, IndexMapLoader) = { val featureConfigMap = Map(defaultFeatureColumn -> FeatureShardConfiguration(Set(defaultFeatureColumn), intercept)) - val (data, indexMapLoaders) = readMerged(paths, featureConfigMap, numPartitions) + val (data, indexMapLoaders) = readMerged(paths, featureConfigMap, numPartitionsOpt) (data, indexMapLoaders(defaultFeatureColumn)) } @@ -128,16 +136,20 @@ abstract class DataReader( * * @param paths The paths to the files or folders * @param indexMapLoaders A map of index map loaders, containing one loader for each merged feature column - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ - def read(paths: Seq[String], indexMapLoaders: Map[MergedColumnName, IndexMapLoader], numPartitions: Int): DataFrame = - readMerged(paths, indexMapLoaders, defaultFeatureConfigMap, numPartitions) + def read( + paths: Seq[String], + indexMapLoaders: Map[MergedColumnName, IndexMapLoader], + numPartitionsOpt: Option[Int]): DataFrame = + readMerged(paths, indexMapLoaders, defaultFeatureConfigMap, numPartitionsOpt) /** * Reads the file at the given path into a DataFrame, using the given index map for feature names. Merges source @@ -156,20 +168,21 @@ abstract class DataReader( * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. 
+ * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ def readMerged( path: String, indexMapLoadersOpt: Option[Map[MergedColumnName, IndexMapLoader]], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = - readMerged(Seq(path), indexMapLoadersOpt, featureColumnConfigsMap, numPartitions) + numPartitionsOpt: Option[Int]): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = + readMerged(Seq(path), indexMapLoadersOpt, featureColumnConfigsMap, numPartitionsOpt) /** * Reads the file at the given path into a DataFrame, using the given index map for feature names. Merges source @@ -188,25 +201,26 @@ abstract class DataReader( * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. 
* @return The loaded and transformed DataFrame */ def readMerged( paths: Seq[String], indexMapLoadersOpt: Option[Map[MergedColumnName, IndexMapLoader]], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = + numPartitionsOpt: Option[Int]): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = indexMapLoadersOpt match { case Some(indexMapLoaders) => - (readMerged(paths, indexMapLoaders, featureColumnConfigsMap, numPartitions), indexMapLoaders) + (readMerged(paths, indexMapLoaders, featureColumnConfigsMap, numPartitionsOpt), indexMapLoaders) case None => - readMerged(paths, featureColumnConfigsMap, numPartitions) + readMerged(paths, featureColumnConfigsMap, numPartitionsOpt) } /** @@ -224,19 +238,20 @@ abstract class DataReader( * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ def readMerged( path: String, featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = - readMerged(Seq(path), featureColumnConfigsMap, numPartitions) + numPartitionsOpt: Option[Int]): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = + readMerged(Seq(path), featureColumnConfigsMap, numPartitionsOpt) /** * Reads the file at the given path into a DataFrame, using the given index map for feature names. Merges source @@ -254,20 +269,21 @@ abstract class DataReader( * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. 
If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ def readMerged( path: String, indexMapLoaders: Map[MergedColumnName, IndexMapLoader], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): DataFrame = - readMerged(Seq(path), indexMapLoaders, featureColumnConfigsMap, numPartitions) + numPartitionsOpt: Option[Int]): DataFrame = + readMerged(Seq(path), indexMapLoaders, featureColumnConfigsMap, numPartitionsOpt) /** * Reads the files at the given paths into a DataFrame, generating a default index map for feature names. Merges @@ -284,18 +300,19 @@ abstract class DataReader( * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ def readMerged( paths: Seq[String], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): (DataFrame, Map[MergedColumnName, IndexMapLoader]) + numPartitionsOpt: Option[Int]): (DataFrame, Map[MergedColumnName, IndexMapLoader]) /** * Reads the files at the given paths into a DataFrame, using the given index map for feature names. Merges source @@ -313,17 +330,18 @@ abstract class DataReader( * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. 
The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ def readMerged( paths: Seq[String], indexMapLoaders: Map[MergedColumnName, IndexMapLoader], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): DataFrame + numPartitionsOpt: Option[Int]): DataFrame } diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataReader.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataReader.scala index a415a138..6f87481a 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataReader.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataReader.scala @@ -74,18 +74,21 @@ class AvroDataReader(defaultFeatureColumn: String = InputColumnsNames.FEATURES_D * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ override def readMerged( paths: Seq[String], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = { + numPartitionsOpt: Option[Int]): (DataFrame, Map[MergedColumnName, IndexMapLoader]) = { + + val numPartitions = numPartitionsOpt.getOrElse(SparkSession.builder.getOrCreate.sparkContext.defaultParallelism) require(paths.nonEmpty, "No paths specified. 
You must specify at least one input path.") require(numPartitions >= 0, "Partition count cannot be negative.") @@ -114,19 +117,22 @@ class AvroDataReader(defaultFeatureColumn: String = InputColumnsNames.FEATURES_D * * This configuration merges the "profileFeatures" and "titleFeatures" columns into a * single column named "userFeatures". - * @param numPartitions The minimum number of partitions. Spark is generally moving away from manually specifying - * partition counts like this, in favor of inferring it. However, Photon currently still exposes - * partition counts as a means for tuning job performance. The auto-inferred counts are usually - * much lower than the necessary counts for Photon (especially GAME), so this caused a lot of - * shuffling when repartitioning from the auto-partitioned data to the GAME data. We expose this - * setting here to avoid the shuffling. + * @param numPartitionsOpt An optional minimum number of partitions for the [[DataFrame]]. If no minimum is provided, + * the default parallelism is used. Spark is generally moving away from manually specifying + * partition counts like this, in favor of inferring it. However, Photon still exposes + * partition counts as a means for tuning job performance. The auto-inferred counts are + * usually much lower than the necessary counts for Photon (especially GAME). This causes a + * lot of shuffling when repartitioning from the auto-partitioned data to the processed GAME + * data. This setting is exposed to allow tuning which can avoid the shuffling. * @return The loaded and transformed DataFrame */ override def readMerged( paths: Seq[String], indexMapLoaders: Map[MergedColumnName, IndexMapLoader], featureColumnConfigsMap: Map[MergedColumnName, FeatureShardConfiguration], - numPartitions: Int): DataFrame = { + numPartitionsOpt: Option[Int]): DataFrame = { + + val numPartitions = numPartitionsOpt.getOrElse(SparkSession.builder.getOrCreate.sparkContext.defaultParallelism) require(paths.nonEmpty, "No paths specified. You must specify at least one input path.") require(numPartitions >= 0, "Partition count cannot be negative.") diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataWriter.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataWriter.scala index 001c14ae..91cd4ad5 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataWriter.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/data/avro/AvroDataWriter.scala @@ -70,6 +70,7 @@ class AvroDataWriter { val response = getValueAsDouble(r, responseColumn) val offset = if (hasOffset) getValueAsDouble(r, OFFSET) else DEFAULTS(OFFSET) val weight = if (hasWeight) getValueAsDouble(r, WEIGHT) else DEFAULTS(WEIGHT) + rowBuilder .setResponse(response) .setOffset(offset) diff --git a/photon-client/src/main/scala/com/linkedin/photon/ml/util/Utils.scala b/photon-client/src/main/scala/com/linkedin/photon/ml/util/Utils.scala index 76f92a23..0ea9797d 100644 --- a/photon-client/src/main/scala/com/linkedin/photon/ml/util/Utils.scala +++ b/photon-client/src/main/scala/com/linkedin/photon/ml/util/Utils.scala @@ -120,28 +120,36 @@ object Utils { * Fetch the java map from an Avro map field. 
* * @param record The Avro generic record - * @param key The field key + * @param field The field key * @return A java map of String -> Object */ def getMapAvro( record: GenericRecord, - key: String, + field: String, isNullOK: Boolean = false): Map[String, JObject] = { - type T = java.util.Map[Any, JObject] // to avoid type erasure warning - record.get(key) match { - case map: T => map.asScala.map { - case (k, value) => (k.toString, value match { - // Need to convert Utf8 values to String here, because otherwise we get schema casting errors and misleading - // equivalence failures downstream. - case s@(_: Utf8 | _: JString) => s.toString - case x@(_: Number | _: JBoolean) => x - case _ => null - }) - }.filter(_._2 != null).toMap - - case obj: JObject => throw new IllegalArgumentException(s"$obj is not map type.") - case _ => if (isNullOK) null else throw new IllegalArgumentException(s"field $key is null") + val map = record.get(field).asInstanceOf[java.util.Map[Any, JObject]] + + if (map == null && isNullOK) { + null + } else if (map == null) { + throw new IllegalArgumentException(s"field '$field' is null") + } else { + map + .asScala + .flatMap { case (key, value) => + + val keyString = key.toString + + value match { + // Need to convert Utf8 values to String here, because otherwise we get schema casting errors and misleading + // equivalence failures downstream. + case s@(_: Utf8 | _: JString) => Some((keyString, s.toString)) + case x@(_: Number | _: JBoolean) => Some((keyString, x)) + case _ => None + } + } + .toMap } } @@ -291,7 +299,7 @@ object Utils { @throws(classOf[IllegalArgumentException]) def getKeyFromMapOrElse[T](map: Map[String, Any], key: String, elseBranch: Either[String, T]): T = { map.get(key) match { - case Some(x: T) => x // type erasure warning here + case Some(x) => x.asInstanceOf[T] case _ => elseBranch match { case Left(errorMsg) => throw new IllegalArgumentException(errorMsg) diff --git a/photon-client/src/test/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverTest.scala b/photon-client/src/test/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverTest.scala index 13fe4ed6..88324bab 100644 --- a/photon-client/src/test/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverTest.scala +++ b/photon-client/src/test/scala/com/linkedin/photon/ml/cli/game/training/GameTrainingDriverTest.scala @@ -259,7 +259,6 @@ class GameTrainingDriverTest { GameTrainingDriver.clear() GameTrainingDriver.getOrDefault(GameTrainingDriver.inputColumnNames) - GameTrainingDriver.getOrDefault(GameTrainingDriver.minValidationPartitions) GameTrainingDriver.getOrDefault(GameTrainingDriver.outputMode) GameTrainingDriver.getOrDefault(GameTrainingDriver.overrideOutputDirectory) GameTrainingDriver.getOrDefault(GameTrainingDriver.normalization) diff --git a/photon-lib/src/main/scala/com/linkedin/photon/ml/data/scoring/ModelDataScores.scala b/photon-lib/src/main/scala/com/linkedin/photon/ml/data/scoring/ModelDataScores.scala index 798770e6..375bc483 100644 --- a/photon-lib/src/main/scala/com/linkedin/photon/ml/data/scoring/ModelDataScores.scala +++ b/photon-lib/src/main/scala/com/linkedin/photon/ml/data/scoring/ModelDataScores.scala @@ -52,6 +52,8 @@ class ModelDataScores(override val scoresRdd: RDD[(UniqueSampleId, ScoredGameDat case (Some(thisScore), Some(thatScore)) => op(thisScore, thatScore) case (Some(thisScore), None) => op(thisScore, thisScore.copy(score = MathConst.DEFAULT_SCORE)) case (None, Some(thatScore)) => op(thatScore.copy(score = 
MathConst.DEFAULT_SCORE), thatScore) + // Only included so that Scala doesn't emit a non-exhaustive match warning; this case can never happen + case (None, None) => throw new UnsupportedOperationException("No scores to merge") } })
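
Reviewer note, illustrative only and not part of the diff: after this change, callers pick between an explicit minimum partition count and Spark's default parallelism by passing an Option[Int]. A minimal sketch of the two call styles against the new AvroDataReader signature, assuming an active SparkSession and using a placeholder input path and feature shard configuration:

    import com.linkedin.photon.ml.data.avro.AvroDataReader
    import com.linkedin.photon.ml.io.FeatureShardConfiguration

    val reader = new AvroDataReader()
    val featureShardConfigs = Map(
      "features" -> FeatureShardConfiguration(Set("userFeatures"), hasIntercept = true))

    // Tuned, as before: the partition count is now wrapped in Some(...)
    val (dfTuned, indexMapLoaders) =
      reader.readMerged(Seq("/placeholder/train"), featureShardConfigs, Some(200))

    // New option: pass None to fall back to sparkContext.defaultParallelism
    val (dfDefault, _) =
      reader.readMerged(Seq("/placeholder/train"), featureShardConfigs, None)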
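
Reviewer note, illustrative only and not part of the diff: the warnings this commit fixes arise because generic type parameters are erased at runtime, so a type pattern such as Some(x: T) can only be checked down to Some, and the compiler flags it as unchecked. A self-contained sketch of the warning and of the explicit-cast style the patch adopts, with hypothetical method names:

    // Warns: "abstract type pattern T is unchecked since it is eliminated by erasure"
    def lookupUnchecked[T](map: Map[String, Any], key: String): Option[T] =
      map.get(key) match {
        case Some(x: T) => Some(x)
        case _ => None
      }

    // The patch's style: bind without a type pattern, then cast explicitly.
    // Equally unchecked at runtime, but deliberate and warning-free.
    def lookup[T](map: Map[String, Any], key: String): Option[T] =
      map.get(key).map(_.asInstanceOf[T])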
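
Reviewer note, illustrative only and not part of the diff: the new (None, None) arm in ModelDataScores exists only to make the match exhaustive. Assuming the score pairs come from a full outer join, as the (Option, Option) shape suggests, at least one side is always defined; a small sketch with toy score values:

    import org.apache.spark.sql.SparkSession

    val sc = SparkSession.builder.getOrCreate.sparkContext
    val left = sc.parallelize(Seq((1L, 0.5), (2L, 1.0)))
    val right = sc.parallelize(Seq((2L, 2.0), (3L, 4.0)))

    // Every key in a full outer join was present on at least one side, so the
    // (None, None) combination never materializes -- the compiler just cannot
    // prove it, hence the defensive case.
    left.fullOuterJoin(right).collect().foreach(println)
    // Prints, in some order:
    // (1,(Some(0.5),None))
    // (2,(Some(1.0),Some(2.0)))
    // (3,(None,Some(4.0)))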
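
Reviewer note, illustrative only and not part of the diff: the Utf8-to-String conversion that the rewritten getMapAvro keeps is load-bearing. Avro decodes string values as org.apache.avro.util.Utf8, which never compares equal to a java.lang.String, so skipping the conversion produces the misleading downstream equivalence failures the comment mentions:

    import org.apache.avro.util.Utf8

    val avroValue: Any = new Utf8("someValue")
    println(avroValue == "someValue")          // false: a Utf8 is not a String
    println(avroValue.toString == "someValue") // true: hence the explicit toString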